Implementation:Huggingface Datatrove RayPipelineExecutor
| Knowledge Sources | |
|---|---|
| Domains | Distributed Computing, Data Processing |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
RayPipelineExecutor distributes datatrove pipeline tasks across a Ray cluster with support for multi-node placement groups, task timeouts, and automatic retry of preempted or crashed workers.
Description
RayPipelineExecutor is a concrete implementation of PipelineExecutor that leverages the Ray framework for distributed, fault-tolerant pipeline execution. It serializes itself into Ray's object store via ray.put, then uses a RayTaskManager to asynchronously create placement groups (one per task group, with nodes_per_task bundles), spawn RankWorker Ray actors on each bundle, discover node IPs, and dispatch run_for_rank calls.
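Each task group gets one placement group containing nodes_per_task bundles. The sketch below builds that bundle list. It assumes every bundle reserves the full cpus_per_task, gpus_per_task, and cpus_per_task × mem_per_cpu_gb GB of memory on its node. The helper name build_bundles is hypothetical, and the real executor may size bundles differently. Ray's placement_group() accepts a list of resource dicts such as these, with "memory" expressed in bytes. The STRICT_SPREAD strategy in the comment is one way to force each bundle onto a separate node, chosen here as an assumption.

```python
GIB = 1024 ** 3


def build_bundles(
    cpus_per_task: int = 1,
    mem_per_cpu_gb: float = 2,
    gpus_per_task: int = 0,
    nodes_per_task: int = 1,
) -> list[dict[str, float]]:
    """Return one Ray resource bundle per node for a single task group.

    Hypothetical helper: assumes each node-level bundle reserves the full
    per-task CPU, GPU, and memory amounts.
    """
    bundle: dict[str, float] = {
        "CPU": cpus_per_task,
        # Ray's "memory" resource is expressed in bytes
        "memory": int(cpus_per_task * mem_per_cpu_gb * GIB),
    }
    if gpus_per_task > 0:
        bundle["GPU"] = gpus_per_task
    # Copy the dict so each node gets an independent bundle object
    return [dict(bundle) for _ in range(nodes_per_task)]


if __name__ == "__main__":
    bundles = build_bundles(cpus_per_task=4, mem_per_cpu_gb=2, gpus_per_task=1, nodes_per_task=2)
    print(bundles)
    # With Ray installed, the bundles could be reserved along these lines:
    #   import ray
    #   from ray.util.placement_group import placement_group
    #   pg = placement_group(bundles, strategy="STRICT_SPREAD")
    #   ray.get(pg.ready())
```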
The module contains several supporting classes. RankWorker is a Ray actor that runs pipeline tasks within a local multiprocess.Pool, allowing multiple ranks per actor. TimeoutManager tracks per-task wall-clock time and triggers task group termination on timeout. RayTaskManager orchestrates task groups for multi-node execution, handling asynchronous placement group creation via a ThreadPoolExecutor, coordinating task completion, and managing cleanup of placement groups and actors. It handles Ray-specific retriable errors (preemption, actor crashes, object loss) and returns ranks for resubmission up to a configurable maximum number of retries (default 3).
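The retry behaviour can be sketched as a loop that resubmits failed ranks until they succeed or exhaust their retries. This is a minimal, Ray-free illustration and not the RayTaskManager code. Ray exposes exceptions such as ray.exceptions.RayActorError and ray.exceptions.ObjectLostError, but the sketch replaces them with a local RetriableError class. The names run_with_retries and RetriableError are hypothetical, and the sketch does not show which Ray errors the executor actually treats as retriable.

```python
class RetriableError(Exception):
    """Stand-in for Ray failures such as preemption, actor crashes, or object loss."""


def run_with_retries(ranks, run_rank, max_retries=3):
    """Run each rank, resubmitting ranks that raise RetriableError.

    A rank may be retried up to max_retries times (max_retries + 1 attempts total).
    Returns (completed, failed); failed ranks either exhausted their retries or
    raised a non-retriable error.
    """
    pending = list(ranks)
    attempts = {rank: 0 for rank in pending}
    completed, failed = [], []
    while pending:
        resubmit = []
        for rank in pending:
            try:
                run_rank(rank)
                completed.append(rank)
            except RetriableError:
                attempts[rank] += 1
                if attempts[rank] <= max_retries:
                    resubmit.append(rank)  # hand the rank back for another round
                else:
                    failed.append(rank)  # retry budget exhausted
            except Exception:
                failed.append(rank)  # non-retriable error: never resubmitted
        pending = resubmit
    return completed, failed
```

A rank that is preempted twice and then succeeds ends up in completed. A rank that fails on every attempt is reported in failed after four attempts with the default max_retries=3.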
Logs are written locally on Ray workers to a temporary directory and then uploaded to the shared logging directory after pipeline completion. Statistics are gathered per rank and merged into a single aggregate stats file at the end of execution.
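Merging per-rank statistics amounts to summing counters step by step. Datatrove uses its own PipelineStats objects for this. The sketch below swaps them for plain nested dicts of the hypothetical form {step_name: {counter_name: value}}. The function name merge_rank_stats and the counter names in the example are invented for illustration.

```python
import json
from collections import Counter


def merge_rank_stats(per_rank_stats):
    """Sum per-rank {step_name: {counter_name: value}} dicts into one aggregate."""
    merged: dict[str, Counter] = {}
    for rank_stats in per_rank_stats:
        for step, counters in rank_stats.items():
            # Counter.update adds values instead of replacing them
            merged.setdefault(step, Counter()).update(counters)
    # Convert back to plain dicts so the result is JSON-serializable
    return {step: dict(counters) for step, counters in merged.items()}


if __name__ == "__main__":
    rank0 = {"JsonlReader": {"documents": 10}, "LanguageFilter": {"dropped": 3, "forwarded": 7}}
    rank1 = {"JsonlReader": {"documents": 5}, "LanguageFilter": {"dropped": 1, "forwarded": 4}}
    # The aggregate is what would end up in a single stats file
    print(json.dumps(merge_rank_stats([rank0, rank1]), indent=2))
```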
Usage
Use RayPipelineExecutor when running data processing pipelines at scale across a Ray cluster, particularly in cloud environments with spot or preemptible instances where fault tolerance and automatic retries are needed. The executor supports both single-node and multi-node task configurations.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/executor/ray.py
- Lines: 1-650
Signature
```python
class RayPipelineExecutor(PipelineExecutor):
    def __init__(
        self,
        pipeline: list[PipelineStep | Callable],
        tasks: int = 1,
        workers: int = -1,
        depends: "RayPipelineExecutor" = None,
        skip_completed: bool = True,
        logging_dir: DataFolderLike = None,
        randomize_start_duration: int = 0,
        cpus_per_task: int = 1,
        mem_per_cpu_gb: float = 2,
        gpus_per_task: int = 0,
        nodes_per_task: int = 1,
        ray_remote_kwargs: dict = None,
        log_first: bool = False,
        tasks_per_job: int = 1,
        time: Optional[int] = None,
    ):
```
Import
```python
from datatrove.executor.ray import RayPipelineExecutor
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| pipeline | list[PipelineStep \| Callable] | Yes | List of pipeline steps or callable functions to execute |
| tasks | int | No | Total number of tasks to distribute (default: 1) |
| workers | int | No | Maximum number of concurrently running tasks (default: -1, meaning no limit, i.e. equal to tasks) |
| depends | RayPipelineExecutor | No | Another executor that must complete before this one starts |
| skip_completed | bool | No | Skip tasks completed in previous runs (default: True) |
| logging_dir | DataFolderLike | No | Directory for logs, stats, and completion markers |
| randomize_start_duration | int | No | Maximum seconds to randomly delay task start (default: 0) |
| cpus_per_task | int | No | CPUs to reserve per task in Ray cluster (default: 1) |
| mem_per_cpu_gb | float | No | Memory in GB per CPU (default: 2) |
| gpus_per_task | int | No | GPUs to reserve per task (default: 0) |
| nodes_per_task | int | No | Number of nodes each task spans, reserved as one placement-group bundle per node; values above 1 enable multi-node tasks (default: 1) |
| ray_remote_kwargs | dict | No | Additional kwargs passed to the ray.remote decorator |
| log_first | bool | No | Whether the first task in each Ray job logs to the console (default: False) |
| tasks_per_job | int | No | Number of tasks to run in each Ray job (default: 1) |
| time | int | No | Optional wall-clock time limit per task, in seconds (default: None, no limit) |
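The time limit is enforced by TimeoutManager, which tracks per-task wall-clock time. The sketch below is a hypothetical stand-in for that component: it records when each task starts and reports which running tasks have exceeded the limit. The class name TaskTimeoutTracker and its methods are invented. In the executor, a timeout terminates the whole task group, but this sketch leaves that step to the caller. The injectable clock makes the behaviour easy to test.

```python
import time


class TaskTimeoutTracker:
    """Hypothetical stand-in for TimeoutManager: per-task wall-clock tracking."""

    def __init__(self, time_limit_s=None, clock=time.monotonic):
        self.time_limit_s = time_limit_s  # None means no limit, like time=None
        self._clock = clock
        self._started: dict[int, float] = {}

    def start(self, rank: int) -> None:
        """Record the start time of a task."""
        self._started[rank] = self._clock()

    def finish(self, rank: int) -> None:
        """Stop tracking a task that has completed."""
        self._started.pop(rank, None)

    def timed_out(self) -> list[int]:
        """Return ranks still running past the limit (none if no limit is set)."""
        if self.time_limit_s is None:
            return []
        now = self._clock()
        return [rank for rank, t0 in self._started.items() if now - t0 > self.time_limit_s]
```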
Outputs
| Name | Type | Description |
|---|---|---|
| stats.json | JSON file | Aggregated PipelineStats for all completed tasks |
| logs/task_XXXXX.log | Log files | Per-task log files uploaded from Ray workers |
| completions | Marker files | Per-rank completion markers for skip_completed support |
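Completion markers make skip_completed possible: a rank that finished in an earlier run leaves a marker, and a rerun schedules only the ranks without one. The sketch below assumes a hypothetical completions/{rank:05d} file naming that mirrors the task_XXXXX log pattern. It uses a local directory via pathlib, whereas the executor works with a DataFolder that may point to S3. The helper names mark_completed and incomplete_ranks are invented.

```python
import tempfile
from pathlib import Path


def mark_completed(logging_dir: Path, rank: int) -> None:
    """Write an empty marker file for a finished rank (assumed naming scheme)."""
    completions = logging_dir / "completions"
    completions.mkdir(parents=True, exist_ok=True)
    (completions / f"{rank:05d}").touch()


def incomplete_ranks(logging_dir: Path, tasks: int, skip_completed: bool = True) -> list[int]:
    """Return the ranks that still need to run."""
    if not skip_completed:
        return list(range(tasks))
    completions = logging_dir / "completions"
    done = {p.name for p in completions.iterdir()} if completions.is_dir() else set()
    return [rank for rank in range(tasks) if f"{rank:05d}" not in done]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        root = Path(d)
        mark_completed(root, 1)
        print(incomplete_ranks(root, tasks=3))  # rank 1 is skipped
```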
Usage Examples
Basic Usage
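```python
from datatrove.executor.ray import RayPipelineExecutor
from datatrove.pipeline.filters.language_filter import LanguageFilter
from datatrove.pipeline.readers.jsonl import JsonlReader
from datatrove.pipeline.writers.jsonl import JsonlWriter

executor = RayPipelineExecutor(
    pipeline=[
        JsonlReader("s3://my-bucket/input/"),
        LanguageFilter(languages=["en"]),  # keep English documents only
        JsonlWriter("s3://my-bucket/output/"),  # persist the documents that pass the filter
    ],
    tasks=100,  # split the input into 100 tasks
    workers=20,  # run at most 20 tasks at a time
    logging_dir="s3://my-bucket/logs/run1",
    cpus_per_task=2,
    mem_per_cpu_gb=4,  # GB of memory per reserved CPU
)
executor.run()
```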
Multi-Node with GPU
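```python
from datatrove.executor.ray import RayPipelineExecutor

executor = RayPipelineExecutor(
    pipeline=[...],  # placeholder: your pipeline steps go here
    tasks=50,
    workers=10,
    nodes_per_task=2,  # each task spans two nodes
    gpus_per_task=1,
    cpus_per_task=4,
    time=3600,  # per-task time limit of one hour
    logging_dir="s3://my-bucket/logs/gpu-run",
)
executor.run()
```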