Compare commits

...

18 commits

Author SHA1 Message Date
Pauline Bailly-Masson
1416fa0cf2
🔒 pin tests.yml actions to commit SHAs (#721) 2026-04-02 16:03:12 +02:00
Quentin Gallouédec
0e06249d1c
Update README.md 2025-07-17 13:20:00 -07:00
Quentin Gallouédec
7e700c6218
Update citation (#688) 2025-07-07 10:23:08 -07:00
lewtun
b806e1092a
Bump vLLM and TRL (#665)
* Bump vLLM and TRL

* Fix Makefile
2025-05-28 13:47:25 +02:00
lewtun
a6b4f668fb
Fix Weka refresh (#666)
* Fix Weka refresh

* Update evaluate.slurm
2025-05-28 13:45:48 +02:00
lewtun
01b4351c45
Set DP=2 for smol model evals (#664)
* Set DP=2 for smol model evals

Temporary hack while the HF cluster is at max capacity :)

* Style
2025-05-28 09:23:12 +02:00
lewtun
722f144d21
Refresh Weka on Slurm (#662)
* Refresh Weka on Slurm

* Include current working dir
2025-05-27 19:21:15 +02:00
lewtun
33f84def0d
Align EOS token ID between tokenizer and generation config (#663)
* Align EOS token ID between tokenizer and generation config

* Fix
2025-05-27 17:20:13 +02:00
lewtun
9eef995b4d
Bump deps (#656) 2025-05-27 15:38:21 +02:00
lewtun
5ac5971ea5
Add OpenR1-Distill recipe (#661) 2025-05-26 17:57:44 +02:00
lewtun
57e85b522f
Add better logging defaults for GRPO (#657) 2025-05-25 13:24:52 +02:00
Guilherme Penedo
c1e1192294
GRPO with codeforces problems (#627)
* add

* update

* updates

* updates #2

* weighted_sum and python fixes

* bugfix

* merging ioi/cf setups

* integrating the morph changes

* move morph_client

* run style

* small changes for mixed languages training

* revert grpo.py changes

* piston readme

* local test fetching

* bug fixes

* updated readme

* style fixes

* style fixes 2

* deps changes

* import sorting

* fix tests

* Update README.md

Co-authored-by: lewtun <lewis.c.tunstall@gmail.com>

* Update README.md

Co-authored-by: lewtun <lewis.c.tunstall@gmail.com>

* Update src/open_r1/rewards.py

Co-authored-by: lewtun <lewis.c.tunstall@gmail.com>

---------

Co-authored-by: lewtun <lewis.c.tunstall@gmail.com>
2025-05-25 11:55:27 +02:00
lewtun
db2d9b011a
Bump lower bound on liger-kernel (#654)
Related to https://github.com/huggingface/open-r1/pull/653

(I forgot to include this in that PR)
2025-05-22 08:44:13 +02:00
lewtun
8067149e90
Bump DeepSpeed to 0.16.8 to fix OOM on Qwen3 (#653) 2025-05-21 22:25:57 +02:00
lewtun
9366aa2df3
Add dataset mixer (#647)
* Prototype

* Clean up

* Refactor

* Add tests

* Add doc and make scripts work

* Tune doc

* Up

* Tune

* Add column verification

* Fix types

* Fix YAML

* Fix types

* Fix doc

* f

* f
2025-05-20 11:40:42 +02:00
Quentin Gallouédec
5e0c210f9c
use hf papers (#646) 2025-05-19 13:48:14 +02:00
lewtun
ebd5913a85
Bump LightEval (#643) 2025-05-16 10:52:05 +02:00
Edward Beeching
ea5b7edf22
Add dataset filtering script (#637)
* add dataset filtering script

* remove subset selection

* save wip

* save wip

* update filter script

* refactor to run on chunks

* rename script

* cleanup

* update dapo filtering

* fixes

* dapo filt config

* udpate compute pass rate

* clean

* update readme and config

* add merging snippet
2025-05-16 10:26:49 +02:00
37 changed files with 1428 additions and 397 deletions

View file

@ -16,9 +16,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Setup Python environment
uses: actions/setup-python@v5
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5
with:
python-version: 3.10.10
- name: Install dependencies

View file

@ -8,10 +8,11 @@ check_dirs := src tests
# dev dependencies
install:
uv venv openr1 --python 3.11 && . openr1/bin/activate && uv pip install --upgrade pip
uv pip install vllm==0.8.4
uv pip install setuptools
uv pip install flash-attn --no-build-isolation
uv venv openr1 --python 3.11
. openr1/bin/activate && uv pip install --upgrade pip && \
uv pip install vllm==0.8.5.post1 && \
uv pip install setuptools && \
uv pip install flash-attn --no-build-isolation && \
GIT_LFS_SKIP_SMUDGE=1 uv pip install -e ".[dev]"
style:

166
README.md
View file

@ -21,10 +21,9 @@
The goal of this repo is to build the missing pieces of the R1 pipeline such that everybody can reproduce and build on top of it. The project is simple by design and mostly consists of:
- `src/open_r1`: contains the scripts to train and evaluate models as well as generate synthetic data:
- `src/open_r1`: contains the scripts to train models as well as generate synthetic data:
- `grpo.py`: trains a model with GRPO on a given dataset.
- `sft.py`: performs a simple SFT of a model on a dataset.
- `evaluate.py`: evaluates a model on the R1 benchmarks.
- `generate.py`: generates synthetic data from a model using [Distilabel](https://github.com/argilla-io/distilabel).
- `Makefile`: contains easy-to-run commands for each step in the R1 pipeline leveraging the scripts above.
@ -42,6 +41,7 @@ We will use the DeepSeek-R1 [tech report](https://github.com/deepseek-ai/DeepSee
## News 🗞️
* **🧑‍🍳 [2025/05/26] (Step 1 completed!)** We release [**Mixture-of-Thoughts**](https://huggingface.co/datasets/open-r1/Mixture-of-Thoughts)--a curated reasoning dataset of 350k verified traces distilled from R1. The dataset spans tasks in mathematics, coding, and science, and is designed to teach language models to reason step-by-step. We also provide a recipe to train [OpenR1-Distill-7B](https://huggingface.co/open-r1/OpenR1-Distill-7B), which replicates the reasoning capabilities of [deepseek-ai/DeepSeek-R1-Distill-Qwen-7B](https://huggingface.co/deepseek-ai/DeepSeek-R1-Distill-Qwen-7B) and marks the completion of step 1 in the Open R1 project.
* **⚡️ [2025/03/11] [(update #3)](https://huggingface.co/blog/open-r1/update-3):** We release the [**CodeForces-CoTs**](https://huggingface.co/datasets/open-r1/codeforces-cots) dataset of 10k competitive programming problems and 100k solutions distilled from R1. We also release IOI24: a new benchmark of _very_ hard problems from international olympiads. A 7B Qwen model trained on CodeForces-CoTs can outperform Claude 3.7 Sonnet on IOI24, while a 32B model can outperform R1 itself.
* **∞ [2025/02/10] [(update #2)](https://huggingface.co/blog/open-r1/update-2):** We release the [**OpenR1-Math-220k**](https://huggingface.co/datasets/open-r1/OpenR1-Math-220k) dataset of 220k traces distilled from R1 on a new version of NuminaMath. Models trained on this dataset match the performance of DeepSeek's distilled ones.
* **🔥 [2025/02/02] [(update #1)](https://huggingface.co/blog/open-r1/update-1):** We implement the first parts of the [training](https://github.com/huggingface/open-r1?tab=readme-ov-file#training-models), [inference](https://github.com/huggingface/open-r1?tab=readme-ov-file#data-generation), and [evaluation](https://github.com/huggingface/open-r1?tab=readme-ov-file#reproducing-deepseeks-evaluation-results) pipelines. Let's go!
@ -69,7 +69,7 @@ uv venv openr1 --python 3.11 && source openr1/bin/activate && uv pip install --u
Next, install vLLM and FlashAttention:
```shell
uv pip install vllm==0.8.4
uv pip install vllm==0.8.5.post1
uv pip install setuptools && uv pip install flash-attn --no-build-isolation
```
@ -103,25 +103,27 @@ sudo apt-get install git-lfs
> [!NOTE]
> The training commands below are configured for a node of 8 x H100s (80GB). For different hardware and topologies, you may need to tune the batch size and number of gradient accumulation steps.
We support training models with either DDP or DeepSpeed (ZeRO-2 and ZeRO-3). For example, to run SFT on a dataset distilled from DeepSeek-R1 with reasoning traces such as [open-r1/OpenR1-Math-220k](https://huggingface.co/datasets/open-r1/OpenR1-Math-220k), run:
We support training models with either DDP or DeepSpeed (ZeRO-2 and ZeRO-3). For example, to perform SFT on a dataset distilled from DeepSeek-R1 with reasoning traces such as [open-r1/Mixture-of-Thoughts](https://huggingface.co/datasets/open-r1/Mixture-of-Thoughts), run:
```shell
# Train via command line
accelerate launch --config_file=recipes/accelerate_configs/zero3.yaml src/open_r1/sft.py \
--model_name_or_path Qwen/Qwen2.5-1.5B-Instruct \
--dataset_name open-r1/OpenR1-Math-220k \
--learning_rate 5.0e-5 \
--num_train_epochs 1 \
--max_seq_length 16384 \
--per_device_train_batch_size 16 \
--model_name_or_path open-r1/Qwen2.5-Math-7B-RoPE-300k \
--dataset_name open-r1/Mixture-of-Thoughts \
--dataset_config all \
--eos_token '<|im_end|>' \
--learning_rate 4.0e-5 \
--num_train_epochs 5 \
--max_seq_length 32768 \
--per_device_train_batch_size 2 \
--gradient_checkpointing \
--bf16 \
--use_liger_kernel \
--output_dir data/Qwen2.5-1.5B-Open-R1-Distill
--output_dir data/OpenR1-Distill-7B
# Train via YAML config
accelerate launch --config_file recipes/accelerate_configs/zero3.yaml src/open_r1/sft.py \
--config recipes/Qwen2.5-1.5B-Instruct/sft/config_demo.yaml
--config recipes/OpenR1-Distill-7B/sft/config_distill.yaml
```
Currently, the following tasks are supported:
@ -135,17 +137,19 @@ Currently, the following tasks are supported:
By default, these scripts will push each model to your Hugging Face Hub username, i.e. `{username}/{model_name}-{task}`. You can override the parameters in each YAML config by appending them to the command as follows:
```shell
# Change batch size, number of epochs etc
# Change the base model to a smaller variant
accelerate launch --config_file recipes/accelerate_configs/zero3.yaml src/open_r1/sft.py \
--config recipes/Qwen2.5-1.5B-Instruct/sft/config_demo.yaml
--per_device_train_batch_size=1 --num_train_epochs=5
--config recipes/OpenR1-Distill-7B/sft/config_distill.yaml \
--model_name_or_path Qwen/Qwen3-0.6B-Base \
--hub_model_id OpenR1-Distill-0.6B \
--output_dir data/OpenR1-Distill-0.6B
```
If you also wish to override the Weights and Biases default settings, you can do so as follows:
```shell
accelerate launch --config_file recipes/accelerate_configs/zero3.yaml src/open_r1/sft.py \
--config recipes/Qwen2.5-1.5B-Instruct/sft/config_demo.yaml
--config recipes/OpenR1-Distill-7B/sft/config_distill.yaml
--wandb_entity huggingface --wandb_project open-r1 --run_name Qwen2.5-1.5B-GRPO
```
@ -158,10 +162,11 @@ Most base models like `meta-llama/Llama-3.2-1B` do not have a chat template, so
accelerate launch --config_file=recipes/accelerate_configs/zero3.yaml src/open_r1/sft.py \
--model_name_or_path Qwen/Qwen2.5-1.5B \
+ --eos_token '<|im_end|>'
--dataset_name open-r1/OpenR1-Math-220k \
--learning_rate 5.0e-5 \
--dataset_name open-r1/Mixture-of-Thoughts \
--dataset_config all \
--learning_rate 4.0e-5 \
--num_train_epochs 1 \
--max_seq_length 16384 \
--max_seq_length 32768 \
--per_device_train_batch_size 16 \
--gradient_checkpointing \
--bf16 \
@ -177,10 +182,11 @@ accelerate launch --config_file=recipes/accelerate_configs/zero3.yaml src/open_r
--model_name_or_path meta-llama/Llama-3.2-1B \
+ --chat_template "$(cat llama_chat_template.jinja)" \
+ --eos_token '<|eot_id|>' \
--dataset_name open-r1/OpenR1-Math-220k \
--learning_rate 5.0e-5 \
--dataset_name open-r1/Mixture-of-Thoughts \
--dataset_config all \
--learning_rate 4.0e-5 \
--num_train_epochs 1 \
--max_seq_length 16384 \
--max_seq_length 32768 \
--per_device_train_batch_size 16 \
--gradient_checkpointing \
--bf16 \
@ -188,55 +194,39 @@ accelerate launch --config_file=recipes/accelerate_configs/zero3.yaml src/open_r
--output_dir data/Llama-3.2-1B-Open-R1-Distill
```
### SFT
### SFT distillation
To run SFT on a dataset distilled from DeepSeek-R1 with reasoning traces such as [open-r1/OpenR1-Math-220k](https://huggingface.co/datasets/open-r1/OpenR1-Math-220k), run:
We provide a recipe to reproduce the reasoning capabilities of [deepseek-ai/DeepSeek-R1-Distill-Qwen-7B](https://huggingface.co/deepseek-ai/DeepSeek-R1-Distill-Qwen-7B), starting from the same base model. To do so, run:
```shell
ACCELERATE_LOG_LEVEL=info accelerate launch --config_file recipes/accelerate_configs/zero3.yaml \
src/open_r1/sft.py \
--config recipes/Qwen2.5-1.5B-Instruct/sft/config_demo.yaml
--config recipes/OpenR1-Distill-7B/sft/config_distill.yaml
```
The result will be a model like [open-r1/OpenR1-Distill-7B](https://huggingface.co/open-r1/OpenR1-Distill-7B), with the following downstream performance:
| Model | AIME 2024 | MATH-500 | GPQA Diamond | LiveCodeBench v5 |
|-----------------------------|-----------|----------|--------------|------------------|
| OpenR1-Distill-7B | 52.7 | 89.0 | 52.8 | 39.4 |
| DeepSeek-R1-Distill-Qwen-7B | 51.3 | 93.5 | 52.4 | 37.4 |
You can adjust the YAML config to train on a different base model or dataset.
### GRPO
We use TRL's [vLLM backend](https://huggingface.co/docs/trl/speeding_up_training?vllm+examples=GRPO#vllm-for-fast-generation-in-online-methods) to scale training to large models across multiple nodes. For single-node training of smol models across 8 GPUs, first spin up the vLLM server to run on e.g. 1 GPU as follows:
We use TRL's [vLLM backend](https://huggingface.co/docs/trl/speeding_up_training?vllm+examples=GRPO#vllm-for-fast-generation-in-online-methods) to scale training to large models across multiple nodes. For single-node training of smol models across 8 GPUs, use `vllm_mode="colocate"` to run vLLM in the same process as the training script:
```shell
CUDA_VISIBLE_DEVICES=0 trl vllm-serve --model deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B
```
Once the server is up, run training on the remaining GPUs as follows:
```shell
CUDA_VISIBLE_DEVICES=1,2,3,4,5,6,7 ACCELERATE_LOG_LEVEL=info \
accelerate launch --config_file recipes/accelerate_configs/zero2.yaml --num_processes 7 \
src/open_r1/grpo.py --config recipes/DeepSeek-R1-Distill-Qwen-1.5B/grpo/config_demo.yaml
ACCELERATE_LOG_LEVEL=info \
accelerate launch --config_file recipes/accelerate_configs/zero3.yaml \
src/open_r1/grpo.py --config recipes/DeepSeek-R1-Distill-Qwen-1.5B/grpo/config_demo.yaml \
--vllm_mode colocate
```
> [!WARNING]
> The chat template used in the distilled DeepSeek models omits the contents of the reasoning block within the `<think>` and `</think>` tags. It also prefills the assistant response with `<think>` which interferes with the format reward function. To handle that, it is important to override the chat template as done in e.g. [recipes/DeepSeek-R1-Distill-Qwen-1.5B/grpo/config_demo.yaml](./recipes/DeepSeek-R1-Distill-Qwen-1.5B/grpo/config_demo.yaml).
To increase the throughput with data parallel on e.g. 2 GPUs, run:
```shell
CUDA_VISIBLE_DEVICES=0,1 trl vllm-serve --model deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B --data_parallel_size 2
```
Then run training on the remaining GPUs as follows:
```shell
CUDA_VISIBLE_DEVICES=2,3,4,5,6,7 ACCELERATE_LOG_LEVEL=info \
accelerate launch --config_file recipes/accelerate_configs/zero2.yaml --num_processes 6 \
src/open_r1/grpo.py --config recipes/DeepSeek-R1-Distill-Qwen-1.5B/grpo/config_demo.yaml
```
For larger models, use tensor parallelism:
```shell
CUDA_VISIBLE_DEVICES=0,1 trl vllm-serve --model deepseek-ai/DeepSeek-R1-Distill-Qwen-14B --tensor_parallel_size 2
```
For multi-node training on N+1 nodes, with 1 node running the vLLM server and N nodes running training, we provide an example Slurm script. For example, to run the above example on 1+1 nodes with data parallelism, run:
```shell
@ -245,6 +235,10 @@ sbatch --nodes=2 slurm/train.slurm --model Qwen2.5-1.5B-Instruct --task grpo --c
See the [Launching jobs on a Slurm cluster](#launching-jobs-on-a-slurm-cluster) section for more details.
#### GRPO dataset filtering
We provide support to filter datasets by generating and computing pass rate on veriable tasks, see this [README](scripts/pass_rate_filtering/README.md)
#### 👨‍💻 Training with a code interpreter
We provide a `code` reward function for executing code generated by the policy during training. Currently, this reward function targets code contests like [Codeforces](https://codeforces.com), where solutions are executed against a set of test cases and the overall success rate is returned as the final reward. To ensure safe execution, we support multiple sandbox providers:
@ -305,6 +299,7 @@ Make sure your dataset contains a `verification_info` column with the following
}
],
}
```
For example, to train a smol model on Python problems, start the vLLM server:
@ -346,17 +341,32 @@ morph_router_url: 1.2.3.4:8000
The port should match the one used when launching the router.
All training jobs can share the same router IP which will ensure parallel executions are properly managed.
#### IOI problems
#### Competitive Programming problems: IOI & CodeForces
We provide a `ioi_code_reward` reward function for executing problems from [IOI](https://hf.co/datasets/open-r1/ioi). You can use either [piston](https://github.com/engineer-man/piston) or Morph as your execution provider.
We provide `ioi_code_reward` and `cf_code_reward` reward functions for executing problems from [IOI](https://hf.co/datasets/open-r1/ioi) and [CodeForces](https://huggingface.co/datasets/open-r1/codeforces), respectively. You can use either [piston](https://github.com/engineer-man/piston) or Morph (currently IOI only) as your execution provider.
##### Piston
To use Piston:
1. Get piston workers running, see [slurm/piston/README.md](./slurm/piston/README.md)
2. Set your environment variable `PISTON_ENDPOINTS` to `slurm` or to a list of piston worker endpoints
For IOI:
3. In your configuration, use `ioi_provider: "piston"`
For CodeForces:
3. Download the generated (hard) test cases:
```
# change PATH_TO_SAVE_TESTCASES. Increase --max-workers according to your machine's capacity
huggingface-cli download open-r1/codeforces --repo-type=dataset --include='generated_tests/*.parquet' --max-workers=8 --local-dir PATH_TO_SAVE_TESTCASES
```
4. Save the path in .env:
```
CF_TESTS_FOLDER=PATH_TO_SAVE_TESTCASES
```
##### Morph
Morph is a cloud-based solution that provides sandboxed environments for running code. To use it:
@ -364,7 +374,10 @@ Morph is a cloud-based solution that provides sandboxed environments for running
2. Add your Morph API key to the `.env` file: `MORPH_API_KEY="your_key_here"`
3. In your configuration, use `ioi_provider: "morph"`
See the [example recipe](./recipes/Qwen2.5-1.5B-Instruct/grpo/config_demo_code_ioi.yaml) for how to use the reward function:
##### Example recipes
For IOI:
See the [example recipe](./recipes/Qwen2.5-1.5B-Instruct/grpo/config_demo_code_ioi.yaml) for how to use the IOI reward function:
```shell
ACCELERATE_LOG_LEVEL=info accelerate launch --config_file recipes/accelerate_configs/zero2.yaml \
@ -372,6 +385,12 @@ ACCELERATE_LOG_LEVEL=info accelerate launch --config_file recipes/accelerate_con
--config recipes/Qwen2.5-1.5B-Instruct/grpo/config_demo_code_ioi.yaml
```
For CodeForces:
```shell
sbatch --job-name=cf-grpo --nodes=2 slurm/train.slurm --model Qwen2.5-Coder-7B-Instruct --task grpo --config codeforces --accelerator zero3 --dp 8 --tp 1
```
### Launching jobs on a Slurm cluster
If you have access to a Slurm cluster, we provide a `slurm/train.slurm` script that will automatically queue training jobs for you. Here's how you can use it:
@ -383,7 +402,7 @@ sbatch --job-name=open_r1 --nodes=1 slurm/train.slurm --model {model_name} --tas
Here `{model_name}` and `{task}` are defined as above, while `{config_suffix}` refers to the specific config and `{accelerator}` refers to the choice of 🤗 Accelerate config in `recipes/accelerate_configs`. If you wish to override the default config parameters, you can provide them by appending a space-separated string like `'--arg1=value1 --arg2=value2'`. Here's a concrete example to run SFT on 1 node of 8 GPUs:
```shell
sbatch --job-name=open_r1 --nodes=1 slurm/train.slurm --model Qwen2.5-1.5B-Instruct --task sft --config demo --accelerator zero3
sbatch --job-name=open_r1 --nodes=1 slurm/train.slurm --model OpenR1-Distill-7B --task sft --config distill --accelerator zero3
```
You can scale the number of nodes by increasing the `--nodes` flag.
@ -397,6 +416,31 @@ sbatch --job-name=open_r1 --nodes=2 slurm/train.slurm --model Qwen2.5-1.5B-Instr
> [!NOTE]
> The configuration in `slurm/train.slurm` is optimised for the Hugging Face Compute Cluster and may require tweaking to be adapted to your own compute nodes.
### Customising the dataset mixture
To combine multiple datasets as a single training mixture, you can specify the `dataset_mixture` parameter in the YAML config file. Here's a template for how to do this:
```yaml
dataset_mixture:
datasets: # List of datasets to include in the mixture
- id: dataset_1 # Hub dataset ID
config: config_name_1 # Name of the dataset config
split: split_1 # Split to use from the dataset
columns: # Columns to keep
- column_1
- column_2
weight: 0.25 # Fraction of dataset to use
- id: dataset_2
config: config_name_2
split: split_2
columns:
- column_1
- column_2
weight: 0.5
seed: 42 # Seed for shuffling the combined dataset
test_split_size: 0.1 # Fraction of mixture to use for a test split
```
## Evaluating models
We use `lighteval` to evaluate models. For models which fit on a single GPU, run:
@ -710,7 +754,7 @@ sbatch slurm/generate.slurm \
### Data decontamination
Following [s1: Simple test-time scaling](https://arxiv.org/abs/2501.19393) the data can be decontaminated using the script at: [scripts/decontaminate.py](./scripts/decontaminate.py), which decontaminates a dataset using 8-grams and deduplicate the data. Sample run:
Following [s1: Simple test-time scaling](https://huggingface.co/papers/2501.19393) the data can be decontaminated using the script at: [scripts/decontaminate.py](./scripts/decontaminate.py), which decontaminates a dataset using 8-grams and deduplicate the data. Sample run:
```shell
python scripts/decontaminate.py \
@ -755,7 +799,7 @@ If you find this project is useful in your own work, please consider citing as f
@misc{openr1,
title = {Open R1: A fully open reproduction of DeepSeek-R1},
url = {https://github.com/huggingface/open-r1},
author = {Hugging Face},
author = {{Hugging Face}},
month = {January},
year = {2025}
}

View file

@ -0,0 +1,48 @@
# Config for 1 node of 8 x H100s (80GB)
# Model arguments
model_name_or_path: open-r1/Qwen2.5-Math-7B-RoPE-300k
model_revision: main
torch_dtype: bfloat16
attn_implementation: flash_attention_2
# Data training arguments
chat_template: "{%- if tools %}\n {{- '<|im_start|>system\\n' }}\n {%- if messages[0]['role'] == 'system' %}\n {{- messages[0]['content'] }}\n {%- else %}\n {{- 'You are Open-R1, a language model trained by Hugging Face to help users. Your role as an assistant involves thoroughly exploring questions through a systematic thinking process before providing the final precise and accurate solutions. This requires engaging in a comprehensive cycle of analysis, summarizing, exploration, reassessment, reflection, backtracing, and iteration to develop well-considered thinking process. Please structure your response into two main sections: Thought and Solution using the specified format: <think> Thought section </think> Solution section. In the Thought section, detail your reasoning process in steps. Each step should include detailed considerations such as analysing questions, summarizing relevant findings, brainstorming new ideas, verifying the accuracy of the current steps, refining any errors, and revisiting previous steps. In the Solution section, based on various attempts, explorations, and reflections from the Thought section, systematically present the final solution that you deem correct. The Solution section should be logical, accurate, and concise and detail necessary steps needed to reach the conclusion. Now, try to solve the following question through the above guidelines.' }}\n {%- endif %}\n {{- \"\\n\\n# Tools\\n\\nYou may call one or more functions to assist with the user query.\\n\\nYou are provided with function signatures within <tools></tools> XML tags:\\n<tools>\" }}\n {%- for tool in tools %}\n {{- \"\\n\" }}\n {{- tool | tojson }}\n {%- endfor %}\n {{- \"\\n</tools>\\n\\nFor each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:\\n<tool_call>\\n{\\\"name\\\": <function-name>, \\\"arguments\\\": <args-json-object>}\\n</tool_call><|im_end|>\\n\" }}\n{%- else %}\n {%- if messages[0]['role'] == 'system' %}\n {{- '<|im_start|>system\\n' + messages[0]['content'] + '<|im_end|>\\n' }}\n {%- else %}\n {{- '<|im_start|>system\\nYou are Open-R1, a language model trained by Hugging Face to help users. Your role as an assistant involves thoroughly exploring questions through a systematic thinking process before providing the final precise and accurate solutions. This requires engaging in a comprehensive cycle of analysis, summarizing, exploration, reassessment, reflection, backtracing, and iteration to develop well-considered thinking process. Please structure your response into two main sections: Thought and Solution using the specified format: <think> Thought section </think> Solution section. In the Thought section, detail your reasoning process in steps. Each step should include detailed considerations such as analysing questions, summarizing relevant findings, brainstorming new ideas, verifying the accuracy of the current steps, refining any errors, and revisiting previous steps. In the Solution section, based on various attempts, explorations, and reflections from the Thought section, systematically present the final solution that you deem correct. The Solution section should be logical, accurate, and concise and detail necessary steps needed to reach the conclusion. Now, try to solve the following question through the above guidelines.<|im_end|>\\n' }}\n {%- endif %}\n{%- endif %}\n{%- for message in messages %}\n {%- if (message.role == \"user\") or (message.role == \"system\" and not loop.first) or (message.role == \"assistant\" and not message.tool_calls) %}\n {{- '<|im_start|>' + message.role + '\\n' + message.content + '<|im_end|>' + '\\n' }}\n {%- elif message.role == \"assistant\" %}\n {{- '<|im_start|>' + message.role }}\n {%- if message.content %}\n {{- '\\n' + message.content }}\n {%- endif %}\n {%- for tool_call in message.tool_calls %}\n {%- if tool_call.function is defined %}\n {%- set tool_call = tool_call.function %}\n {%- endif %}\n {{- '\\n<tool_call>\\n{\"name\": \"' }}\n {{- tool_call.name }}\n {{- '\", \"arguments\": ' }}\n {{- tool_call.arguments | tojson }}\n {{- '}\\n</tool_call>' }}\n {%- endfor %}\n {{- '<|im_end|>\\n' }}\n {%- elif message.role == \"tool\" %}\n {%- if (loop.index0 == 0) or (messages[loop.index0 - 1].role != \"tool\") %}\n {{- '<|im_start|>user' }}\n {%- endif %}\n {{- '\\n<tool_response>\\n' }}\n {{- message.content }}\n {{- '\\n</tool_response>' }}\n {%- if loop.last or (messages[loop.index0 + 1].role != \"tool\") %}\n {{- '<|im_end|>\\n' }}\n {%- endif %}\n {%- endif %}\n{%- endfor %}\n{%- if add_generation_prompt %}\n {{- '<|im_start|>assistant\\n' }}\n{%- endif %}\n"
dataset_name: open-r1/Mixture-of-Thoughts
dataset_config: all
dataset_num_proc: 12
eos_token: <|im_end|>
# SFT trainer config
bf16: true
do_eval: false
eval_strategy: 'no'
gradient_accumulation_steps: 8
gradient_checkpointing: true
gradient_checkpointing_kwargs:
use_reentrant: false
hub_model_id: OpenR1-Distill-7B
hub_strategy: every_save
learning_rate: 4.0e-05
log_level: info
logging_steps: 1
logging_strategy: steps
lr_scheduler_type: cosine_with_min_lr
lr_scheduler_kwargs:
min_lr_rate: 0.1
packing: false
max_grad_norm: 0.2
max_length: 32768
max_steps: -1
num_train_epochs: 5
output_dir: data/OpenR1-Distill-7B
overwrite_output_dir: true
per_device_eval_batch_size: 1
per_device_train_batch_size: 2
push_to_hub: true
report_to:
- wandb
save_strategy: epoch
save_total_limit: 1
seed: 42
use_liger_kernel: true
warmup_ratio: 0.03

View file

@ -1,48 +0,0 @@
# Model arguments
# You need to download the model and manually change the rope to 300k and max_position_embeddings to 32768
# the config file should match https://huggingface.co/open-r1/OpenR1-Qwen-7B/blob/main/config.json
model_name_or_path: Qwen/Qwen2.5-Math-7B-Instruct
model_revision: main
torch_dtype: bfloat16
attn_implementation: sdpa
# Data training arguments
dataset_name: open-r1/OpenR1-Math-220k
dataset_num_proc: 48
#SFT hyperparam
max_length: 32768
weight_decay: 0.0001
optim: adamw_torch
lr_scheduler_type: linear
warmup_ratio: 0.1
learning_rate: 5.0e-05
gradient_accumulation_steps: 2
per_device_eval_batch_size: 1
per_device_train_batch_size: 1
# SFT trainer config
max_steps: -1
num_train_epochs: 3
bf16: true
do_eval: false
use_liger_kernel: true
eval_strategy: 'no'
gradient_checkpointing: true
gradient_checkpointing_kwargs:
use_reentrant: false
hub_model_id: OpenR1-Qwen-7B-SFT
hub_strategy: every_save
log_level: info
logging_steps: 5
logging_strategy: steps
packing: false
output_dir: data/OpenR1-Qwen-7B-SFT
overwrite_output_dir: true
push_to_hub: true
report_to:
- wandb
save_strategy: "steps"
save_steps: 500
save_total_limit: 1
seed: 42

View file

@ -1,44 +0,0 @@
# Model arguments
model_name_or_path: Qwen/Qwen2.5-1.5B-Instruct
model_revision: main
torch_dtype: bfloat16
attn_implementation: flash_attention_2
# Data training arguments
dataset_name: open-r1/OpenR1-Math-220k
dataset_num_proc: 48
# SFT trainer config
bf16: true
do_eval: false
eval_strategy: 'no'
gradient_accumulation_steps: 1
gradient_checkpointing: true
gradient_checkpointing_kwargs:
use_reentrant: false
hub_model_id: Qwen2.5-1.5B-Open-R1-Distill
hub_strategy: every_save
learning_rate: 5.0e-05
log_level: info
logging_steps: 5
logging_strategy: steps
lr_scheduler_type: cosine_with_min_lr
lr_scheduler_kwargs:
min_lr_rate: 0.1
packing: false
max_length: 16384
max_steps: -1
num_train_epochs: 1
output_dir: data/Qwen2.5-1.5B-Open-R1-Distill
overwrite_output_dir: true
per_device_eval_batch_size: 16
per_device_train_batch_size: 16
push_to_hub: true
report_to:
- wandb
save_strategy: "steps"
save_steps: 100
save_total_limit: 1
seed: 42
use_liger_kernel: true
warmup_ratio: 0.05

View file

@ -0,0 +1,80 @@
# Model arguments
model_name_or_path: Qwen/Qwen2.5-Coder-7B-Instruct
model_revision: main
torch_dtype: bfloat16
attn_implementation: flash_attention_2
# Data training arguments
dataset_name: open-r1/codeforces
dataset_prompt_column: prompt
dataset_config: verifiable-prompts
dataset_test_split: test
dataset_train_split: train
system_prompt: "You are a helpful AI Assistant that provides well-reasoned and detailed responses. You first think about the reasoning process as an internal monologue and then provide the user with the answer. Respond in the following format: <think>\n...\n</think>\n<answer>\n...\n</answer>"
# GRPO trainer config
callbacks:
- push_to_hub_revision
benchmarks:
- lcb_v4
beta: 0.0
loss_type: dr_grpo
scale_rewards: false
bf16: true
do_eval: false
eval_strategy: "no"
use_vllm: true
vllm_device: auto
vllm_gpu_memory_utilization: 0.7
gradient_accumulation_steps: 32
gradient_checkpointing: true
gradient_checkpointing_kwargs:
use_reentrant: false
hub_model_id: open-r1/Qwen2.5-Coder-7B-Instruct-Codeforces-GRPO
hub_model_revision: v01.00
hub_strategy: every_save
learning_rate: 1.0e-06
log_completions: true
log_level: info
logging_first_step: true
logging_steps: 1
logging_strategy: steps
lr_scheduler_type: constant_with_warmup
max_grad_norm: 0.2
max_prompt_length: 2000
max_completion_length: 8192
max_steps: -1
num_generations: 16
# aiming for 1k optimization steps
# total_samples_per_batch = num_gpus * grad_accumulation_steps * per_device_batch_size = 8 * 32 * 4 = 1024
# unique_prompts_per_batch = total_samples_per_batch / num_generations = 1024 / 16 = 64
# #dataset ~= 16k (8k * 2, for python and cpp)
# global_steps_per_epoch = #dataset / unique_prompts_per_batch = 16k / 64 ~= 250
# epochs_for_1k_steps = 1000/250 = 4 epochs
num_train_epochs: 4
output_dir: data/Qwen2.5-Coder-7B-Instruct-Codeforces-GRPO_v01.00
overwrite_output_dir: true
per_device_train_batch_size: 4
push_to_hub: true
report_to:
- wandb
reward_funcs:
- cf_code
- code_format
reward_weights:
- 1.0
- 0.1
save_strategy: "steps"
save_steps: 0.05
save_total_limit: 1
seed: 42
temperature: 0.7
wandb_entity: huggingface
wandb_project: open-r1
warmup_ratio: 0.1
mask_truncated_completions: true
# for each generation, evaluate these many test cases in parallel, then check if any of them failed (0 score): if so stop evaluating
# otherwise continue with the next batch of test cases. Useful to avoid overloading the eval server + save time on wrong solutions
code_eval_test_batch_size: -1
code_eval_scoring_mode: weighted_sum

View file

@ -1,52 +0,0 @@
# Model arguments
model_name_or_path: Qwen/Qwen2.5-Math-7B
model_revision: main
torch_dtype: bfloat16
attn_implementation: flash_attention_2
# Data training arguments
dataset_name: DigitalLearningGmbH/MATH-lighteval
dataset_config: default
dataset_prompt_column: problem
system_prompt: "You are a helpful AI Assistant, designed to provided well-reasoned and detailed responses. You FIRST think about the reasoning process as an internal monologue and then provide the user with the answer. The reasoning process MUST BE enclosed within <think> and </think> tags."
# GRPO trainer config
bf16: true
use_vllm: true
do_eval: true
eval_strategy: steps
eval_steps: 100
gradient_accumulation_steps: 8
gradient_checkpointing: true
gradient_checkpointing_kwargs:
use_reentrant: false
hub_model_id: Qwen-2.5-7B-Simple-RL
hub_strategy: every_save
learning_rate: 3.0e-06
log_completions: true
log_level: info
logging_first_step: true
logging_steps: 5
logging_strategy: steps
lr_scheduler_type: cosine
max_prompt_length: 512
max_completion_length: 1024
max_steps: -1
num_generations: 7
num_train_epochs: 1
output_dir: data/Qwen-2.5-7B-Simple-RL
overwrite_output_dir: true
per_device_eval_batch_size: 16
per_device_train_batch_size: 16
push_to_hub: true
report_to:
- wandb
reward_funcs:
- accuracy
- format
reward_weights:
- 1.0
- 1.0
save_strategy: "no"
seed: 42
warmup_ratio: 0.1

View file

@ -1,5 +1,13 @@
# Post-training recipes
## OpenR1 Distill 7B
To train the OpenR1 Distill 7B model, run:
```
sbatch --nodes=1 slurm/train.slurm --model OpenR1-Distill-7B --task sft --config distill --accelerator zero3
```
## OlympicCoder
To train the OlympicCoder models, run:

View file

@ -0,0 +1,28 @@
# Model arguments
model_name_or_path: deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B
model_revision: main
torch_dtype: bfloat16
attn_implementation: flash_attention_2
# Data training arguments
# We edit the DeepSeek chat template to ensure (a) the reasoning block within <think> and </think> is included in the completion and (b) the <think> tag is not part of the prefill so that the format reward works
chat_template: "{% if not add_generation_prompt is defined %}{% set add_generation_prompt = false %}{% endif %}{% set ns = namespace(is_first=false, is_tool=false, is_output_first=true, system_prompt='') %}{%- for message in messages %}{%- if message['role'] == 'system' %}{% set ns.system_prompt = message['content'] %}{%- endif %}{%- endfor %}{{bos_token}}{{ns.system_prompt}}{%- for message in messages %}{%- if message['role'] == 'user' %}{%- set ns.is_tool = false -%}{{'<User>' + message['content']}}{%- endif %}{%- if message['role'] == 'assistant' and message['content'] is none %}{%- set ns.is_tool = false -%}{%- for tool in message['tool_calls']%}{%- if not ns.is_first %}{{'<Assistant><tool▁calls▁begin><tool▁call▁begin>' + tool['type'] + '<tool▁sep>' + tool['function']['name'] + '\\n' + '```json' + '\\n' + tool['function']['arguments'] + '\\n' + '```' + '<tool▁call▁end>'}}{%- set ns.is_first = true -%}{%- else %}{{'\\n' + '<tool▁call▁begin>' + tool['type'] + '<tool▁sep>' + tool['function']['name'] + '\\n' + '```json' + '\\n' + tool['function']['arguments'] + '\\n' + '```' + '<tool▁call▁end>'}}{{'<tool▁calls▁end><end▁of▁sentence>'}}{%- endif %}{%- endfor %}{%- endif %}{%- if message['role'] == 'assistant' and message['content'] is not none %}{%- if ns.is_tool %}{{'<tool▁outputs▁end>' + message['content'] + '<end▁of▁sentence>'}}{%- set ns.is_tool = false -%}{%- else %}{% set content = message['content'] %}{{'<Assistant>' + content + '<end▁of▁sentence>'}}{%- endif %}{%- endif %}{%- if message['role'] == 'tool' %}{%- set ns.is_tool = true -%}{%- if ns.is_output_first %}{{'<tool▁outputs▁begin><tool▁output▁begin>' + message['content'] + '<tool▁output▁end>'}}{%- set ns.is_output_first = false %}{%- else %}{{'\\n<tool▁output▁begin>' + message['content'] + '<tool▁output▁end>'}}{%- endif %}{%- endif %}{%- endfor -%}{% if ns.is_tool %}{{'<tool▁outputs▁end>'}}{% endif %}{% if add_generation_prompt and not ns.is_tool %}{{'<Assistant>'}}{% endif %}"
dataset_name: open-r1/OpenR1-Math-220k
dataset_prompt_column: problem
system_prompt: "You are a helpful AI Assistant that provides well-reasoned and detailed responses. You first think about the reasoning process as an internal monologue and then provide the user with the answer. Respond in the following format: <think>\n...\n</think>\n<answer>\n...\n</answer>"
# Generation arguments
max_completion_length: 2048
num_generations: 8
temperature: 0.7
top_p: 0.95
# Reward func arguments
reward_funcs:
- accuracy
reward_weights:
- 1.0
# Filtering arguments. Samples with a pass rate outside the interval `pass_rate_min < x < pass_rate_max` will be filtered.
pass_rate_min: 0.2
pass_rate_max: 0.8

View file

@ -0,0 +1,28 @@
# Model arguments
model_name_or_path: open-r1/R1-Distill-Qwen-Math-7B
model_revision: v03.00-step-000008190
torch_dtype: bfloat16
attn_implementation: flash_attention_2
# Data training arguments
# We edit the DeepSeek chat template to ensure (a) the reasoning block within <think> and </think> is included in the completion and (b) the <think> tag is not part of the prefill so that the format reward works
dataset_name: open-r1/DAPO-Math-17k-Processed
dataset_config: all
dataset_split: train
# Generation arguments
max_completion_length: 32000
num_generations: 8
temperature: 1.0
# Reward func arguments
reward_funcs:
- accuracy
reward_weights:
- 1.0
# Filtering arguments. Samples with mean reward outside of low / high will be filtered
pass_rate_min: 0.1
pass_rate_max: 0.6
output_dataset_name: open-r1/DAPO-Math-17k-Processed-R1-Distill-Qwen-Math-7B-v03.00-step-000008190-filter

View file

@ -0,0 +1,26 @@
# Model arguments
model_name_or_path: open-r1/R1-Distill-Qwen-Math-7B-Merges
model_revision: v00.00-step-000003660_v01.00-step-000002600_weights-0.50-0.50
torch_dtype: bfloat16
attn_implementation: flash_attention_2
# Data training arguments
# We edit the DeepSeek chat template to ensure (a) the reasoning block within <think> and </think> is included in the completion and (b) the <think> tag is not part of the prefill so that the format reward works
dataset_name: open-r1/verifiable-coding-problems-python_decontaminated-tested-shuffled
dataset_prompt_column: problem
# Generation arguments
max_completion_length: 16000
num_generations: 8
temperature: 0.7
# Reward func arguments
reward_funcs:
- binary_code
reward_weights:
- 1.0
e2b_router_url: ip-10-53-85-92:8000
# Filtering arguments. Samples with mean reward outside of low / high will be filtered
pass_rate_min: 0.1
pass_rate_max: 0.6

View file

@ -15,7 +15,7 @@
# limitations under the License.
"""
This script is used to decontaminate a dataset by checking for n-gram overlap with other datasets.
It uses the same approach presented in https://arxiv.org/abs/2501.19393,
It uses the same approach presented in https://huggingface.co/papers/2501.19393,
as found in: https://github.com/simplescaling/s1/blob/main/data/decontaminate_util.py
Usage:

View file

@ -0,0 +1,36 @@
# Pass rate filtering
We provide support to filter datasets by generating and computing pass rate on veriable tasks
See `scripts/pass_rate_filtering/compute_pass_rate.py` and `scripts/pass_rate_filtering/launch_filtering.sh` (hardcoded for DAPO at the moment)
By default the script chunks the dataset, merge can be run using the following snippet (example for DAPO) :
from datasets import load_dataset, concatenate_datasets
name = "open-r1/DAPO-Math-17k-Processed-R1-Distill-Qwen-Math-7B-Merges-v00.02-v01.02-0.3-0.7-filter"
```python
gen_datasets = []
filt_datasets = []
for start in range(0,17400,200):
end = start + 200
if start == 17200:
end = 17398
gen_config_name = f"gen-{start}-{end}"
gen_dataset = load_dataset(name, gen_config_name, revision="gen", split="train")
gen_datasets.append(gen_dataset)
filt_config_name = f"filt-0.1-0.6-{start}-{end}"
filt_dataset = load_dataset(name, filt_config_name, revision="pass_rate", split="train")
filt_datasets.append(filt_dataset)
gen_dataset = concatenate_datasets(gen_datasets)
gen_dataset.push_to_hub(name, config_name="gen", split="train")
print(gen_dataset)
filt_dataset = concatenate_datasets(filt_datasets)
filt_dataset.push_to_hub(name, config_name="default", split="train")
print(filt_dataset)
```

View file

@ -0,0 +1,205 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# example usage python scripts/filter_dataset.py --config recipes/dataset_filtering/config_demo.yaml
import logging
from dataclasses import dataclass
from git import Optional
import torch
import sys
import datasets
import transformers
from datasets import load_dataset
from transformers import set_seed
from open_r1.configs import GRPOConfig, GRPOScriptArguments
from open_r1.rewards import get_reward_funcs
from open_r1.utils import get_tokenizer
from trl import ModelConfig, TrlParser
from trl.data_utils import apply_chat_template
from vllm import LLM, SamplingParams
logger = logging.getLogger(__name__)
@dataclass
class PassRateScriptArguments(GRPOScriptArguments):
# we can be lazy and just use the same script args as GRPO
output_dataset_name: Optional[str] = None
pass_rate_min: float = 0.1
pass_rate_max: float = 0.9
dataset_start_index: Optional[int] = None
dataset_end_index: Optional[int] = None
dataset_split: str = "train"
def main(script_args, training_args, model_args):
# Set seed for reproducibility
set_seed(training_args.seed)
###############
# Setup logging
###############
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
log_level = training_args.get_process_log_level()
logger.setLevel(log_level)
datasets.utils.logging.set_verbosity(log_level)
transformers.utils.logging.set_verbosity(log_level)
transformers.utils.logging.enable_default_handler()
transformers.utils.logging.enable_explicit_format()
logger.info(f"Model parameters {model_args}")
logger.info(f"Script parameters {script_args}")
logger.info(f"Training parameters {training_args}")
# Load the dataset
dataset = load_dataset(script_args.dataset_name, name=script_args.dataset_config, split=script_args.dataset_split)
if script_args.dataset_start_index is not None and script_args.dataset_end_index is not None:
dataset = dataset.select(range(script_args.dataset_start_index, script_args.dataset_end_index))
# Get reward functions from the registry
reward_funcs = get_reward_funcs(script_args)
# Format into conversation
def make_conversation(example, prompt_column: str = script_args.dataset_prompt_column):
example["prompt_backup"] = example[prompt_column]
prompt = []
if training_args.system_prompt is not None:
prompt.append({"role": "system", "content": training_args.system_prompt})
if prompt_column not in example:
raise ValueError(f"Dataset Question Field Error: {prompt_column} is not supported.")
prompt.append({"role": "user", "content": example[prompt_column]})
return {"prompt": prompt}
dataset = dataset.map(make_conversation)
tokenizer = get_tokenizer(model_args, training_args)
if "messages" in dataset.column_names:
dataset = dataset.remove_columns("messages")
dataset = dataset.map(apply_chat_template, fn_kwargs={"tokenizer": tokenizer})
llm = LLM(
model=model_args.model_name_or_path,
revision=model_args.model_revision,
trust_remote_code=model_args.trust_remote_code,
)
sampling_params=SamplingParams(
temperature=training_args.temperature,
top_p=training_args.top_p,
top_k=training_args.top_k,
n=training_args.num_generations,
max_tokens=training_args.max_completion_length,
)
def batch_score(examples):
prompts = examples["prompt"]
outputs = llm.generate(
prompts,
sampling_params=sampling_params,
use_tqdm=False,
)
repeated_prompts = []
reward_completions = []
grouped_completions = []
for output in outputs:
prompt = output.prompt
group = []
for completion in output.outputs:
text = completion.text
group.append(text)
message = [{"role": "assistant", "content": text}]
repeated_prompts.append(prompt)
reward_completions.append(message)
grouped_completions.append(group)
def repeat_each_element_k_times(list_to_repeat: list, k: int) -> list:
return [element for item in list_to_repeat for element in [item] * k]
rewards_per_func = torch.zeros(len(repeated_prompts), len(reward_funcs))
for i, reward_func in enumerate(reward_funcs):
keys = [key for key in examples.data.keys() if key not in ["prompt", "completion"]]
reward_kwargs = {key: repeat_each_element_k_times(examples[key], training_args.num_generations) for key in keys}
output_reward_func = reward_func(prompts=repeated_prompts, completions=reward_completions, **reward_kwargs)
# Convert None values to NaN
output_reward_func = [reward if reward is not None else torch.nan for reward in output_reward_func]
rewards_per_func[:, i] = torch.tensor(output_reward_func, dtype=torch.float32)
reshaped_rewards = rewards_per_func.view(-1, training_args.num_generations)
examples["pass_rate_generations"] = grouped_completions
examples["pass_rate_rewards"] = reshaped_rewards.tolist()
return examples
dataset = dataset.map(batch_score, batched=True, batch_size=64)
# we need to restore the prompt for the final dataset
def restore_prompt(example):
example["prompt"] = example["prompt_backup"]
return example
dataset = dataset.map(restore_prompt)
dataset = dataset.remove_columns("prompt_backup")
if script_args.output_dataset_name is not None:
output_dataset_name = script_args.output_dataset_name
else:
model_name = model_args.model_name_or_path
if "/" in model_name:
model_name = model_name.split("/")[-1]
model_revision = model_args.model_revision
output_dataset_name = f"{script_args.dataset_name}-{model_name}-{model_revision}-gen"
config_name="default"
filtered_config_name = f"filt-{script_args.pass_rate_min}-{script_args.pass_rate_max}"
if script_args.dataset_start_index is not None and script_args.dataset_end_index is not None:
config_name = f"gen-{script_args.dataset_start_index}-{script_args.dataset_end_index}"
filtered_config_name = f"{filtered_config_name}-{script_args.dataset_start_index}-{script_args.dataset_end_index}"
dataset.push_to_hub(output_dataset_name, config_name=config_name, revision="gen")
def filter_func(example):
rewards = example["pass_rate_rewards"]
# get the mean of the rewards that are not None
mean_reward = torch.nanmean(torch.tensor(rewards, dtype=torch.float32))
return script_args.pass_rate_min < mean_reward < script_args.pass_rate_max
logger.info(f"Filtering dataset with low reward threshold {script_args.pass_rate_min} and high reward threshold {script_args.pass_rate_max}")
logger.info(f"Dataset size before filtering: {dataset}")
dataset = dataset.filter(filter_func)
logger.info(f"Dataset size after filtering: {dataset}")
dataset.push_to_hub(output_dataset_name, config_name=filtered_config_name, revision="pass_rate")
if __name__ == "__main__":
parser = TrlParser((PassRateScriptArguments, GRPOConfig, ModelConfig))
script_args, training_args, model_args = parser.parse_args_and_config()
main(script_args, training_args, model_args)

View file

@ -0,0 +1,15 @@
# a bash foor loop from 0 to 17,400 in chunks of 200
for i in {0..17000..200}
do
START=$i
END=$((i + 200))
echo "Processing chunk from $START to $END"
# Submit the job to SLURM
sbatch slurm/compute_pass_rate.slurm recipes/dataset_filtering/filter_dapo.yaml $START $END
done
sbatch slurm/compute_pass_rate.slurm recipes/dataset_filtering/filter_dapo.yaml 17200 17398

View file

@ -44,7 +44,7 @@ _deps = [
"accelerate==1.4.0",
"bitsandbytes>=0.43.0",
"datasets>=3.2.0",
"deepspeed==0.16.7",
"deepspeed==0.16.8",
"distilabel[vllm,ray,openai]>=1.5.2",
"e2b-code-interpreter>=1.0.5",
"einops>=0.8.0",
@ -55,8 +55,8 @@ _deps = [
"jieba", # Needed for Chinese language support
"langdetect", # Needed for LightEval's extended tasks
"latex2sympy2_extended>=1.0.6",
"liger-kernel>=0.5.6",
"lighteval @ git+https://github.com/huggingface/lighteval.git@d50bc3072b8814656633400a1850c500c6aa2e39",
"liger-kernel>=0.5.10",
"lighteval @ git+https://github.com/huggingface/lighteval.git@d3da6b9bbf38104c8b5e1acc86f83541f9a502d1", # Critical bug fix for tokenizer revisions: https://github.com/huggingface/lighteval/pull/721
"math-verify==0.5.2", # Used for math verification in grpo
"morphcloud==0.1.67",
"packaging>=23.0",
@ -68,9 +68,12 @@ _deps = [
"safetensors>=0.3.3",
"sentencepiece>=0.1.99",
"torch==2.6.0",
"transformers @ git+https://github.com/huggingface/transformers.git@acdbe627e323dbc822f21499fead789b439cf45b", # Fix DeepSpeed x vLLM conflict: https://github.com/huggingface/transformers/pull/37755
"trl[vllm] @ git+https://github.com/huggingface/trl.git@1bca49515ecd5b85d16e68c42c76670e252e19f1", # Fix DeepSpeed x vLLM conflict: https://github.com/huggingface/trl/pull/3351
"transformers==4.52.3",
"trl[vllm]==0.18.0",
"wandb>=0.19.1",
"async-lru>=2.0.5",
"aiofiles>=24.1.0",
"pandas>=2.2.3",
]
# this is a lookup table with items like:
@ -90,7 +93,7 @@ extras = {}
extras["tests"] = deps_list("pytest", "parameterized", "math-verify", "jieba")
extras["torch"] = deps_list("torch")
extras["quality"] = deps_list("ruff", "isort", "flake8")
extras["code"] = deps_list("e2b-code-interpreter", "python-dotenv", "morphcloud", "jieba")
extras["code"] = deps_list("e2b-code-interpreter", "python-dotenv", "morphcloud", "jieba", "pandas", "aiofiles")
extras["eval"] = deps_list("lighteval", "math-verify")
extras["dev"] = extras["quality"] + extras["tests"] + extras["eval"] + extras["code"]
@ -113,6 +116,7 @@ install_requires = [
deps["transformers"],
deps["trl"],
deps["wandb"],
deps["async-lru"],
]
setup(

View file

@ -0,0 +1,20 @@
#!/bin/bash
#SBATCH --job-name=open-r1-compute-pass-rate
#SBATCH --partition=hopper-prod
#SBATCH --qos=normal
#SBATCH --nodes=1
#SBATCH --gpus-per-node=1
#SBATCH --output=./logs/%x-%j.out
#SBATCH --error=./logs/%x-%j.err
#SBATCH --time=01-00:00:00
#SBATCH --requeue
# example usage: sbatch slurm/dataset_filter.slurm recipes/dataset_filtering/filter_dapo.yaml 0 500
set -x -e
source ~/.bashrc
source openr1/bin/activate
python scripts/pass_rate_filtering/compute_pass_rate.py --config $1 --dataset_start_index $2 --dataset_end_index $3

View file

@ -12,6 +12,10 @@
# Be ye warned this may not work on other clusters!
module load cuda/12.4
# Refresh Weka on h4 cache
echo "Refreshing Weka filesystem..."
find -L /fsx/h4/ -type f | xargs -d '\n' -r -n512 -P64 weka fs tier fetch
# Needed for vLLM
export VLLM_WORKER_MULTIPROC_METHOD=spawn

View file

@ -17,10 +17,17 @@ slurm/piston/launch_piston_workers.sh 1
```
2. Assuming it's running on `ip-10-53-86-146:1234`, send the package install request:
For IOI:
```bash
curl -X POST http://ip-10-53-86-146:1234/api/v2/packages -H "Content-Type: application/json" -d '{"language": "cms_ioi", "version": "1.0.0"}'
```
For CodeForces:
```bash
curl -X POST http://ip-10-53-86-146:1234/api/v2/packages -H "Content-Type: application/json" -d '{"language": "codeforces", "version": "1.0.0"}'
```
3. You can now launch more workers and due to the shared mounted packages directory, they should already have the package installed.
To have the main script find the workers automatically, you can export the following environment variable:
@ -32,6 +39,7 @@ Alternatively your can add `PISTON_ENDPOINTS=slurm` to your .env file.
You can also change `PISTON_MAX_REQUESTS_PER_ENDPOINT`, which tries to limit how many simultaneous requests each worker will handle (1 by default). Keep in mind that this is a local limit and in distributed setups, as there is no global limit, workers might sometimes be overwhelmed when some processes hit the same worker.
If you would like to adapt the code to run without piston, please see the [ioi repo](https://github.com/huggingface/ioi).
For CodeForces, you should implement the [`run`](https://github.com/guipenedo/piston/blob/master/packages/codeforces/1.0.0/run) and [`compile`](https://github.com/guipenedo/piston/blob/master/packages/codeforces/1.0.0/compile) scripts.
# Piston workers (local docker)
This will launch a single worker in a docker container. Consider launching multiple workers for better scalability. Replace 2000 with the port you want to use.
@ -57,10 +65,16 @@ docker run -d \
```
Install the package:
For IOI:
```bash
curl -X POST http://localhost:2000/api/v2/packages -H "Content-Type: application/json" -d '{"language": "cms_ioi", "version": "1.0.0"}'
```
For CodeForces:
```bash
curl -X POST http://localhost:2000/api/v2/packages -H "Content-Type: application/json" -d '{"language": "codeforces", "version": "1.0.0"}'
```
Remember to set `PISTON_ENDPOINTS`:
```bash
export PISTON_ENDPOINTS=http://localhost:2000/api/v2,http://localhost:2001/api/v2,http://localhost:2002/api/v2

View file

@ -32,6 +32,10 @@ source openr1/bin/activate
START_TIME=$(date +%s)
echo "START TIME: $(date)"
# Refresh Weka on h4 cache
echo "Refreshing Weka filesystem..."
find -L /fsx/h4/ -type f | xargs -d '\n' -r -n512 -P64 weka fs tier fetch
# Default values
MODEL=""
TASK=""
@ -167,7 +171,7 @@ SRUN_ARGS=" \
--ntasks=$NUM_NODES \
--nodelist=$NODELIST
"
clear; srun $SRUN_ARGS bash -c "$LAUNCHER $CMD" 2>&1
srun $SRUN_ARGS bash -c "$LAUNCHER $CMD" 2>&1
END_TIME=$(date +%s)
echo "END TIME: $(date)"
@ -175,4 +179,4 @@ ELAPSED_SECONDS=$((END_TIME - START_TIME))
HOURS=$((ELAPSED_SECONDS / 3600))
MINUTES=$(( (ELAPSED_SECONDS % 3600) / 60 ))
SECONDS=$((ELAPSED_SECONDS % 60))
echo "TOTAL JOB TIME: ${HOURS}h ${MINUTES}m ${SECONDS}s (${ELAPSED_SECONDS} seconds)"
echo "TOTAL JOB TIME: ${HOURS}h ${MINUTES}m ${SECONDS}s (${ELAPSED_SECONDS} seconds)"

View file

@ -14,11 +14,112 @@
# limitations under the License.
from dataclasses import dataclass, field
from typing import Optional
from typing import Any, Literal, Optional
import trl
@dataclass
class DatasetConfig:
"""Configuration for a dataset in a mixture."""
id: str
config: Optional[str] = None
split: str = "train"
columns: Optional[list[str]] = None
weight: Optional[float] = None
@dataclass
class DatasetMixtureConfig:
"""Configuration for a mixture of datasets."""
datasets: list[DatasetConfig]
seed: int = 0
test_split_size: Optional[float] = None
@dataclass
class ScriptArguments(trl.ScriptArguments):
"""
Extended version of ScriptArguments with support for dataset mixtures.
Args:
dataset_mixture (`dict[str, Any]` or `None`, *optional*, defaults to `None`):
Configuration for creating dataset mixtures with advanced options.
Format:
dataset_mixture:
datasets:
- id: dataset_id1
config: config_name
columns:
- col1
- col2
weight: 0.5
- id: dataset_id2
config: config_name
columns:
- col1
- col2
weight: 0.5
seed: 42
test_split_size: 0.1
"""
# Override the dataset_name to make it optional
dataset_name: Optional[str] = field(
default=None, metadata={"help": "Dataset name. Can be omitted if using dataset_mixture."}
)
dataset_mixture: Optional[dict[str, Any]] = field(
default=None,
metadata={"help": "Configuration for creating dataset mixtures with advanced options like shuffling."},
)
def __post_init__(self):
if self.dataset_name is None and self.dataset_mixture is None:
raise ValueError("Either `dataset_name` or `dataset_mixture` must be provided")
if self.dataset_mixture is not None:
if not isinstance(self.dataset_mixture, dict) or "datasets" not in self.dataset_mixture:
raise ValueError(
"dataset_mixture must be a dictionary with a 'datasets' key. "
"Expected format: {'datasets': [...], 'seed': int}"
)
datasets_list = []
datasets_data = self.dataset_mixture.get("datasets", [])
if isinstance(datasets_data, list):
for dataset_config in datasets_data:
datasets_list.append(
DatasetConfig(
id=dataset_config.get("id"),
config=dataset_config.get("config"),
split=dataset_config.get("split", "train"),
columns=dataset_config.get("columns"),
weight=dataset_config.get("weight", 1.0),
)
)
else:
raise ValueError("'datasets' must be a list of dataset configurations")
self.dataset_mixture = DatasetMixtureConfig(
datasets=datasets_list,
seed=self.dataset_mixture.get("seed", 0),
test_split_size=self.dataset_mixture.get("test_split_size", None),
)
# Check that column names are consistent across all dataset configs
columns_sets = [set(dataset.columns) for dataset in datasets_list if dataset.columns is not None]
if columns_sets:
first_columns = columns_sets[0]
if not all(columns == first_columns for columns in columns_sets):
raise ValueError(
"Column names must be consistent across all dataset configurations in a mixture. "
f"Found different column sets: {[list(cols) for cols in columns_sets]}"
)
# TODO: add the shared options with a mixin to reduce code duplication
@dataclass
class GRPOConfig(trl.GRPOConfig):
@ -35,15 +136,22 @@ class GRPOConfig(trl.GRPOConfig):
metadata={"help": "The callbacks to run during training."},
)
chat_template: Optional[str] = field(default=None, metadata={"help": "The chat template to use."})
hub_model_revision: Optional[str] = field(
default="main", metadata={"help": "The Hub model branch to push the model to."}
)
num_completions_to_print: int = field(default=0, metadata={"help": "Number of completions to print."})
overwrite_hub_revision: bool = field(default=False, metadata={"help": "Whether to overwrite the Hub revision."})
push_to_hub_revision: bool = field(default=False, metadata={"help": "Whether to push to a Hub revision/branch."})
system_prompt: Optional[str] = field(
default=None,
metadata={"help": "The optional system prompt to use."},
)
hub_model_revision: Optional[str] = field(
default="main", metadata={"help": "The Hub model branch to push the model to."}
wandb_log_unique_prompts: bool = field(
default=True,
metadata={
"help": ("Whether to log the unique prompts to wandb. This will create a new run for each unique prompt.")
},
)
overwrite_hub_revision: bool = field(default=False, metadata={"help": "Whether to overwrite the Hub revision."})
push_to_hub_revision: bool = field(default=False, metadata={"help": "Whether to push to a Hub revision/branch."})
wandb_entity: Optional[str] = field(
default=None,
metadata={"help": ("The entity to store runs under.")},
@ -98,7 +206,7 @@ class SFTConfig(trl.SFTConfig):
@dataclass
class GRPOScriptArguments(trl.ScriptArguments):
class GRPOScriptArguments(ScriptArguments):
"""
Script arguments for the GRPO training script.
@ -159,6 +267,7 @@ class GRPOScriptArguments(trl.ScriptArguments):
)
code_language: str = field(
default="python",
# '(?:python|cpp)'
metadata={
"help": "Language for code format reward. Based on E2B supported languages https://e2b.dev/docs/code-interpreting/supported-languages",
"choices": ["python", "javascript", "r", "java", "bash", "cpp"],
@ -170,6 +279,10 @@ class GRPOScriptArguments(trl.ScriptArguments):
"help": "for each generation, evaluate these many test cases in parallel, then check if any of them failed (0 score): if so stop evaluating; otherwise continue with the next batch of test cases. Useful to avoid overloading the eval server + save time on wrong solutions"
},
)
code_eval_scoring_mode: Literal["pass_fail", "partial", "weighted_sum"] = field(
default="weighted_sum",
metadata={"help": "use fraction of passed test cases as reward. If false, use 0/1 scoring."},
)
parallel_code_exec_per_proc: int = field(
default=2,
metadata={

View file

@ -18,13 +18,12 @@ import sys
import datasets
import transformers
from datasets import load_dataset
from transformers import set_seed
from transformers.trainer_utils import get_last_checkpoint
from open_r1.configs import GRPOConfig, GRPOScriptArguments
from open_r1.rewards import get_reward_funcs
from open_r1.utils import get_model, get_tokenizer
from open_r1.utils import get_dataset, get_model, get_tokenizer
from open_r1.utils.callbacks import get_callbacks
from open_r1.utils.wandb_logging import init_wandb_training
from trl import GRPOTrainer, ModelConfig, TrlParser, get_peft_config
@ -72,7 +71,7 @@ def main(script_args, training_args, model_args):
init_wandb_training(training_args)
# Load the dataset
dataset = load_dataset(script_args.dataset_name, name=script_args.dataset_config)
dataset = get_dataset(script_args)
################
# Load tokenizer
@ -141,6 +140,9 @@ def main(script_args, training_args, model_args):
# Save model and create model card
##################################
logger.info("*** Save model ***")
# Align the model's generation config with the tokenizer's eos token
# to avoid unbounded generation in the transformers `pipeline()` function
trainer.model.generation_config.eos_token_id = tokenizer.eos_token_id
trainer.save_model(training_args.output_dir)
logger.info(f"Model saved to {training_args.output_dir}")

View file

@ -20,19 +20,21 @@ import json
import math
import re
from functools import partial, update_wrapper
from typing import Callable, Dict, Optional
from typing import Callable, Dict, Literal, Optional
from latex2sympy2_extended import NormalizationConfig
from math_verify import LatexExtractionConfig, parse, verify
from .utils.code_providers import get_provider
from .utils.ioi import (
from .utils.competitive_programming import (
SubtaskResult,
add_includes,
get_morph_client_from_env,
get_piston_client_from_env,
score_subtask,
)
from .utils.competitive_programming import patch_code as cf_patch_code
from .utils.competitive_programming import score_submission as cf_score_submission
from .utils.competitive_programming import score_subtask
def accuracy_reward(completions: list[list[dict[str, str]]], solution: list[str], **kwargs) -> list[Optional[float]]:
@ -130,7 +132,7 @@ def reasoning_steps_reward(completions, **kwargs):
def len_reward(completions: list[Dict[str, str]], solution: list[str], **kwargs) -> float:
"""Compute length-based rewards to discourage overthinking and promote token efficiency.
Taken from the Kimi 1.5 tech report: https://arxiv.org/abs/2501.12599
Taken from the Kimi 1.5 tech report: https://huggingface.co/papers/2501.12599
Args:
completions: List of model completions
@ -282,7 +284,7 @@ def get_cosine_scaled_reward(
def get_repetition_penalty_reward(ngram_size: int, max_penalty: float, language: str = "en"):
"""
Computes N-gram repetition penalty as described in Appendix C.2 of https://arxiv.org/abs/2502.03373.
Computes N-gram repetition penalty as described in Appendix C.2 of https://huggingface.co/papers/2502.03373.
Reference implementation from: https://github.com/eddycmu/demystify-long-cot/blob/release/openrlhf/openrlhf/reward/repetition.py
Args:
@ -415,7 +417,65 @@ def ioi_code_reward(completions, test_batch_size: int = 1, provider_type: str =
return [result.score for result in results]
def extract_code(completion: str, language: str = "python") -> str:
def cf_code_reward(
completions,
test_batch_size: int = 1,
patch_code: bool = False,
scoring_mode: Literal["pass_fail", "partial", "weighted_sum"] = "weighted_sum",
**kwargs,
) -> list[float]:
"""Reward function that evaluates Codeforces problems using Piston+our CF package.
Assumes the dataset has the same format as hf.co/datasets/open-r1/codeforces (verifiable-prompts subset)
test_batch_size: evaluate these many test cases in parallel, then check if any of them failed (0 score): if so stop evaluating; otherwise continue with the next batch of test cases.
"""
# for info on setting up piston workers, see slurm/piston/README.md
piston_client = get_piston_client_from_env()
languages = kwargs["language"] if "language" in kwargs else [None] * len(completions)
code_snippets = [
# note: grading is automatically skipped if a problem has no tests
cf_patch_code(extract_code(completion[-1]["content"], language), language)
if patch_code
else extract_code(completion[-1]["content"], language)
for completion, language in zip(completions, languages)
]
async def run_catch_exceptions(task):
try:
return await task
except Exception as e:
print(f"Error from Piston worker: {e}")
return None
# load problem data. undo separating kwargs by column
problems_data = [dict(zip(kwargs.keys(), values)) for values in zip(*kwargs.values())]
loop = _init_event_loop()
evals = [
loop.create_task(
run_catch_exceptions(
cf_score_submission(
piston_client,
problem_data,
code,
test_batch_size=test_batch_size,
scoring_mode=scoring_mode,
submission_language=problem_data.get("language", None),
)
)
)
for problem_data, code in zip(problems_data, code_snippets)
]
results = loop.run_until_complete(asyncio.gather(*evals))
return results
def extract_code(completion: str, language: str | None = "python") -> str:
if language is None:
return ""
pattern = re.compile(rf"```{language}\n(.*?)```", re.DOTALL)
matches = pattern.findall(completion)
extracted_answer = matches[-1] if len(matches) >= 1 else ""
@ -538,11 +598,20 @@ def get_code_format_reward(language: str = "python"):
Args:
language: Programming language supported by E2B https://e2b.dev/docs/code-interpreting/supported-languages
"""
pattern = rf"^<think>\n.*?\n</think>\n<answer>\n.*?```{language}.*?```.*?\n</answer>$"
def code_format_reward(completions, **kwargs):
# if there is a language field, use it instead of the default language. This way we can have mixed language training.
languages = kwargs["language"] if "language" in kwargs else [language] * len(completions)
completion_contents = [completion[0]["content"] for completion in completions]
matches = [re.match(pattern, content, re.DOTALL | re.MULTILINE) for content in completion_contents]
matches = [
re.match(
rf"^<think>\n.*?\n</think>\n<answer>\n.*?```{sample_language}.*?```.*?\n</answer>$",
content,
re.DOTALL | re.MULTILINE,
)
for content, sample_language in zip(completion_contents, languages)
]
return [1.0 if match else 0.0 for match in matches]
return code_format_reward
@ -617,6 +686,14 @@ def get_reward_funcs(script_args) -> list[Callable]:
),
ioi_code_reward,
),
"cf_code": update_wrapper(
partial(
cf_code_reward,
test_batch_size=script_args.code_eval_test_batch_size,
scoring_mode=script_args.code_eval_scoring_mode,
),
cf_code_reward,
),
"code_format": get_code_format_reward(language=script_args.code_language),
"tag_count": tag_count_reward,
"soft_overlong_punishment": get_soft_overlong_punishment(

View file

@ -19,20 +19,18 @@ Usage:
# One 1 node of 8 x H100s
accelerate launch --config_file=recipes/accelerate_configs/zero3.yaml src/open_r1/sft.py \
--model_name_or_path Qwen/Qwen2.5-1.5B-Instruct \
--dataset_name open-r1/OpenR1-Math-220k \
--learning_rate 2.0e-5 \
--num_train_epochs 1 \
--packing \
--max_seq_length 4096 \
--model_name_or_path open-r1/Qwen2.5-Math-7B-RoPE-300k \
--dataset_name open-r1/Mixture-of-Thoughts \
--dataset_config all \
--eos_token '<|im_end|>' \
--learning_rate 4.0e-5 \
--num_train_epochs 5 \
--max_seq_length 32768 \
--per_device_train_batch_size 2 \
--gradient_accumulation_steps 8 \
--gradient_checkpointing \
--bf16 \
--logging_steps 5 \
--eval_strategy steps \
--eval_steps 100 \
--output_dir data/Qwen2.5-1.5B-Open-R1-Distill
--use_liger_kernel \
--output_dir data/OpenR1-Distill-7B
"""
import logging
@ -41,22 +39,20 @@ import sys
import datasets
import transformers
from datasets import load_dataset
from transformers import set_seed
from transformers.trainer_utils import get_last_checkpoint
from open_r1.configs import SFTConfig
from open_r1.utils import get_model, get_tokenizer
from open_r1.configs import ScriptArguments, SFTConfig
from open_r1.utils import get_dataset, get_model, get_tokenizer
from open_r1.utils.callbacks import get_callbacks
from open_r1.utils.wandb_logging import init_wandb_training
from trl import ModelConfig, ScriptArguments, SFTTrainer, TrlParser, get_peft_config, setup_chat_format
from trl import ModelConfig, SFTTrainer, TrlParser, get_peft_config, setup_chat_format
logger = logging.getLogger(__name__)
def main(script_args, training_args, model_args):
# Set seed for reproducibility
set_seed(training_args.seed)
###############
@ -88,24 +84,15 @@ def main(script_args, training_args, model_args):
if "wandb" in training_args.report_to:
init_wandb_training(training_args)
################
# Load datasets
################
dataset = load_dataset(script_args.dataset_name, name=script_args.dataset_config)
################
# Load tokenizer
################
######################################
# Load dataset, tokenizer, and model #
######################################
dataset = get_dataset(script_args)
tokenizer = get_tokenizer(model_args, training_args)
###################
# Load model
###################
logger.info("*** Loading model ***")
model = get_model(model_args, training_args)
if tokenizer.chat_template is None:
logger.info("No chat template provided, using ChatML.")
logger.info("No chat template provided, defaulting to ChatML.")
model, tokenizer = setup_chat_format(model, tokenizer, format="chatml")
############################
@ -141,6 +128,9 @@ def main(script_args, training_args, model_args):
# Save model and create model card
##################################
logger.info("*** Save model ***")
# Align the model's generation config with the tokenizer's eos token
# to avoid unbounded generation in the transformers `pipeline()` function
trainer.model.generation_config.eos_token_id = tokenizer.eos_token_id
trainer.save_model(training_args.output_dir)
logger.info(f"Model saved to {training_args.output_dir}")

View file

@ -1,5 +1,6 @@
from .data import get_dataset
from .import_utils import is_e2b_available, is_morph_available
from .model_utils import get_model, get_tokenizer
__all__ = ["get_tokenizer", "is_e2b_available", "is_morph_available", "get_model"]
__all__ = ["get_tokenizer", "is_e2b_available", "is_morph_available", "get_model", "get_dataset"]

View file

@ -1,13 +1,17 @@
from .cf_scoring import score_submission
from .code_patcher import patch_code
from .ioi_scoring import SubtaskResult, score_subtask, score_subtasks
from .ioi_utils import add_includes
from .morph_client import get_morph_client_from_env
from .piston_client import get_piston_client_from_env, get_slurm_piston_endpoints
from .scoring import SubtaskResult, score_subtask, score_subtasks
from .utils import add_includes
__all__ = [
"get_piston_client_from_env",
"get_slurm_piston_endpoints",
"get_morph_client_from_env",
"patch_code",
"score_submission",
"score_subtask",
"score_subtasks",
"add_includes",

View file

@ -0,0 +1,146 @@
import asyncio
import os
from io import BytesIO
from typing import Literal
from async_lru import alru_cache
from .piston_client import PistonClient
from .utils import batched
async def score_single_test_case(
client: PistonClient,
problem_data: dict,
test_input: str,
test_output: str,
submission: str,
submission_language: str = "cpp",
) -> tuple[str, str]:
if submission_language not in ["python", "cpp"]:
raise ValueError(f"Invalid submission language: {submission_language}")
try:
result = await client.send_execute(
{
"files": [
{"name": f"main.{submission_language}", "content": submission},
*(
[{"name": "checker.py", "content": problem_data["generated_checker"]}]
if problem_data["generated_checker"]
else []
),
{"name": "input.txt", "content": test_input},
{"name": "correct_output.txt", "content": test_output},
{
"name": "grader_config",
"content": "\n".join(
f"{key}={value}"
for key, value in {
"TIME_LIMIT": problem_data["time_limit"],
"MEMORY_LIMIT": problem_data["memory_limit"],
"INPUT_MODE": problem_data["input_mode"],
}.items()
),
},
],
"run_timeout": (problem_data["time_limit"] + 10) * 1000,
# +10 seconds hard limit. time limits are handled by the codeforces script
},
language="cf_python3" if submission_language == "python" else "c++17",
)
except Exception as e:
print(f"Error scoring submission: {e}")
return False
return result
@alru_cache(maxsize=32) # TODO make this configurable
async def get_generated_contest_tests(contest_id: str) -> list[dict]:
import pandas as pd
import aiofiles
import aiofiles.os
tests_folder = os.environ.get("CF_TESTS_FOLDER", None)
if not tests_folder:
raise ValueError(
"CF_TESTS_FOLDER environment variable not set! Please download the codeforces generated tests and set CF_TESTS_FOLDER to the folder path. See https://huggingface.co/datasets/open-r1/codeforces for more information."
)
if not await aiofiles.os.path.exists(tests_folder):
raise ValueError(
f"CF_TESTS_FOLDER path '{tests_folder}' does not exist! Please download the codeforces generated tests and set CF_TESTS_FOLDER to the folder path. See https://huggingface.co/datasets/open-r1/codeforces for more information."
)
parquet_path = os.path.join(tests_folder, f"test_cases_{int(contest_id):04d}.parquet")
if not await aiofiles.os.path.exists(parquet_path):
return {}
# Read parquet file asynchronously
async with aiofiles.open(parquet_path, "rb") as f:
content = await f.read()
df = pd.read_parquet(BytesIO(content))
# Group by problem_id and convert to dictionary of lists
grouped_tests = df.groupby("problem_id").apply(lambda x: x[["input", "output"]].to_dict("records")).to_dict()
return grouped_tests
async def get_generated_tests(problem_id: str) -> list[dict]:
contest_id = problem_id.split("/")[0]
return (await get_generated_contest_tests(contest_id)).get(problem_id, [])
async def score_submission(
client: PistonClient,
problem_data: dict,
submission: str,
test_batch_size: int = 1,
scoring_mode: Literal["pass_fail", "partial", "weighted_sum"] = "weighted_sum",
no_compile_reward: float = -0.1,
no_submission_reward: float = -1.0,
submission_language: str = "cpp",
) -> float:
if submission_language not in ["python", "cpp"]:
raise ValueError(f"Invalid submission language: {submission_language}")
test_cases = problem_data["official_tests"] + (await get_generated_tests(problem_data["id"]))
# invalid/not a coding problem
if test_cases is None or len(test_cases) == 0:
return None
# no code extracted
if not submission:
return no_submission_reward
passed_test_cases = 0
# run one batch, check if any of them failed (0 score): if so stop evaluating (assuming non partial score); otherwise continue with the next batch of test cases.
for test_batch_to_run in batched(test_cases, test_batch_size) if test_batch_size >= 1 else [test_cases]:
results = await asyncio.gather(
*[
asyncio.create_task(
score_single_test_case(
client, problem_data, test_case["input"], test_case["output"], submission, submission_language
)
)
for test_case in test_batch_to_run
]
)
if any(result and result["compile"]["code"] != 0 for result in results):
return no_compile_reward
tests_passed_results = [
result and result["run"]["code"] == 0 and result["run"]["stdout"].strip() == "1" for result in results
]
if scoring_mode == "pass_fail" and any(not test_passed for test_passed in tests_passed_results):
break
passed_test_cases += sum(1 for test_passed in tests_passed_results if test_passed)
pass_fail_score = 1.0 if passed_test_cases == len(test_cases) else 0.0
if scoring_mode == "pass_fail":
return pass_fail_score
elif scoring_mode == "partial":
return passed_test_cases / len(test_cases)
elif scoring_mode == "weighted_sum":
return pass_fail_score + 0.1 * (passed_test_cases / len(test_cases))
else:
raise ValueError(f"Invalid scoring mode: {scoring_mode}")

View file

@ -0,0 +1,123 @@
import re
def fix_python3_imports(source_code):
"""
Fix common import and function changes between Python 3 versions
Args:
source_code (str): The Python source code to update
Returns:
str: The updated source code
"""
# Dictionary of patterns to replacements
replacements = [
# Fix collections.abc imports (changed in Python 3.3+)
(
r"from collections import (Mapping|Sequence|Set|Container|MutableMapping|MutableSet|MutableSequence)",
r"from collections.abc import \1",
),
# Fix imp module deprecation (deprecated in 3.4)
(r"import imp", r"import importlib"),
# Fix asyncio.async() to asyncio.ensure_future() (renamed in 3.4.4)
(r"asyncio\.async\(", r"asyncio.ensure_future("),
# Fix inspect.getargspec to inspect.getfullargspec (deprecated in 3.5)
(r"inspect\.getargspec", r"inspect.getfullargspec"),
# Fix array.array 'c' type code to 'b' (removed in 3.9)
(r"array\.array\('c'", r"array.array('b'"),
# Fix backslash line continuation with multiple newlines (Python-specific issue)
(r"\\(\r\n|\r|\n)+", "\\\n"),
# some solutions use getlogin() to check if they are debugging or on an actual submission
(r"(?:os\s*\.\s*)?getlogin\s*\(\s*\)", "False"),
# Fix usage of fractions.gcd (moved to math in 3.5)
# 1. Fix direct usage: fractions.gcd -> math.gcd
(r"\bfractions\.gcd\b", r"math.gcd"),
# 2. Fix 'from fractions import gcd, X' -> 'from fractions import X' (start/middle)
(r"(from\s+fractions\s+import\s+(?:\([^)]*)?)\bgcd\s*,\s*", r"\1"),
# 3. Fix 'from fractions import X, gcd' -> 'from fractions import X' (end)
(r"(from\s+fractions\s+import\s+.*?\S)\s*,\s*\bgcd(\s*\)?\s*(?:#.*)?)", r"\1\2"),
# 4. Fix standalone 'from fractions import gcd' -> 'from math import gcd'
(r"from\s+fractions\s+import\s+\(?\s*gcd\s*\)?", r""),
# --- End: Replacement for the faulty line ---
]
lines = source_code.splitlines()
last_import = max(
[
i
for i, line in enumerate(lines)
if line.strip().startswith("import") or (line.strip().startswith("from") and "import" in line)
],
default=0,
)
import_section = "\n".join(lines[: last_import + 1])
main_source = "\n".join(lines[last_import:])
if "fractions.gcd" in source_code and "import math" not in source_code:
import_section += "\nimport math"
elif "gcd" in source_code and "from math import gcd" not in source_code:
import_section += "\nfrom math import gcd"
if "set_int_max_str_digits" not in source_code:
import_section += "\nimport sys\nsys.set_int_max_str_digits(0)"
source_code = import_section + "\n" + main_source
# Apply each replacement
for pattern, replacement in replacements:
source_code = re.sub(pattern, replacement, source_code)
source_code = source_code.rstrip("\\")
return source_code
def fix_cpp_includes(source_code):
# has most of the useful functions
code_header = "#include <bits/stdc++.h>\n"
# use namespace std since models forget std:: often
if "using namespace std;" not in source_code and "std::" not in source_code:
code_header += "\nusing namespace std;\n\n"
return code_header + source_code
def is_patchable(lang):
return lang in ("python", "python3", "Python 3", "PyPy 3", "PyPy 3-64", "cpp") or "C++" in lang
def patch_code(text, lang):
if not text:
return text
if lang in ("python", "python3", "Python 3", "PyPy 3", "PyPy 3-64"):
return fix_python3_imports(text)
elif "cpp" in lang or "C++" in lang:
return fix_cpp_includes(text)
return text
tests = [
"""read = lambda: map(int, input().split())
n, m, z = read()
from fractions import gcd
ans = z // (n * m // gcd(n, m))
print(ans)""",
"""from fractions import Fraction,gcd
a,b,c,d = [int(x) for x in input().split()]
if a*d > b*c:
num = a*d-b*c
denom = a*d
else:
num = b*c-a*d
denom = b*c
div = gcd(num,denom)
print('%d/%d'%(num//div,denom//div))""",
]
if __name__ == "__main__":
for test in tests:
print("ORIGINAL:", test, sep="\n\n")
print("PATCHED:", patch_code(test, "Python 3"), sep="\n\n")
print("=" * 50)

View file

@ -2,8 +2,9 @@ import asyncio
from dataclasses import asdict, dataclass, field
from typing import Union
from .piston_client import PistonClient
from .utils import batched, load_ioi_tests
from .ioi_utils import load_ioi_tests
from .piston_client import PistonClient, PistonError
from .utils import batched
@dataclass
@ -54,16 +55,7 @@ class SubtaskResult:
Returns:
str: The status with the highest priority (lowest value)
"""
status_prios = {
"CE": -1,
"RE": 0,
"WA": 1,
"MLE": 2,
"TLE": 3,
"PA": 4,
"AC": 5,
"SKIPPED": 999,
}
status_prios = {"CE": -1, "RE": 0, "WA": 1, "MLE": 2, "TLE": 3, "PA": 4, "AC": 5, "SKIPPED": 999}
return min([x.status for x in self.test_results], key=lambda x: status_prios[x])
@property
@ -77,10 +69,7 @@ class SubtaskResult:
return (
0
if not self.test_results
else round(
min([test_result.score for test_result in self.test_results]),
self.score_precision,
)
else round(min([test_result.score for test_result in self.test_results]), self.score_precision)
)
@property
@ -95,8 +84,7 @@ class SubtaskResult:
0
if not self.test_results
else round(
min([test_result.score for test_result in self.test_results]) * self.points,
self.score_precision,
min([test_result.score for test_result in self.test_results]) * self.points, self.score_precision
)
)
@ -148,12 +136,7 @@ def _extract_single_status(score: float, feedback: str) -> str:
async def score_single_test_case(
client: PistonClient,
subtask: dict,
test_name: str,
test_input: str,
test_output: str,
submission: str,
client: PistonClient, subtask: dict, test_name: str, test_input: str, test_output: str, submission: str
) -> TestResult:
"""
Scores a single test case by running the submission against the provided input and output.
@ -174,10 +157,7 @@ async def score_single_test_case(
score = float(score)
return TestResult(
test_name=test_name,
score=score,
status=_extract_single_status(score, feedback),
feedback=feedback,
test_name=test_name, score=score, status=_extract_single_status(score, feedback), feedback=feedback
)
@ -219,11 +199,9 @@ async def score_subtask(
# initialize test results with cached results or empty (SKIPPED) TestResult objects
subtask_result.test_results = [
(
test_case_run_cache[test_name]
if test_case_run_cache is not None and test_name in test_case_run_cache
else TestResult(test_name=test_name)
)
test_case_run_cache[test_name]
if test_case_run_cache is not None and test_name in test_case_run_cache
else TestResult(test_name=test_name)
for test_name in subtask["test_names"]
]
@ -247,12 +225,7 @@ async def score_subtask(
*[
asyncio.create_task(
score_single_test_case(
client,
subtask,
test_name,
test_cases[test_name][0],
test_cases[test_name][1],
submission,
client, subtask, test_name, test_cases[test_name][0], test_cases[test_name][1], submission
)
)
for _, test_name in test_batch_to_run
@ -292,11 +265,7 @@ async def score_subtasks(
async def run_submission(
client: PistonClient,
problem: dict,
test_input: str,
submission: str,
test_output: str | None = None,
client: PistonClient, problem: dict, test_input: str, submission: str, test_output: str | None = None
) -> tuple[str, str]:
"""
Executes a submission against a test case using the Piston execution environment.
@ -327,4 +296,40 @@ async def run_submission(
), # +3 seconds hard limit. time limits are handled by the ioi script
"run_memory_limit": problem["memory_limit"],
}
return await client.execute(data)
return await execute_ioi(client, data)
async def execute_ioi(client, data) -> tuple[str, str]:
"""
Requests to the IOI package return the score as a float in the stdout, as well as optional feedback/errors in stderr.
Returns a tuple of (score, feedback).
"""
response = await client.send_execute(data)
if "message" in response:
raise PistonError(response["message"])
if "compile" in response and response["compile"]["code"] != 0:
return "0", "Compilation error exit code " + str(response["compile"]["code"]) + "\n" + response["compile"][
"stderr"
]
if "run" not in response:
raise PistonError(response)
if response["run"]["code"] == 1 and "MemoryError" in response["run"]["stderr"]:
return "0", "Memory limit exceeded"
# successful result
if response["run"]["stdout"]:
return response["run"]["stdout"], response["run"]["stderr"]
if response["run"]["signal"] == "SIGKILL":
return "0", "Time limit exceeded"
# other issues
if response["run"]["code"] != 0:
raise PistonError(
f"language={response['language']}, version={response['version']}, exit code={response['run']['code']}, stderr={response['run']['stderr']}, signal={response['run']['signal']}"
)
return "0", "Unknown error"

View file

@ -1,6 +1,5 @@
from collections import defaultdict
from functools import lru_cache
from itertools import islice
from datasets import load_dataset
@ -31,10 +30,7 @@ def load_ioi_tests_for_year(year: int) -> dict[str, dict[str, tuple[str, str]]]:
tests_dataset = load_dataset("open-r1/ioi-test-cases", name=f"{year}", split="train")
test_cases = defaultdict(dict)
for test_case in tests_dataset:
test_cases[test_case["problem_id"]][test_case["test_name"]] = (
test_case["test_input"],
test_case["test_output"],
)
test_cases[test_case["problem_id"]][test_case["test_name"]] = test_case["test_input"], test_case["test_output"]
return test_cases
@ -43,13 +39,3 @@ def load_ioi_tests(year: int, problem_id: str) -> dict[str, tuple[str, str]]:
Load IOI tests for a given year and problem id.
"""
return load_ioi_tests_for_year(year)[problem_id]
def batched(iterable, n):
"Batch data into lists of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
return iterable
it = iter(iterable)
while batch := list(islice(it, n)):
yield batch

View file

@ -14,16 +14,23 @@ class PistonError(Exception):
@lru_cache(maxsize=1)
def get_piston_client_from_env():
def get_piston_client_from_env(session=None):
piston_endpoints = os.getenv("PISTON_ENDPOINTS")
if piston_endpoints is None:
raise ValueError(
"For IOI problems Piston endpoints running our IOI package are required. Please add a list of valid Piston endpoints to a PISTON_ENDPOINTS varialbe in a `.env` file."
"For IOI/CF problems Piston endpoints running our IOI package are required. Please add a list of valid Piston endpoints to a PISTON_ENDPOINTS variable in a `.env` file."
)
piston_endpoints = piston_endpoints.split(",") if piston_endpoints != "slurm" else get_slurm_piston_endpoints()
piston_endpoints = sorted(
piston_endpoints.split(",") if piston_endpoints != "slurm" else get_slurm_piston_endpoints()
)
gpu_nb = int(os.getenv("LOCAL_RANK", 0)) # perGPU index
world = int(os.getenv("WORLD_SIZE", 1)) # total GPUs
if world > 1:
print(f"Using a subset of piston endpoints for GPU#{gpu_nb}")
piston_endpoints = piston_endpoints[gpu_nb::world]
random.shuffle(piston_endpoints)
max_requests_per_endpoint = os.getenv("PISTON_MAX_REQUESTS_PER_ENDPOINT", "1")
return PistonClient(piston_endpoints, max_requests_per_endpoint=int(max_requests_per_endpoint))
return PistonClient(piston_endpoints, session, max_requests_per_endpoint=int(max_requests_per_endpoint))
class PistonClient:
@ -57,6 +64,8 @@ class PistonClient:
):
self.max_requests_per_endpoint = max_requests_per_endpoint
self.base_endpoints = [base_endpoint] if isinstance(base_endpoint, str) else base_endpoint
if len(self.base_endpoints) == 0:
raise ValueError("No Piston endpoints provided. Please check your PISTON_ENDPOINTS environment variable.")
self.endpoint_ids = {endpoint: i for i, endpoint in enumerate(self.base_endpoints)}
self._session = session
@ -73,7 +82,7 @@ class PistonClient:
def session(self):
if self._session is None:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(sock_read=10),
timeout=aiohttp.ClientTimeout(sock_read=30),
connector=aiohttp.TCPConnector(
limit=self.max_requests_per_endpoint * len(self.base_endpoints),
ttl_dns_cache=300,
@ -91,10 +100,7 @@ class PistonClient:
async def _send_request(self, endpoint, route, data=None, method="post"):
async with self.session.request(
method,
f"{endpoint.rstrip('/')}/{route}",
json=data,
headers={"Content-Type": "application/json"},
method, f"{endpoint.rstrip('/')}/{route}", json=data, headers={"Content-Type": "application/json"}
) as response:
return await response.json(content_type=None)
@ -115,45 +121,6 @@ class PistonClient:
async def get_supported_runtimes(self):
return await self._send_to_all("runtimes", method="get")
async def execute(self, data) -> tuple[str, str]:
"""
Requests to the IOI package return the score as a float in the stdout, as well as optional feedback/errors in stderr.
Returns a tuple of (score, feedback).
"""
response = await self._send_execute(data)
if "message" in response:
raise PistonError(response["message"])
if "compile" in response and response["compile"]["code"] != 0:
return (
"0",
"Compilation error exit code "
+ str(response["compile"]["code"])
+ "\n"
+ response["compile"]["stderr"],
)
if "run" not in response:
raise PistonError(response)
if response["run"]["code"] == 1 and "MemoryError" in response["run"]["stderr"]:
return "0", "Memory limit exceeded"
# successful result
if response["run"]["stdout"]:
return response["run"]["stdout"], response["run"]["stderr"]
if response["run"]["signal"] == "SIGKILL":
return "0", "Time limit exceeded"
# other issues
if response["run"]["code"] != 0:
raise PistonError(
f"language={response['language']}, version={response['version']}, exit code={response['run']['code']}, stderr={response['run']['stderr']}, signal={response['run']['signal']}"
)
return "0", "Unknown error"
async def _check_failed_endpoint(self, endpoint):
async with self._endpoint_failures_lock:
if endpoint in self._unhealthy_endpoints:
@ -164,14 +131,15 @@ class PistonClient:
except Exception as e:
print(f"Error checking endpoint {endpoint}, dropping it ({e})")
self._unhealthy_endpoints.add(endpoint)
if len(self._unhealthy_endpoints) >= len(self.base_endpoints):
raise PistonError("All endpoints are unhealthy. Please check your Piston workers.")
async def _send_execute(self, data):
async def send_execute(self, data, language="cms_ioi", max_retries=5):
data = data | {
"language": "cms_ioi",
"language": language,
"version": "*",
}
max_retries = 5
base_delay = 1.0
status = None
@ -183,15 +151,13 @@ class PistonClient:
if attempt > 0:
await asyncio.sleep(1)
async with self.session.post(
f"{endpoint.rstrip('/')}/execute",
json=data,
headers={"Content-Type": "application/json"},
f"{endpoint.rstrip('/')}/execute", json=data, headers={"Content-Type": "application/json"}
) as response:
status = response.status
res_json = await response.json(content_type=None)
if status != 200:
raise PistonError(f"Server error. status={status}")
raise PistonError(f"Server error. status={status}. {res_json}")
if res_json is None:
raise PistonError(f"Empty response. status={status}")
# piston overloaded
@ -199,19 +165,14 @@ class PistonClient:
raise PistonError(f"Piston overloaded: {res_json['run']['stderr']}")
return res_json
except (
PistonError,
asyncio.TimeoutError,
aiohttp.ClientConnectionError,
RuntimeError,
) as e:
except (PistonError, asyncio.TimeoutError, aiohttp.ClientConnectionError, RuntimeError) as e:
# Only retry if we haven't reached max retries yet
if attempt < max_retries:
# Calculate backoff with jitter
delay = min(base_delay * (2**attempt), 10) # Exponential backoff, capped at 10 seconds
jitter = delay * 0.2 * (2 * asyncio.get_event_loop().time() % 1 - 0.5) # Add ±10% jitter
retry_delay = delay + jitter
print(f"Retrying in {retry_delay} seconds [{self.endpoint_ids[endpoint]}] {endpoint}")
print(f"Retrying in {retry_delay:.2f} seconds [{self.endpoint_ids[endpoint]}] {endpoint} - {e}")
# special case: worker died
if isinstance(e, aiohttp.ClientConnectionError) and "Connect call failed" in str(e):
@ -223,8 +184,7 @@ class PistonClient:
await asyncio.sleep(retry_delay)
else:
print(f"Giving up on retries. {e}")
raise e
await self._check_failed_endpoint(endpoint)
except Exception as e:
print(f"Propagating exception {type(e)}: {e}")
raise e
@ -242,9 +202,7 @@ def get_slurm_piston_endpoints():
"""Get list of active piston worker endpoints from squeue output"""
# Run squeue command to get job name, hostname and status, filtering for RUNNING state
result = subprocess.run(
["squeue", '--format="%j %N %T"', "--noheader", "--states=RUNNING"],
capture_output=True,
text=True,
["squeue", '--format="%j %N %T"', "--noheader", "--states=RUNNING"], capture_output=True, text=True
)
# Split output into lines and skip header

View file

@ -0,0 +1,11 @@
from itertools import islice
def batched(iterable, n):
"Batch data into lists of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
return iterable
it = iter(iterable)
while batch := list(islice(it, n)):
yield batch

65
src/open_r1/utils/data.py Normal file
View file

@ -0,0 +1,65 @@
import logging
import datasets
from datasets import DatasetDict, concatenate_datasets
from ..configs import ScriptArguments
logger = logging.getLogger(__name__)
def get_dataset(args: ScriptArguments) -> DatasetDict:
"""Load a dataset or a mixture of datasets based on the configuration.
Args:
args (ScriptArguments): Script arguments containing dataset configuration.
Returns:
DatasetDict: The loaded datasets.
"""
if args.dataset_name and not args.dataset_mixture:
logger.info(f"Loading dataset: {args.dataset_name}")
return datasets.load_dataset(args.dataset_name, args.dataset_config)
elif args.dataset_mixture:
logger.info(f"Creating dataset mixture with {len(args.dataset_mixture.datasets)} datasets")
seed = args.dataset_mixture.seed
datasets_list = []
for dataset_config in args.dataset_mixture.datasets:
logger.info(f"Loading dataset for mixture: {dataset_config.id} (config: {dataset_config.config})")
ds = datasets.load_dataset(
dataset_config.id,
dataset_config.config,
split=dataset_config.split,
)
if dataset_config.columns is not None:
ds = ds.select_columns(dataset_config.columns)
if dataset_config.weight is not None:
ds = ds.shuffle(seed=seed).select(range(int(len(ds) * dataset_config.weight)))
logger.info(
f"Subsampled dataset '{dataset_config.id}' (config: {dataset_config.config}) with weight={dataset_config.weight} to {len(ds)} examples"
)
datasets_list.append(ds)
if datasets_list:
combined_dataset = concatenate_datasets(datasets_list)
combined_dataset = combined_dataset.shuffle(seed=seed)
logger.info(f"Created dataset mixture with {len(combined_dataset)} examples")
if args.dataset_mixture.test_split_size is not None:
combined_dataset = combined_dataset.train_test_split(
test_size=args.dataset_mixture.test_split_size, seed=seed
)
logger.info(
f"Split dataset into train and test sets with test size: {args.dataset_mixture.test_split_size}"
)
return combined_dataset
else:
return DatasetDict({"train": combined_dataset})
else:
raise ValueError("No datasets were loaded from the mixture configuration")
else:
raise ValueError("Either `dataset_name` or `dataset_mixture` must be provided")

View file

@ -79,7 +79,7 @@ def run_lighteval_job(
if get_param_count_from_repo_id(model_name) >= 30_000_000_000:
tensor_parallel = True
else:
num_gpus = 8
num_gpus = 2 # Hack while cluster is full
tensor_parallel = False
cmd = VLLM_SLURM_PREFIX.copy()

129
tests/utils/test_data.py Normal file
View file

@ -0,0 +1,129 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from dataclasses import asdict
from datasets import DatasetDict, load_dataset
from open_r1.configs import DatasetConfig, DatasetMixtureConfig, ScriptArguments
from open_r1.utils.data import get_dataset
class TestGetDataset(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.dataset_name = "trl-internal-testing/zen"
cls.dataset_config = "conversational_preference"
cls.ref_dataset = load_dataset(cls.dataset_name, cls.dataset_config)
def test_dataset_and_config_name(self):
args = ScriptArguments(dataset_name=self.dataset_name, dataset_config=self.dataset_config)
dataset = get_dataset(args)
self.assertIsInstance(dataset, DatasetDict)
self.assertIn("train", dataset)
self.assertEqual(len(dataset["train"]), len(self.ref_dataset["train"]))
def test_unweighted_mixture(self):
"""Mix train and test splits of the same dataset."""
dataset_configs = [
DatasetConfig(id=self.dataset_name, config=self.dataset_config, split="train", columns=None, weight=None),
DatasetConfig(id=self.dataset_name, config=self.dataset_config, split="test", columns=None, weight=None),
]
dataset_mixture = DatasetMixtureConfig(
datasets=dataset_configs,
)
args = ScriptArguments(dataset_mixture=asdict(dataset_mixture))
dataset = get_dataset(args)
self.assertIsInstance(dataset, DatasetDict)
self.assertIn("train", dataset)
self.assertEqual(len(dataset["train"]), len(self.ref_dataset["train"]) + len(self.ref_dataset["test"]))
def test_weighted_mixture(self):
"""Test loading a dataset mixture with weights."""
dataset_configs = [
DatasetConfig(id=self.dataset_name, config=self.dataset_config, split="train", columns=None, weight=0.25),
DatasetConfig(id=self.dataset_name, config=self.dataset_config, split="test", columns=None, weight=0.5),
]
dataset_mixture = DatasetMixtureConfig(
datasets=dataset_configs,
)
args = ScriptArguments(dataset_mixture=asdict(dataset_mixture))
dataset = get_dataset(args)
self.assertIsInstance(dataset, DatasetDict)
self.assertIn("train", dataset)
self.assertEqual(
len(dataset["train"]), len(self.ref_dataset["train"]) // 4 + len(self.ref_dataset["test"]) // 2
)
def test_mixture_and_test_split(self):
"""Test loading a dataset mixture with test split."""
dataset_configs = [
DatasetConfig(
id=self.dataset_name, config=self.dataset_config, split="train[:10]", columns=None, weight=None
),
]
dataset_mixture = DatasetMixtureConfig(datasets=dataset_configs, test_split_size=0.2)
args = ScriptArguments(dataset_name=None, dataset_mixture=asdict(dataset_mixture))
dataset = get_dataset(args)
self.assertIsInstance(dataset, DatasetDict)
self.assertIn("train", dataset)
self.assertIn("test", dataset)
self.assertEqual(len(dataset["train"]), 8)
self.assertEqual(len(dataset["test"]), 2)
def test_mixture_column_selection(self):
"""Test loading a dataset mixture with column selection."""
dataset_configs = [
DatasetConfig(
id=self.dataset_name,
config=self.dataset_config,
split="train",
columns=["prompt", "chosen"],
weight=None,
),
]
dataset_mixture = DatasetMixtureConfig(
datasets=dataset_configs,
)
args = ScriptArguments(dataset_mixture=asdict(dataset_mixture))
dataset = get_dataset(args)
self.assertIsInstance(dataset, DatasetDict)
self.assertIn("train", dataset)
self.assertIn("prompt", dataset["train"].column_names)
self.assertIn("chosen", dataset["train"].column_names)
def test_mixture_with_mismatched_columns(self):
dataset_configs = [
DatasetConfig(
id=self.dataset_name, config=self.dataset_config, split="train", columns=["prompt"], weight=None
),
DatasetConfig(
id=self.dataset_name, config=self.dataset_config, split="train", columns=["chosen"], weight=None
),
]
dataset_mixture = DatasetMixtureConfig(
datasets=dataset_configs,
)
with self.assertRaises(ValueError) as context:
_ = ScriptArguments(dataset_mixture=asdict(dataset_mixture))
self.assertIn("Column names must be consistent", str(context.exception))
def test_no_dataset_name_or_mixture(self):
with self.assertRaises(ValueError) as context:
_ = ScriptArguments(dataset_name=None, dataset_mixture=None)
self.assertIn("Either `dataset_name` or `dataset_mixture` must be provided", str(context.exception))
if __name__ == "__main__":
unittest.main()