Implementation:Volcengine Verl Dataset To Parquet

From Leeroopedia


Field | Value
Knowledge Sources | Wrapper Doc (wraps HuggingFace datasets .map() and .to_parquet())
Domains | Data Serialization, Parquet Export, HDFS Integration
Last Updated | 2026-02-07

Overview

Description

This implementation documents the standard verl pattern for transforming a HuggingFace Dataset into standardized Parquet files for downstream training. The pipeline consists of two steps:

  1. Transform: Apply dataset.map(function=make_map_fn(split), with_indices=True) to convert each raw example into the verl-standard schema with columns: data_source, prompt, ability, reward_model, and extra_info.
  2. Export: Call dataset.to_parquet(path) to serialize the transformed dataset to a Parquet file.

An optional third step copies the output files to HDFS using verl.utils.hdfs_io.copy for distributed storage.

This pattern recurs across verl's data preprocessing scripts (gsm8k.py, full_hh_rlhf.py, gsm8k_multiturn_w_tool.py, etc.) and produces the Parquet files consumed by SFTDataset, RLHFDataset, and other verl dataset classes.

Usage

import os

# Standard two-step pattern:
dataset = dataset.map(function=make_map_fn("train"), with_indices=True)
# Expand "~" explicitly, as the canonical example does
dataset.to_parquet(os.path.expanduser("~/data/my_dataset/train.parquet"))

Code Reference

Attribute | Detail
Source Location | examples/data_preprocess/gsm8k.py, Lines 82-105 (canonical example)
Signature (map) | dataset.map(function=make_map_fn(split), with_indices=True) -> Dataset
Signature (export) | dataset.to_parquet(path_or_buf: str) -> int (number of bytes written)
Import | import datasets

I/O Contract

Inputs

Parameter | Type | Description
dataset | datasets.Dataset | HuggingFace Dataset object with raw columns (e.g., question, answer)
make_map_fn(split) | Callable | Factory function returning a process_fn(example, idx) that transforms each row
with_indices | bool | When True, passes the row index as the second argument to process_fn
path | str | Output file path for the Parquet file (e.g., ~/data/gsm8k/train.parquet)
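
The relationship between make_map_fn(split), process_fn(example, idx), and with_indices can be sketched in plain Python, without the datasets library. This is only an illustration of the calling convention: map_with_indices is a hypothetical stand-in for dataset.map(..., with_indices=True), and the "toy/arith" identifier and sample rows are invented for the example.

```python
def make_map_fn(split):
    """Return a row-level function bound to one split name."""
    def process_fn(example, idx):
        # `split` is captured from the enclosing call; `idx` is supplied per row.
        return {
            "data_source": "toy/arith",  # hypothetical identifier
            "prompt": [{"role": "user", "content": example["question"]}],
            "ability": "math",
            "reward_model": {"style": "rule", "ground_truth": example["answer"]},
            "extra_info": {"split": split, "index": idx},
        }
    return process_fn


def map_with_indices(rows, fn):
    """Plain-Python stand-in for map(fn, with_indices=True) over a list of dicts."""
    return [fn(row, idx) for idx, row in enumerate(rows)]


raw_rows = [
    {"question": "What is 2 + 3?", "answer": "5"},
    {"question": "What is 7 * 6?", "answer": "42"},
]
train_rows = map_with_indices(raw_rows, make_map_fn("train"))
print(train_rows[1]["extra_info"])  # {'split': 'train', 'index': 1}
```

Using a factory keeps the split name out of the row function's signature, so the same process_fn shape works for both the train and test splits.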

Outputs

Output | Type | Description
Parquet file | File | Serialized dataset with standardized columns

Standardized output columns:

Column | Type | Description
data_source | str | Identifier for the source dataset (e.g., "openai/gsm8k")
prompt | list[dict] | Chat-formatted messages (role + content)
ability | str | Task category (e.g., "math", "alignment")
reward_model | dict | Reward configuration: style ("rule" or "model") and ground_truth
extra_info | dict | Additional metadata (split, index, raw fields, tool kwargs, etc.)
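
A quick way to catch schema mistakes before export is to check one transformed record against the column table. The sketch below is not part of verl; validate_record and REQUIRED_COLUMNS are hypothetical names, and the checks cover only what the table states (column presence, top-level types, message keys, and the two reward styles).

```python
REQUIRED_COLUMNS = {
    "data_source": str,
    "prompt": list,
    "ability": str,
    "reward_model": dict,
    "extra_info": dict,
}


def validate_record(record):
    """Return a list of problems; an empty list means the record looks valid."""
    problems = []
    for column, expected_type in REQUIRED_COLUMNS.items():
        if column not in record:
            problems.append(f"missing column: {column}")
        elif not isinstance(record[column], expected_type):
            problems.append(f"{column}: expected {expected_type.__name__}")

    # Each prompt entry should be a chat message with role and content.
    prompt = record.get("prompt")
    if isinstance(prompt, list):
        for i, message in enumerate(prompt):
            if not isinstance(message, dict) or not {"role", "content"} <= set(message):
                problems.append(f"prompt[{i}]: needs 'role' and 'content' keys")

    # The table lists "rule" and "model" as the reward styles.
    reward = record.get("reward_model")
    if isinstance(reward, dict) and reward.get("style") not in ("rule", "model"):
        problems.append("reward_model.style: expected 'rule' or 'model'")
    return problems


good_record = {
    "data_source": "openai/gsm8k",
    "prompt": [{"role": "user", "content": "What is 2 + 3?"}],
    "ability": "math",
    "reward_model": {"style": "rule", "ground_truth": "5"},
    "extra_info": {"split": "train", "index": 0},
}
print(validate_record(good_record))  # []
```

Running such a check on the first few rows of a mapped dataset is cheap and may surface a missing key earlier than a training run would.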

Usage Examples

Example 1: GSM8K Parquet export (canonical pattern)

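import os
import re

import datasets


def extract_solution(answer_raw):
    # Sketch of the helper this example relies on. Assumption: GSM8K answers
    # end with "#### <number>"; the final number is returned without commas.
    match = re.search(r"#### (-?[0-9.,]+)", answer_raw)
    assert match is not None, "no '#### <number>' marker found"
    return match.group(1).replace(",", "")
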

dataset = datasets.load_dataset("openai/gsm8k", "main")
train_dataset = dataset["train"]
test_dataset = dataset["test"]

def make_map_fn(split):
    def process_fn(example, idx):
        question = example.pop("question") + " Let's think step by step..."
        answer_raw = example.pop("answer")
        solution = extract_solution(answer_raw)
        return {
            "data_source": "openai/gsm8k",
            "prompt": [{"role": "user", "content": question}],
            "ability": "math",
            "reward_model": {"style": "rule", "ground_truth": solution},
            "extra_info": {"split": split, "index": idx},
        }
    return process_fn

train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)

local_save_dir = os.path.expanduser("~/data/gsm8k")
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
test_dataset.to_parquet(os.path.join(local_save_dir, "test.parquet"))

Example 2: Optional HDFS copy

import os

from verl.utils.hdfs_io import copy, makedirs

# train_dataset is the mapped dataset from Example 1

hdfs_dir = "hdfs://my-cluster/data/gsm8k"
local_save_dir = os.path.expanduser("~/data/gsm8k")

# Save locally first
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))

# Then copy to HDFS
if hdfs_dir is not None:
    makedirs(hdfs_dir)
    copy(src=local_save_dir, dst=hdfs_dir)
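
In a standalone script, the local and HDFS directories are usually taken from the command line so that the HDFS copy stays optional. The following is a minimal sketch under stated assumptions: the flag names --local_dir and --hdfs_dir, their defaults, and the helper plan_outputs are illustrative and not confirmed by this page.

```python
import argparse
import os


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Export a dataset to Parquet")
    # Flag names and defaults are illustrative assumptions.
    parser.add_argument("--local_dir", default="~/data/gsm8k")
    parser.add_argument("--hdfs_dir", default=None)
    args = parser.parse_args(argv)
    args.local_dir = os.path.expanduser(args.local_dir)
    return args


def plan_outputs(args, splits=("train", "test")):
    """Return (local_paths, hdfs_target); hdfs_target is None when no copy is requested."""
    local_paths = [os.path.join(args.local_dir, f"{split}.parquet") for split in splits]
    return local_paths, args.hdfs_dir


args = parse_args(["--hdfs_dir", "hdfs://my-cluster/data/gsm8k"])
local_paths, hdfs_target = plan_outputs(args)
print(local_paths)
print(hdfs_target)
```

When hdfs_target is None, the script can skip the makedirs and copy calls entirely, which matches the `if hdfs_dir is not None` guard above.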

Example 3: Reading the output Parquet for verification

import pandas as pd

df = pd.read_parquet("~/data/gsm8k/train.parquet")
print(f"Columns: {list(df.columns)}")
# ['data_source', 'prompt', 'ability', 'reward_model', 'extra_info']

print(f"Rows: {len(df)}")
print(f"First prompt: {df['prompt'].iloc[0]}")
print(f"First reward_model: {df['reward_model'].iloc[0]}")
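
When a Parquet file is read back through pandas, list-typed cells such as prompt may arrive as NumPy arrays rather than Python lists, depending on the pandas and pyarrow versions in use. A small normalization helper can make comparisons against the original records easier. The sketch below is an assumption-labeled illustration: to_builtin is a hypothetical helper, and FakeArray only stands in for an array-like cell so the snippet runs without NumPy.

```python
def to_builtin(value):
    """Recursively convert array-like containers into plain lists and dicts."""
    if hasattr(value, "tolist"):  # e.g. NumPy arrays and scalars
        value = value.tolist()
    if isinstance(value, dict):
        return {key: to_builtin(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_builtin(item) for item in value]
    return value


class FakeArray:
    """Stand-in for an array-like cell value, used only for this demonstration."""

    def __init__(self, items):
        self._items = list(items)

    def tolist(self):
        return list(self._items)


cell = FakeArray([{"role": "user", "content": "What is 2 + 3?"}])
messages = to_builtin(cell)
print(messages)  # [{'role': 'user', 'content': 'What is 2 + 3?'}]
```

With a real DataFrame, the same helper would be applied as to_builtin(df["prompt"].iloc[0]) before comparing against the expected message list.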
