Implementation:Huggingface Datatrove LocalPipelineExecutor
| Knowledge Sources | |
|---|---|
| Domains | Data Processing, Parallel Computing |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
Implements a local pipeline executor that runs datatrove pipelines on a single machine using multiprocessing for parallel task execution.
Description
LocalPipelineExecutor is the primary executor for running datatrove pipelines on a single machine. It inherits from PipelineExecutor and distributes pipeline tasks (ranks) across worker processes. Parallelism is set by the workers parameter, where -1 (the default) means one worker per task. With workers=1, tasks run sequentially in a loop, and each rank gets its own deep copy of the pipeline so that state left behind by one rank does not carry over to the next. With more than one worker, the executor creates a process pool from the multiprocess library (a dill-based fork of the standard multiprocessing module) using a configurable start method (default "forkserver") and hands ranks to it via imap_unordered, so tasks finish in no particular order.
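The sequential-versus-pool dispatch described above can be sketched with the standard library. This is a minimal sketch, not datatrove's code: `dispatch`, `run_one_rank`, `double` and `add_one` are hypothetical names, pipeline steps are modelled as plain functions of an integer, and the stdlib `multiprocessing` module stands in for the `multiprocess` library.

```python
import copy
import multiprocessing
from functools import partial


def double(x):
    # Hypothetical pipeline step; defined at top level so the stdlib pool can pickle it.
    return x * 2


def add_one(x):
    # Another hypothetical pipeline step.
    return x + 1


def run_one_rank(pipeline, rank):
    # Stand-in for running every step of the pipeline for a single rank.
    value = rank
    for step in pipeline:
        value = step(value)
    return rank, value


def dispatch(pipeline, ranks, workers, start_method="forkserver"):
    """Run `pipeline` once per rank, sequentially or across a process pool."""
    if workers == 1:
        # Sequential path: each rank gets its own deep copy of the pipeline,
        # so state mutated by one rank cannot leak into the next.
        return [run_one_rank(copy.deepcopy(pipeline), rank) for rank in ranks]
    # Parallel path: a pool built from the requested start method.
    # imap_unordered yields results as tasks finish, not in rank order.
    ctx = multiprocessing.get_context(start_method)
    with ctx.Pool(workers) as pool:
        return list(pool.imap_unordered(partial(run_one_rank, pipeline), ranks))
```

Standard `pickle` cannot serialize lambdas, so the pool path in this sketch needs top-level functions; the dill-based `multiprocess` library is presumably what lets datatrove pipelines contain lambdas. To exercise the pool path, call `dispatch([double, add_one], range(8), workers=4)` from a script under an `if __name__ == "__main__":` guard, which the "forkserver" and "spawn" start methods require; "forkserver" is only available on POSIX systems.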
The executor implements several features aimed at production use. Dependency chaining lets one executor declare, via depends, another LocalPipelineExecutor that must finish before it starts; if that dependency has not been launched yet, run() launches it first. With skip_completed enabled, the executor checks the completion markers left by previous runs and skips ranks that already finished, so an interrupted job can be restarted and resumed. The local_tasks and local_rank_offset parameters split the total task space across several machines: each machine runs its own local executor over the contiguous rank range from local_rank_offset to local_rank_offset + local_tasks - 1.
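Rank partitioning and skip_completed can be illustrated together. This is a minimal sketch under stated assumptions: the helper names are hypothetical, the `completions/<zero-padded rank>` marker layout is an assumption made for the example, and the range check is the sketch's own sanity check.

```python
import tempfile
from pathlib import Path


def local_rank_range(tasks, local_tasks=-1, local_rank_offset=0):
    # -1 means this machine handles every task.
    if local_tasks == -1:
        local_tasks = tasks
    # Sanity check added for the sketch: the slice must fit inside [0, tasks).
    if local_rank_offset + local_tasks > tasks:
        raise ValueError("local_rank_offset + local_tasks exceeds tasks")
    return range(local_rank_offset, local_rank_offset + local_tasks)


def marker_path(logging_dir, rank):
    # Assumed marker layout: <logging_dir>/completions/<rank, zero-padded>.
    return Path(logging_dir) / "completions" / f"{rank:05d}"


def mark_completed(logging_dir, rank):
    # Record that `rank` finished by creating its (empty) marker file.
    path = marker_path(logging_dir, rank)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.touch()


def ranks_to_run(logging_dir, ranks, skip_completed=True):
    # Keep only ranks without a completion marker, or all of them if
    # skip_completed is off.
    if not skip_completed:
        return list(ranks)
    return [r for r in ranks if not marker_path(logging_dir, r).exists()]


# Machine B of two, each owning half of 16 tasks.
with tempfile.TemporaryDirectory() as logging_dir:
    mine = local_rank_range(tasks=16, local_tasks=8, local_rank_offset=8)
    mark_completed(logging_dir, 9)  # pretend an earlier run finished ranks 9 and 12
    mark_completed(logging_dir, 12)
    print(ranks_to_run(logging_dir, mine))  # [8, 10, 11, 13, 14, 15]
```

Machine A in the same split would use `local_rank_offset=0` with the same `tasks` and `local_tasks`, so the two rank ranges never overlap.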
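A Manager.Queue hands out local rank identifiers: it is pre-filled with the values 0 to workers - 1, and each task takes one when it starts and returns it when it finishes, so concurrently running tasks never share a local rank. The local rank can therefore be used to assign per-worker resources such as GPUs. After all tasks complete, the per-task PipelineStats are merged into a single summary and saved as stats.json in the logging directory.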
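The local-rank recycling protocol can be sketched as follows. datatrove shares a Manager queue across worker processes; to stay self-contained and fast, this sketch applies the same borrow-and-return pattern with threads and an in-process `queue.Queue`. `make_local_rank_queue`, `run_task` and the `busy` set used to check the invariant are hypothetical.

```python
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def make_local_rank_queue(workers):
    # Pre-fill with local ranks 0 .. workers-1.
    q = queue.Queue()
    for local_rank in range(workers):
        q.put(local_rank)
    return q


def run_task(rank, ranks_q, busy, lock):
    local_rank = ranks_q.get()  # blocks until some local rank is free
    with lock:
        # Invariant check: no two running tasks may hold the same local rank.
        if local_rank in busy:
            raise RuntimeError(f"local rank {local_rank} is already in use")
        busy.add(local_rank)
    try:
        time.sleep(0.01)  # placeholder for work pinned to `local_rank`, e.g. one GPU
        return rank, local_rank
    finally:
        with lock:
            busy.discard(local_rank)
        ranks_q.put(local_rank)  # hand the local rank back for the next task


workers = 3
ranks_q = make_local_rank_queue(workers)
busy, lock = set(), threading.Lock()
with ThreadPoolExecutor(max_workers=workers) as pool:
    results = list(pool.map(lambda r: run_task(r, ranks_q, busy, lock), range(10)))
print(results)  # each of the 10 ranks paired with a local rank in {0, 1, 2}
```

Returning the local rank in a `finally` clause matters: if a task fails, its local rank still goes back into the queue instead of shrinking the pool of usable identifiers.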
Usage
Use LocalPipelineExecutor for development, testing, and single-machine production workloads. It is the default and most commonly used executor in datatrove. For multi-node distributed execution, consider the Slurm or Ray executors instead.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/executor/local.py
- Lines: 1-173
Signature
class LocalPipelineExecutor(PipelineExecutor):
    def __init__(
        self,
        pipeline: list[PipelineStep | Callable],
        tasks: int = 1,
        workers: int = -1,
        logging_dir: DataFolderLike = None,
        depends: "LocalPipelineExecutor" = None,
        skip_completed: bool = True,
        start_method: str = "forkserver",
        local_tasks: int = -1,
        local_rank_offset: int = 0,
        randomize_start_duration: int = 0,
    ): ...
    def run(self) -> PipelineStats: ...
    @property
    def world_size(self) -> int: ...
Import
from datatrove.executor.local import LocalPipelineExecutor
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| pipeline | list[PipelineStep or Callable] | Yes | List of PipelineStep instances and/or custom callables (such as lambdas) to execute in order |
| tasks | int | No | Total number of tasks to run (default: 1) |
| workers | int | No | Number of parallel workers; -1 means equal to tasks (default: -1) |
| logging_dir | DataFolderLike | No | Directory for logs, stats, and completion markers |
| depends | LocalPipelineExecutor | No | A prior executor that must complete first |
| skip_completed | bool | No | Whether to skip previously completed tasks (default: True) |
| start_method | str | No | Multiprocessing start method (default: "forkserver") |
| local_tasks | int | No | Number of tasks to run on this node; -1 for all (default: -1) |
| local_rank_offset | int | No | Starting rank for this node (default: 0) |
| randomize_start_duration | int | No | Max seconds to randomly delay task starts (default: 0) |
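The -1 sentinel for workers and the randomize_start_duration bound can be read as below. This is a hypothetical sketch: `effective_workers` and `start_delay` are illustrative names, and drawing a whole number of seconds uniformly is an assumption about how the random delay is chosen; the table only states that it is bounded by randomize_start_duration. Staggering task starts is typically used to keep many tasks from hitting the same storage or service at the same moment.

```python
import random


def effective_workers(tasks, workers=-1):
    # -1 is a sentinel for "one worker per task", i.e. every task may run at once.
    return tasks if workers == -1 else workers


def start_delay(randomize_start_duration, rng=random):
    # Assumption: a whole number of seconds drawn uniformly from
    # [0, randomize_start_duration]; 0 (the default) means no delay.
    if randomize_start_duration <= 0:
        return 0
    return rng.randint(0, randomize_start_duration)


print(effective_workers(16))     # 16
print(effective_workers(16, 4))  # 4
rng = random.Random(0)
print([start_delay(30, rng) for _ in range(4)])  # four delays between 0 and 30
```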
Outputs
| Name | Type | Description |
|---|---|---|
| stats | PipelineStats | Merged statistics from the tasks executed in this run (ranks skipped as already completed are not included) |
| stats.json | file | Statistics file saved to the logging directory |
| completions | files | Per-rank completion marker files in the logging directory |
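Merging per-task statistics into one summary and writing stats.json can be sketched with a toy counter type. `ToyStats` and `save_stats` are hypothetical; `ToyStats` is not datatrove's PipelineStats, and the JSON layout shown is an assumption, not the real stats.json format.

```python
import json
import tempfile
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class ToyStats:
    # Hypothetical stand-in for PipelineStats: named counters that add key by key.
    counters: Counter = field(default_factory=Counter)

    def __add__(self, other):
        merged = Counter(self.counters)
        merged.update(other.counters)  # update() adds counts rather than replacing them
        return ToyStats(merged)


def save_stats(stats, logging_dir):
    # Write the merged summary as <logging_dir>/stats.json.
    path = Path(logging_dir) / "stats.json"
    path.write_text(json.dumps(dict(stats.counters), indent=2, sort_keys=True))
    return path


per_task = [
    ToyStats(Counter(docs_in=10, docs_dropped=3)),
    ToyStats(Counter(docs_in=5, docs_dropped=1)),
]
total = sum(per_task, start=ToyStats())  # empty start value, then pairwise __add__
with tempfile.TemporaryDirectory() as logging_dir:
    path = save_stats(total, logging_dir)
    print(json.loads(path.read_text()))  # {'docs_dropped': 4, 'docs_in': 15}
```

Giving the stats type an `__add__` and an empty start value lets the merge be a plain `sum`, which also handles the case of a single task without special-casing.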
Usage Examples
Basic Usage
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.readers.jsonl import JsonlReader
from datatrove.pipeline.filters.language_filter import LanguageFilter
from datatrove.pipeline.writers.jsonl import JsonlWriter
# Single-worker sequential execution: 4 tasks run one after another
executor = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("input/"),
        LanguageFilter(languages=["en"]),
        JsonlWriter("output/"),
    ],
    tasks=4,
    workers=1,
    logging_dir="logs/my_pipeline",
)
stats = executor.run()
# Multi-worker parallel execution: 16 tasks, at most 4 running at once.
# With the default "forkserver" start method, launch this from a script
# guarded by `if __name__ == "__main__":`.
executor = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("input/"),
        JsonlWriter("output/"),
    ],
    tasks=16,
    workers=4,
    logging_dir="logs/parallel_run",
)
stats = executor.run()
# Chained executors with dependencies (each `[...]` stands for that stage's own list of steps)
step1 = LocalPipelineExecutor(pipeline=[...], tasks=8, logging_dir="logs/step1")
step2 = LocalPipelineExecutor(pipeline=[...], tasks=8, logging_dir="logs/step2", depends=step1)
step2.run() # Automatically runs step1 first
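The dependency behaviour in the last example can be modelled in a single process. In this hypothetical `ToyExecutor`, `run()` launches an unlaunched dependency first, waits until all of its ranks are complete, and then runs only its own incomplete ranks. An in-memory set stands in for the completion markers that a real executor would check in the logging directory; none of these names are datatrove APIs.

```python
import time


class ToyExecutor:
    """Hypothetical, single-process stand-in for the depends= behaviour."""

    def __init__(self, name, tasks, depends=None, log=None):
        self.name = name
        self.tasks = tasks
        self.depends = depends
        self.launched = False
        self.completed = set()  # stands in for completion marker files
        self.log = log if log is not None else []

    def incomplete_ranks(self):
        return [rank for rank in range(self.tasks) if rank not in self.completed]

    def run(self, poll_seconds=0.01):
        if self.depends is not None:
            if not self.depends.launched:
                self.depends.run(poll_seconds)  # launch the dependency first
            # Wait until every dependency rank is done. In this toy the
            # dependency ran synchronously above, so the loop exits at once.
            while self.depends.incomplete_ranks():
                time.sleep(poll_seconds)
        self.launched = True
        for rank in self.incomplete_ranks():  # skip ranks finished earlier
            self.log.append((self.name, rank))
            self.completed.add(rank)


log = []
step1 = ToyExecutor("step1", tasks=2, log=log)
step2 = ToyExecutor("step2", tasks=2, depends=step1, log=log)
step2.run()
print(log)  # [('step1', 0), ('step1', 1), ('step2', 0), ('step2', 1)]
```

Calling `step2.run()` a second time adds nothing to the log, since both stages are launched and every rank is already marked complete.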