Implementation:Huggingface Datatrove RayPipelineExecutor

Knowledge Sources
Domains Distributed Computing, Data Processing
Last Updated 2026-02-14 17:00 GMT

Overview

RayPipelineExecutor distributes datatrove pipeline tasks across a Ray cluster with support for multi-node placement groups, task timeouts, and automatic retry of preempted or crashed workers.

Description

RayPipelineExecutor is a concrete implementation of PipelineExecutor that leverages the Ray framework for distributed, fault-tolerant pipeline execution. It serializes itself into Ray's object store via ray.put, then uses a RayTaskManager to asynchronously create placement groups (one per task group, with nodes_per_task bundles), spawn RankWorker Ray actors on each bundle, discover node IPs, and dispatch run_for_rank calls.
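As a rough illustration of the "one placement group per task group, with nodes_per_task bundles" layout described above, the sketch below builds a bundle list in the shape Ray's placement group API accepts (a list of resource dicts). The helper name build_bundles and the way memory is derived from cpus_per_task and mem_per_cpu_gb are assumptions made for illustration, not the executor's actual code.

```python
def build_bundles(nodes_per_task: int, cpus_per_task: int,
                  gpus_per_task: int, mem_per_cpu_gb: float) -> list[dict]:
    """Hypothetical helper: build one resource bundle per node of a task."""
    bundle = {
        "CPU": cpus_per_task,
        # Ray expresses memory requests in bytes (assumed derivation).
        "memory": int(cpus_per_task * mem_per_cpu_gb * 1024**3),
    }
    if gpus_per_task > 0:
        bundle["GPU"] = gpus_per_task
    # Return independent copies so each bundle can be modified separately.
    return [dict(bundle) for _ in range(nodes_per_task)]


bundles = build_bundles(nodes_per_task=2, cpus_per_task=4,
                        gpus_per_task=1, mem_per_cpu_gb=2)
```

A list like this could be handed to ray.util.placement_group(bundles, strategy=...). Which strategy the executor actually uses is not stated on this page.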

The module contains several supporting classes. RankWorker is a Ray actor that runs pipeline tasks within a local multiprocess.Pool, allowing multiple ranks per actor. TimeoutManager tracks per-task wall-clock time and triggers task group termination on timeout. RayTaskManager orchestrates task groups for multi-node execution, handling asynchronous placement group creation via a ThreadPoolExecutor, coordinating task completion, and managing cleanup of placement groups and actors. It handles Ray-specific retriable errors (preemption, actor crashes, object loss) and returns ranks for resubmission up to a configurable maximum number of retries (default 3).
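The resubmission behaviour described above can be pictured with a small, Ray-free sketch. RetriableError, run_with_retries, and the flaky demo function are hypothetical stand-ins for Ray's preemption, actor-crash, and object-loss errors and for the executor's own bookkeeping. Only the default of 3 retries comes from the description above.

```python
class RetriableError(Exception):
    """Hypothetical stand-in for retriable Ray failures (preemption, crashes)."""


def run_with_retries(ranks, run_rank, max_retries=3):
    """Run each rank, resubmitting on RetriableError up to max_retries times."""
    attempts = {rank: 0 for rank in ranks}
    pending = list(ranks)
    completed, failed = [], []
    while pending:
        rank = pending.pop(0)
        try:
            run_rank(rank)
            completed.append(rank)
        except RetriableError:
            attempts[rank] += 1
            if attempts[rank] <= max_retries:
                pending.append(rank)  # resubmit the rank
            else:
                failed.append(rank)  # retry budget exhausted
    return completed, failed


# Demo: rank 1 is "preempted" twice before succeeding; rank 2 always crashes.
calls = {}


def flaky(rank):
    calls[rank] = calls.get(rank, 0) + 1
    if rank == 1 and calls[rank] <= 2:
        raise RetriableError("preempted")
    if rank == 2:
        raise RetriableError("actor died")


completed, failed = run_with_retries([0, 1, 2], flaky)
```

In this sketch a rank that keeps failing is attempted once plus max_retries more times before being reported as failed. Whether the real executor counts attempts the same way is an assumption.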

Logs are written locally on Ray workers to a temporary directory and then uploaded to the shared logging directory after pipeline completion. Statistics are gathered per rank and merged into a single aggregate stats file at the end of execution.
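To make the "gathered per rank and merged" step concrete, here is a minimal sketch that sums per-rank counters into one aggregate and serializes it as JSON. The stat names and the flat dict shape are invented for illustration; the real PipelineStats objects are richer than this.

```python
import json
from collections import Counter


def merge_rank_stats(per_rank: list[dict[str, int]]) -> dict[str, int]:
    """Sum counters from every rank into a single aggregate dict."""
    total = Counter()
    for stats in per_rank:
        total.update(stats)  # adds counts key by key
    return dict(total)


# Hypothetical per-rank stats (names are illustrative only).
per_rank = [
    {"documents_read": 100, "documents_dropped": 12},
    {"documents_read": 80, "documents_dropped": 5},
]
merged = merge_rank_stats(per_rank)
merged_json = json.dumps(merged, sort_keys=True)
```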

Usage

Use RayPipelineExecutor when running data processing pipelines at scale across a Ray cluster, particularly in cloud environments with spot or preemptible instances where fault tolerance and automatic retries are needed. The executor supports both single-node and multi-node task configurations.

Code Reference

Source Location

Signature

class RayPipelineExecutor(PipelineExecutor):
    def __init__(
        self,
        pipeline: list[PipelineStep | Callable],
        tasks: int = 1,
        workers: int = -1,
        depends: "RayPipelineExecutor" = None,
        skip_completed: bool = True,
        logging_dir: DataFolderLike = None,
        randomize_start_duration: int = 0,
        cpus_per_task: int = 1,
        mem_per_cpu_gb: float = 2,
        gpus_per_task: int = 0,
        nodes_per_task: int = 1,
        ray_remote_kwargs: dict = None,
        log_first: bool = False,
        tasks_per_job: int = 1,
        time: Optional[int] = None,
    ):

Import

from datatrove.executor.ray import RayPipelineExecutor

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| pipeline | list[PipelineStep \| Callable] | Yes | List of pipeline steps or callable functions to execute |
| tasks | int | No | Total number of tasks to distribute (default: 1) |
| workers | int | No | Maximum number of tasks running concurrently (default: -1, meaning no limit, so all tasks may run at once) |
| depends | RayPipelineExecutor | No | Another executor that must complete before this one starts |
| skip_completed | bool | No | Skip tasks completed in previous runs (default: True) |
| logging_dir | DataFolderLike | No | Directory for logs, stats, and completion markers |
| randomize_start_duration | int | No | Maximum seconds to randomly delay task start (default: 0) |
| cpus_per_task | int | No | CPUs to reserve per task in the Ray cluster (default: 1) |
| mem_per_cpu_gb | float | No | Memory in GB per CPU (default: 2) |
| gpus_per_task | int | No | GPUs to reserve per task (default: 0) |
| nodes_per_task | int | No | Nodes per task; if >1, uses placement groups (default: 1) |
| ray_remote_kwargs | dict | No | Additional kwargs passed to the ray.remote decorator |
| log_first | bool | No | Whether the first task in a Ray job logs to the console (default: False) |
| tasks_per_job | int | No | Number of tasks to run in each Ray job (default: 1) |
| time | Optional[int] | No | Time limit in seconds for each task (default: None, meaning no limit) |
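The interaction between tasks, workers, and tasks_per_job can be sketched as follows. Both helpers are hypothetical: the page only states that -1 means no concurrency limit and that tasks_per_job ranks run in each Ray job, so the capping and chunking shown here are assumptions about how that might look.

```python
def effective_workers(tasks: int, workers: int) -> int:
    """Resolve the concurrency limit; -1 is treated as 'all tasks at once'."""
    return tasks if workers == -1 else min(workers, tasks)


def group_ranks(ranks: list[int], tasks_per_job: int) -> list[list[int]]:
    """Split ranks into consecutive chunks of at most tasks_per_job ranks."""
    return [ranks[i:i + tasks_per_job]
            for i in range(0, len(ranks), tasks_per_job)]


groups = group_ranks(list(range(10)), tasks_per_job=4)
```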

Outputs

| Name | Type | Description |
|---|---|---|
| stats.json | JSON file | Aggregated PipelineStats for all completed tasks |
| logs/task_XXXXX.log | Log files | Per-task log files uploaded from Ray workers |
| completions | Marker files | Per-rank completion markers for skip_completed support |
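The completion markers are what make skip_completed possible: a rank whose marker already exists does not need to run again. The sketch below shows that idea on a local temporary directory. The zero-padded marker file name and the ranks_to_run helper are assumptions for illustration, and the real executor works through its DataFolderLike abstraction rather than pathlib.

```python
import tempfile
from pathlib import Path


def ranks_to_run(logging_dir: Path, tasks: int,
                 skip_completed: bool = True) -> list[int]:
    """Return ranks that still need to run, honouring completion markers."""
    completions = logging_dir / "completions"
    return [
        rank for rank in range(tasks)
        # Marker name format is an assumption (zero-padded rank).
        if not (skip_completed and (completions / f"{rank:05d}").is_file())
    ]


with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp)
    (log_dir / "completions").mkdir()
    for done in (0, 2):  # pretend ranks 0 and 2 finished in a previous run
        (log_dir / "completions" / f"{done:05d}").touch()
    remaining = ranks_to_run(log_dir, tasks=4)
    everything = ranks_to_run(log_dir, tasks=4, skip_completed=False)
```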

Usage Examples

Basic Usage

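from datatrove.executor.ray import RayPipelineExecutor
from datatrove.pipeline.readers.jsonl import JsonlReader
from datatrove.pipeline.filters.language_filter import LanguageFilter

# Read JSONL files from S3 and keep only English documents.
# Note: this pipeline has no writer step, so the filtered documents are not saved.
executor = RayPipelineExecutor(
    pipeline=[
        JsonlReader("s3://my-bucket/input/"),
        LanguageFilter(languages=["en"]),
    ],
    tasks=100,  # split the work into 100 tasks
    workers=20,  # run at most 20 tasks at the same time
    logging_dir="s3://my-bucket/logs/run1",  # logs, stats, completion markers
    cpus_per_task=2,
    mem_per_cpu_gb=4,  # 2 CPUs x 4 GB per CPU = 8 GB per task
)
executor.run()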

Multi-Node with GPU

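from datatrove.executor.ray import RayPipelineExecutor

executor = RayPipelineExecutor(
    pipeline=[...],  # placeholder: supply your pipeline steps here
    tasks=50,
    workers=10,  # run at most 10 tasks at the same time
    nodes_per_task=2,  # each task spans 2 nodes via a placement group
    gpus_per_task=1,
    cpus_per_task=4,
    time=3600,  # per-task time limit in seconds (1 hour)
    logging_dir="s3://my-bucket/logs/gpu-run",
)
executor.run()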
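The time=3600 limit above relies on per-task wall-clock tracking. A minimal sketch of that idea follows, using an injectable clock so it can be exercised without waiting. SimpleTimeoutTracker and its methods are hypothetical and do not mirror the actual TimeoutManager interface.

```python
import time


class SimpleTimeoutTracker:
    """Hypothetical stand-in for per-task wall-clock timeout tracking."""

    def __init__(self, limit_seconds, clock=time.monotonic):
        self.limit = limit_seconds  # None means no time limit
        self.clock = clock
        self.started = {}

    def start(self, task_id):
        self.started[task_id] = self.clock()

    def finish(self, task_id):
        self.started.pop(task_id, None)

    def timed_out(self):
        """Return ids of running tasks that exceeded the limit."""
        if self.limit is None:
            return []
        now = self.clock()
        return [t for t, s in self.started.items() if now - s > self.limit]


# Demo with a fake clock so no real waiting is needed.
fake_now = [0.0]
tracker = SimpleTimeoutTracker(limit_seconds=3600, clock=lambda: fake_now[0])
tracker.start("group-0")
fake_now[0] = 1800.0
tracker.start("group-1")
fake_now[0] = 3601.0
expired = tracker.timed_out()
```

In the executor, a timed-out task leads to termination of its whole task group. This sketch only reports which ids exceeded the limit.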
