Implementation:Volcengine Verl Dataset To Parquet
| Field | Value |
|---|---|
| Knowledge Sources | Wrapper Doc (wraps HuggingFace datasets `.map()` and `.to_parquet()`) |
| Domains | Data Serialization, Parquet Export, HDFS Integration |
| Last Updated | 2026-02-07 |
Overview
Description
This implementation documents the standard verl pattern for transforming a HuggingFace Dataset into standardized Parquet files for downstream training. The pipeline consists of two steps:
- Transform: Apply `dataset.map(function=make_map_fn(split), with_indices=True)` to convert each raw example into the verl-standard schema with columns `data_source`, `prompt`, `ability`, `reward_model`, and `extra_info`.
- Export: Call `dataset.to_parquet(path)` to serialize the transformed dataset to a Parquet file.
An optional third step copies the local output directory to HDFS with `verl.utils.hdfs_io.copy` for distributed storage.
This pattern recurs across verl's data preprocessing scripts (for example `gsm8k.py`, `full_hh_rlhf.py`, and `gsm8k_multiturn_w_tool.py`), and the resulting Parquet files are consumed by verl dataset classes such as `SFTDataset` and `RLHFDataset`.
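To build intuition for the Transform step, the stdlib-only sketch below roughly emulates what `dataset.map(function=fn, with_indices=True)` does for each row: it calls `fn(example, idx)` with the row's position and merges the returned dict over whatever keys remain in the row, so raw keys popped inside `fn` do not reach the output. This is an approximation for illustration only. `emulate_map_with_indices` is a hypothetical helper, not part of `datasets` or verl, and the real implementation works on Arrow tables with caching, batching, and multiprocessing.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def emulate_map_with_indices(rows: List[Row], fn: Callable[[Row, int], Row]) -> List[Row]:
    """Approximate the per-row semantics of Dataset.map(fn, with_indices=True).

    Hypothetical helper for illustration; not a datasets or verl API.
    """
    out = []
    for idx, row in enumerate(rows):
        example = dict(row)                 # copy so the caller's rows stay untouched
        update = fn(example, idx)           # fn may pop raw keys from `example`
        out.append({**example, **update})   # returned keys override the remaining ones
    return out


def process_fn(example: Row, idx: int) -> Row:
    # Pop the raw fields so they are not carried into the mapped row.
    question = example.pop("question")
    example.pop("answer")
    return {
        "prompt": [{"role": "user", "content": question}],
        "extra_info": {"index": idx},
    }


raw = [{"question": "1+1?", "answer": "#### 2"}, {"question": "2+2?", "answer": "#### 4"}]
mapped = emulate_map_with_indices(raw, process_fn)
print(mapped[1])  # {'prompt': [{'role': 'user', 'content': '2+2?'}], 'extra_info': {'index': 1}}
```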
Usage
```python
# Standard two-step pattern:
dataset = dataset.map(function=make_map_fn("train"), with_indices=True)
dataset.to_parquet("~/data/my_dataset/train.parquet")
```
Code Reference
| Attribute | Detail |
|---|---|
| Source Location | `examples/data_preprocess/gsm8k.py`, Lines 82-105 (canonical example) |
| Signature (map) | `dataset.map(function=make_map_fn(split), with_indices=True) -> Dataset` |
| Signature (export) | `dataset.to_parquet(path) -> int` (returns the number of bytes written) |
| Import | `import datasets` |
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| `dataset` | `datasets.Dataset` | HuggingFace Dataset object with raw columns (e.g., `question`, `answer`) |
| `make_map_fn(split)` | `Callable` | Factory function returning a `process_fn(example, idx)` that transforms each row |
| `with_indices` | `bool` | When `True`, passes the row index as the second argument to `process_fn` |
| `path` | `str` | Output file path for the Parquet file (e.g., `~/data/gsm8k/train.parquet`) |
Outputs
| Output | Type | Description |
|---|---|---|
| Parquet file | File | Serialized dataset with standardized columns |
Standardized output columns:
| Column | Type | Description |
|---|---|---|
| `data_source` | `str` | Identifier for the source dataset (e.g., `"openai/gsm8k"`) |
| `prompt` | `list[dict]` | Chat-formatted messages (`role` + `content`) |
| `ability` | `str` | Task category (e.g., `"math"`, `"alignment"`) |
| `reward_model` | `dict` | Reward configuration: `style` (`"rule"` or `"model"`) and `ground_truth` |
| `extra_info` | `dict` | Additional metadata (split, index, raw fields, tool kwargs, etc.) |
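A lightweight check of mapped rows against the column table above can catch schema mistakes before export. The helper below is a hypothetical, stdlib-only sketch (`validate_verl_row` is not a verl API). It checks only the column types listed in the table, the `role`/`content` keys of prompt messages, and the two `style` values named in the `reward_model` row; it does not reproduce whatever validation verl's dataset classes perform.

```python
from typing import Any, Dict, List

# Expected Python types per column, taken from the table above.
REQUIRED_COLUMNS = {
    "data_source": str,
    "prompt": list,
    "ability": str,
    "reward_model": dict,
    "extra_info": dict,
}


def validate_verl_row(row: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the row looks well-formed."""
    problems = []
    for column, expected in REQUIRED_COLUMNS.items():
        if column not in row:
            problems.append(f"missing column: {column}")
        elif not isinstance(row[column], expected):
            problems.append(f"{column}: expected {expected.__name__}, got {type(row[column]).__name__}")
    prompt = row.get("prompt")
    if isinstance(prompt, list):
        for i, message in enumerate(prompt):
            if not isinstance(message, dict) or not all(k in message for k in ("role", "content")):
                problems.append(f"prompt[{i}]: needs 'role' and 'content' keys")
    reward_model = row.get("reward_model")
    if isinstance(reward_model, dict):
        style = reward_model.get("style")
        if style not in ("rule", "model"):
            problems.append(f"reward_model.style: unexpected {style!r}")
    return problems


good = {
    "data_source": "openai/gsm8k",
    "prompt": [{"role": "user", "content": "What is 6 * 12?"}],
    "ability": "math",
    "reward_model": {"style": "rule", "ground_truth": "72"},
    "extra_info": {"split": "train", "index": 0},
}
print(validate_verl_row(good))  # []
```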
Usage Examples
Example 1: GSM8K Parquet export (canonical pattern)
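```python
import os
import re

import datasets


def extract_solution(solution_str):
    # Simplified stand-in for the helper of the same name in gsm8k.py:
    # GSM8K answers end with "#### <number>"; return the number without commas.
    match = re.search(r"#### (\-?[0-9\.\,]+)", solution_str)
    assert match is not None, "answer has no '#### <number>' suffix"
    return match.group(1).replace(",", "")


dataset = datasets.load_dataset("openai/gsm8k", "main")
train_dataset = dataset["train"]
test_dataset = dataset["test"]


def make_map_fn(split):
    def process_fn(example, idx):
        # Popping the raw fields keeps them out of the mapped output.
        question = example.pop("question") + " Let's think step by step..."
        answer_raw = example.pop("answer")
        solution = extract_solution(answer_raw)
        return {
            "data_source": "openai/gsm8k",
            "prompt": [{"role": "user", "content": question}],
            "ability": "math",
            "reward_model": {"style": "rule", "ground_truth": solution},
            "extra_info": {"split": split, "index": idx},
        }

    return process_fn


train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)

local_save_dir = os.path.expanduser("~/data/gsm8k")
os.makedirs(local_save_dir, exist_ok=True)
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
test_dataset.to_parquet(os.path.join(local_save_dir, "test.parquet"))
```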
Example 2: Optional HDFS copy
```python
import os

from verl.utils.hdfs_io import copy, makedirs

hdfs_dir = "hdfs://my-cluster/data/gsm8k"  # set to None to skip the HDFS copy
local_save_dir = os.path.expanduser("~/data/gsm8k")
# Save locally first (train_dataset is the mapped dataset from Example 1)
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
# Then copy the whole local directory to HDFS
if hdfs_dir is not None:
    makedirs(hdfs_dir)
    copy(src=local_save_dir, dst=hdfs_dir)
```
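In the preprocessing scripts the HDFS step is optional because the destination usually comes from a command-line flag that defaults to `None`, which is what the `if hdfs_dir is not None` guard checks. The sketch below shows that gating with `argparse`. The flag names `--local_dir` and `--hdfs_dir` mirror the variables on this page but should be treated as assumptions rather than the exact CLI of any script, `plan_outputs` is a hypothetical helper, and the verl HDFS calls are left as a comment so the sketch runs without verl installed.

```python
import argparse
import os
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Export a dataset to Parquet")
    parser.add_argument("--local_dir", default="~/data/gsm8k")
    parser.add_argument("--hdfs_dir", default=None)  # None means: skip the HDFS copy
    return parser.parse_args(argv)


def plan_outputs(args: argparse.Namespace) -> dict:
    """Return where files would go, without touching any filesystem."""
    local_dir = os.path.expanduser(args.local_dir)
    plan = {
        "train": os.path.join(local_dir, "train.parquet"),
        "test": os.path.join(local_dir, "test.parquet"),
        "hdfs": None,
    }
    if args.hdfs_dir is not None:
        # With verl installed: makedirs(args.hdfs_dir); copy(src=local_dir, dst=args.hdfs_dir)
        plan["hdfs"] = args.hdfs_dir
    return plan


print(plan_outputs(parse_args([]))["hdfs"])  # None
print(plan_outputs(parse_args(["--hdfs_dir", "hdfs://my-cluster/data/gsm8k"]))["hdfs"])  # hdfs://my-cluster/data/gsm8k
```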
Example 3: Reading the output Parquet for verification
```python
import pandas as pd

df = pd.read_parquet("~/data/gsm8k/train.parquet")
print(f"Columns: {list(df.columns)}")
# ['data_source', 'prompt', 'ability', 'reward_model', 'extra_info']
print(f"Rows: {len(df)}")
print(f"First prompt: {df['prompt'].iloc[0]}")
print(f"First reward_model: {df['reward_model'].iloc[0]}")
```
Related Pages
- Principle:Volcengine_Verl_Parquet_Export
- examples/data_preprocess/gsm8k.py -- Canonical usage of this pattern
- Implementation:Volcengine_Verl_Datasets_Load_Dataset -- Upstream dataset loading
- Implementation:Volcengine_Verl_GSM8K_Data_Preprocessing -- GSM8K preprocessing using this pattern
- Implementation:Volcengine_Verl_HH_RLHF_Data_Preprocessing -- HH-RLHF preprocessing using this pattern
- Implementation:Volcengine_Verl_SFTDataset -- Downstream consumer of Parquet files