Implementation:Huggingface Datatrove LocalPipelineExecutor

From Leeroopedia
Knowledge Sources
Domains: Data Processing, Parallel Computing
Last Updated: 2026-02-14 17:00 GMT

Overview

Implements a local pipeline executor that runs datatrove pipelines on a single machine using multiprocessing for parallel task execution.

Description

LocalPipelineExecutor is the primary executor for running datatrove pipelines on a single machine. It inherits from PipelineExecutor and distributes pipeline tasks across worker processes. Parallelism is set by the workers parameter. With workers=1, tasks run sequentially in a loop, and each rank receives its own deepcopy of the pipeline. With workers greater than 1, the executor creates a multiprocess.Pool using a configurable start method (default "forkserver") and distributes tasks via imap_unordered, so tasks may finish in any order.
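
The two execution modes can be illustrated with a minimal sketch. This is not datatrove's implementation: it uses the standard library multiprocessing module as a stand-in for multiprocess, and run_task, run_all, add_one, and double are hypothetical names introduced only for this example.

```python
import copy
import multiprocessing
from functools import partial


def run_task(pipeline, rank):
    """Hypothetical per-rank work: feed the rank number through each step."""
    value = rank
    for step in pipeline:
        value = step(value)
    return rank, value


def add_one(x):
    return x + 1


def double(x):
    return x * 2


def run_all(pipeline, tasks, workers=1, start_method="forkserver"):
    """Run every rank, sequentially or in a process pool (illustrative only)."""
    ranks = range(tasks)
    if workers == 1:
        # Sequential mode: each rank gets its own deep copy of the pipeline,
        # so state mutated by one rank cannot leak into the next.
        return dict(run_task(copy.deepcopy(pipeline), rank) for rank in ranks)
    # Parallel mode: build the pool from the chosen start-method context.
    # imap_unordered yields results as tasks finish, not in rank order.
    ctx = multiprocessing.get_context(start_method)
    with ctx.Pool(workers) as pool:
        return dict(pool.imap_unordered(partial(run_task, pipeline), ranks))


# Sequential run over four ranks.
sequential = run_all([add_one, double], tasks=4, workers=1)
```

When calling run_all with workers greater than 1 from a script, the call should sit under an `if __name__ == "__main__":` guard, because the "spawn" and "forkserver" start methods re-import the main module in child processes. The "forkserver" method is only available on Unix-like platforms.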

Several features support production use. Dependency chaining lets one executor declare a dependency on another LocalPipelineExecutor that must complete before it begins. With skip_completed enabled, the executor checks for ranks that already completed and skips them on restart, which makes runs resumable after a failure. The local_tasks and local_rank_offset parameters partition the total task space across multiple machines: each machine runs its own local executor over its own slice of ranks.
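
Rank partitioning and resumable execution can be sketched in plain Python as follows. The helper names (ranks_for_node, mark_completed, pending_ranks) and the zero-padded marker file naming are assumptions made for illustration; they are not taken from datatrove's source.

```python
import tempfile
from pathlib import Path


def ranks_for_node(tasks, local_tasks=-1, local_rank_offset=0):
    """Ranks one node is responsible for; local_tasks=-1 means all tasks."""
    if local_tasks == -1:
        local_tasks = tasks
    return list(range(local_rank_offset, local_rank_offset + local_tasks))


def mark_completed(rank, completions_dir):
    """Write an empty marker file for a finished rank (assumed layout)."""
    Path(completions_dir).mkdir(parents=True, exist_ok=True)
    (Path(completions_dir) / f"{rank:05d}").touch()


def pending_ranks(ranks, completions_dir):
    """Keep only the ranks that have no completion marker yet."""
    return [
        rank for rank in ranks
        if not (Path(completions_dir) / f"{rank:05d}").exists()
    ]


with tempfile.TemporaryDirectory() as tmp:
    # Second of two nodes splitting 8 tasks evenly.
    node1 = ranks_for_node(tasks=8, local_tasks=4, local_rank_offset=4)
    # Pretend rank 5 finished before a crash.
    mark_completed(5, tmp)
    # On restart, only the unfinished ranks remain.
    remaining = pending_ranks(node1, tmp)
```

In this sketch the first node would use local_rank_offset=0 and the second local_rank_offset=4, so the two slices cover ranks 0 to 7 without overlap.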

A Manager.Queue hands out local rank identifiers to workers, which tasks can use for GPU or other per-worker resource allocation. After all tasks complete, the per-task PipelineStats are merged into a single summary and saved to the logging directory.
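
The local rank hand-off can be sketched as a pool of free slots held in a queue. To keep the example self-contained, it uses queue.Queue with a thread pool as a stand-in for a Manager.Queue shared between processes; run_with_local_rank and assign_local_ranks are hypothetical names.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def run_with_local_rank(rank, free_local_ranks):
    # Block until a local slot is free, then hold it while the task runs.
    local_rank = free_local_ranks.get()
    try:
        # A real task could use local_rank to select a device or resource.
        return rank, local_rank
    finally:
        # Always return the slot, even if the task raised.
        free_local_ranks.put(local_rank)


def assign_local_ranks(tasks, workers):
    """Run `tasks` ranks on `workers` slots; return {rank: local_rank}."""
    free_local_ranks = queue.Queue()
    for slot in range(workers):
        free_local_ranks.put(slot)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(run_with_local_rank, rank, free_local_ranks)
            for rank in range(tasks)
        ]
        return dict(future.result() for future in futures)


assignments = assign_local_ranks(tasks=8, workers=3)
```

Because a slot is returned only after its task finishes, no two tasks running at the same time hold the same local rank, and local ranks always stay in the range 0 to workers - 1.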

Usage

Use LocalPipelineExecutor for development, testing, and single-machine production workloads. It is the default and most commonly used executor in datatrove. For multi-node distributed execution, consider the Slurm or Ray executors instead.

Code Reference

Source Location

Signature

class LocalPipelineExecutor(PipelineExecutor):
    def __init__(
        self,
        pipeline: list[PipelineStep | Callable],
        tasks: int = 1,
        workers: int = -1,
        logging_dir: DataFolderLike = None,
        depends: "LocalPipelineExecutor" = None,
        skip_completed: bool = True,
        start_method: str = "forkserver",
        local_tasks: int = -1,
        local_rank_offset: int = 0,
        randomize_start_duration: int = 0,
    ): ...

    def run(self) -> PipelineStats: ...

    @property
    def world_size(self) -> int: ...

Import

from datatrove.executor.local import LocalPipelineExecutor

I/O Contract

Inputs

Name | Type | Required | Description
--- | --- | --- | ---
pipeline | list[PipelineStep or Callable] | Yes | List of pipeline steps or lambda functions to execute
tasks | int | No | Total number of tasks to run (default: 1)
workers | int | No | Number of parallel workers; -1 means equal to tasks (default: -1)
logging_dir | DataFolderLike | No | Directory for logs, stats, and completion markers
depends | LocalPipelineExecutor | No | A prior executor that must complete first
skip_completed | bool | No | Whether to skip previously completed tasks (default: True)
start_method | str | No | Multiprocessing start method (default: "forkserver")
local_tasks | int | No | Number of tasks to run on this node; -1 for all (default: -1)
local_rank_offset | int | No | Starting rank for this node (default: 0)
randomize_start_duration | int | No | Max seconds to randomly delay task starts (default: 0)

Outputs

Name | Type | Description
--- | --- | ---
stats | PipelineStats | Merged statistics from all completed tasks
stats.json | file | Statistics file saved to the logging directory
completions | files | Per-rank completion marker files in the logging directory
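
The merge-and-save step behind the stats outputs can be approximated as below. This is a simplified sketch: it treats each task's statistics as a flat dictionary of counters, which is an assumption for illustration and not the actual PipelineStats structure or the real stats.json schema. The merge_stats name and the counter keys are hypothetical.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path


def merge_stats(per_task_stats):
    """Sum counter dictionaries from every task into one summary."""
    merged = Counter()
    for stats in per_task_stats:
        merged.update(stats)  # Counter.update adds counts key by key
    return dict(merged)


# Hypothetical per-task counters.
per_task = [
    {"documents": 10, "dropped": 2},
    {"documents": 7, "dropped": 1},
]
merged = merge_stats(per_task)

with tempfile.TemporaryDirectory() as logging_dir:
    # Save the merged summary next to the logs, then read it back.
    stats_path = Path(logging_dir) / "stats.json"
    stats_path.write_text(json.dumps(merged, indent=2))
    reloaded = json.loads(stats_path.read_text())
```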

Usage Examples

Basic Usage

from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.readers.jsonl import JsonlReader
from datatrove.pipeline.filters.language_filter import LanguageFilter
from datatrove.pipeline.writers.jsonl import JsonlWriter

# Single-worker sequential execution
executor = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("input/"),
        LanguageFilter(languages=["en"]),
        JsonlWriter("output/"),
    ],
    tasks=4,
    workers=1,
    logging_dir="logs/my_pipeline",
)
stats = executor.run()

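# Multi-worker parallel execution
# In a script, place this under `if __name__ == "__main__":`, since the
# "forkserver" start method re-imports the main module in worker processes.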
executor = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("input/"),
        JsonlWriter("output/"),
    ],
    tasks=16,
    workers=4,
    logging_dir="logs/parallel_run",
)
stats = executor.run()

# Chained executors with dependencies
step1 = LocalPipelineExecutor(pipeline=[...], tasks=8, logging_dir="logs/step1")
step2 = LocalPipelineExecutor(pipeline=[...], tasks=8, logging_dir="logs/step2", depends=step1)
step2.run()  # Automatically runs step1 first
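
The dependency behavior in the chained example can be modeled with a toy class. ToyExecutor and its launched flag are hypothetical and only mirror the behavior described on this page: running an executor first runs any dependency that has not already run.

```python
class ToyExecutor:
    """Hypothetical stand-in for an executor with a `depends` link."""

    def __init__(self, name, depends=None):
        self.name = name
        self.depends = depends
        self.launched = False

    def run(self, log):
        # Run the dependency first, but only if it has not run already.
        if self.depends is not None and not self.depends.launched:
            self.depends.run(log)
        self.launched = True
        log.append(self.name)


log = []
step1 = ToyExecutor("step1")
step2 = ToyExecutor("step2", depends=step1)
step3 = ToyExecutor("step3", depends=step2)
step3.run(log)  # Runs step1, then step2, then step3
```

Tracking whether a dependency has already launched keeps a shared upstream step from running twice when it is reached through more than one call.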
