Implementation:Huggingface Datatrove PipelineStep

From Leeroopedia
Knowledge Sources
Domains: Software Architecture, Data Processing
Last Updated: 2026-02-14 17:00 GMT

Overview

Defines PipelineStep, the abstract base class that all pipeline processing blocks in datatrove must inherit from, providing dependency checking, statistics tracking, and the generator-based processing interface.

Description

PipelineStep is the most fundamental interface in the datatrove library. Every reader, filter, writer, extractor, deduplicator, and tokenizer extends this class. It uses Python's ABC mechanism to enforce that all subclasses implement a run method that receives a DocumentsPipeline generator and yields Document objects.

The class performs automatic dependency checking at instantiation time through a custom __new__ method. When a PipelineStep subclass is instantiated, the __new__ method walks the entire Method Resolution Order (MRO) chain, collects all _requires_dependencies declarations, and verifies that the required Python packages are installed before the __init__ method runs. This provides clear, early error messages when optional dependencies are missing.
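
The mechanism can be sketched without datatrove itself. The following is a simplified stand-alone illustration, not the library's actual code: StepSketch, JsonStep, and BrokenStep are hypothetical names, and importlib.util.find_spec stands in for whatever installation check the library performs.

```python
import importlib.util
from itertools import chain


class StepSketch:
    # Optional packages this class needs; subclasses may declare their own.
    _requires_dependencies: list = []

    def __new__(cls, *args, **kwargs):
        # Collect the declarations made directly on each class in the MRO,
        # so requirements declared on parent classes are checked as well.
        required = list(chain.from_iterable(
            vars(klass).get("_requires_dependencies", []) for klass in cls.mro()
        ))
        missing = [pkg for pkg in required if importlib.util.find_spec(pkg) is None]
        if missing:
            # Raised before __init__ ever runs.
            raise ImportError(f"{cls.__name__} requires missing packages: {missing}")
        return super().__new__(cls)


class JsonStep(StepSketch):
    _requires_dependencies = ["json"]  # standard library, always importable


class BrokenStep(JsonStep):
    # Hypothetical package name chosen so that it is not installed.
    _requires_dependencies = ["surely_not_installed_pkg_xyz"]

    def __init__(self):
        self.initialized = True


JsonStep()  # succeeds: "json" is available
try:
    BrokenStep()
except ImportError as err:
    error_message = str(err)
```

Because the check lives in __new__, a subclass can define any __init__ it likes without having to remember to call a validation helper.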

Each instance receives a Stats object for metrics tracking. The stat_update method provides a flexible counter/metric interface where single-value counters (e.g., "total", "dropped") can be incremented, and numeric metrics (e.g., "doc_len") accumulate values for computing mean, min, max, and standard deviation. The track_time method returns a context manager for timing execution blocks, while update_doc_stats and update_media_stats compute standard document-level and media-level metrics.
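
To make the two kinds of statistic concrete, here is a minimal stand-alone sketch of a running accumulator. It is not datatrove's Stats implementation: MetricSketch is a hypothetical class, and Welford's running-variance method is an assumption chosen because it yields mean, min, max, and standard deviation without storing every value.

```python
import math


class MetricSketch:
    """Hypothetical running accumulator for a single statistic."""

    def __init__(self):
        self.n = 0
        self.total = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean
        self.min = math.inf
        self.max = -math.inf

    def update(self, value=1):
        self.n += 1
        self.total += value
        delta = value - self.mean
        self.mean += delta / self.n
        self._m2 += delta * (value - self.mean)
        self.min = min(self.min, value)
        self.max = max(self.max, value)

    @property
    def std(self):
        # Population standard deviation; 0.0 with fewer than two values.
        return math.sqrt(self._m2 / self.n) if self.n > 1 else 0.0


dropped = MetricSketch()  # counter style: each update adds the default 1
for _ in range(3):
    dropped.update()

doc_len = MetricSketch()  # metric style: each update carries a measurement
for length in (2, 4, 4, 4, 5, 5, 7, 9):
    doc_len.update(length)
```

For a counter only the total is of interest, while for a metric such as a document length the mean, extremes, and spread become meaningful.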

The __call__ method delegates to run, allowing pipeline steps to be invoked as callables, which simplifies the executor's iteration logic.
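
A sketch of why this helps, using hypothetical stand-in classes rather than datatrove's own: when every step is callable with the same signature, running a pipeline reduces to a single loop that feeds each step's generator into the next. The run_pipeline helper below is an assumption for illustration, not the executor's real code.

```python
from abc import ABC, abstractmethod


class CallableStep(ABC):
    @abstractmethod
    def run(self, data, rank=0, world_size=1):
        ...

    def __call__(self, data=None, rank=0, world_size=1):
        # Delegation: calling the step is the same as calling run().
        return self.run(data, rank, world_size)


class Numbers(CallableStep):
    """Reader-like step: ignores `data` (None) and produces items."""

    def run(self, data, rank=0, world_size=1):
        yield from range(6)


class KeepEven(CallableStep):
    """Filter-like step: consumes the previous step's generator."""

    def run(self, data, rank=0, world_size=1):
        for item in data:
            if item % 2 == 0:
                yield item


def run_pipeline(steps, rank=0, world_size=1):
    data = None  # the first step receives None
    for step in steps:
        data = step(data, rank, world_size)
    return list(data)  # consuming the last generator drives the whole chain


result = run_pipeline([Numbers(), KeepEven()])
```

The sketch also shows the ABC enforcement at work: instantiating CallableStep directly, or a subclass that omits run, raises TypeError.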

Usage

Inherit from PipelineStep when creating any custom pipeline component. Implement the run method as a generator, set name and type class attributes for identification, and declare _requires_dependencies as a class variable listing any optional package dependencies.

Code Reference

Source Location

Signature

class PipelineStep(ABC):
    name: str = None
    type: str = None

    def __new__(cls, *args, **kwargs): ...
    def __init__(self): ...
    def stat_update(self, *labels, value: int = 1, unit: str = None): ...
    def update_media_stats(self, media: Media): ...
    def update_doc_stats(self, document: Document): ...
    def track_time(self, unit: str = None): ...

    @abstractmethod
    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: ...

    def __call__(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: ...

Import

from datatrove.pipeline.base import PipelineStep

I/O Contract

Inputs

Name | Type | Required | Description
data | DocumentsPipeline | Yes | Generator of Document objects from the previous pipeline step (or None for readers)
rank | int | No | The current task rank (default: 0), used for data sharding
world_size | int | No | The total number of tasks (default: 1), used for data sharding
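
One common way to use rank and world_size for sharding is a strided split, sketched below with a hypothetical shard helper and made-up file names. Whether a given datatrove step shards this way is an assumption; the point is only that each task can derive a disjoint slice of the work from the two integers.

```python
def shard(items, rank=0, world_size=1):
    """Return the items assigned to task `rank` out of `world_size` tasks."""
    if not 0 <= rank < world_size:
        raise ValueError("rank must satisfy 0 <= rank < world_size")
    # Task 0 takes items 0, world_size, 2*world_size, ...; task 1 takes 1, ...
    return items[rank::world_size]


files = [f"part-{i}.jsonl" for i in range(7)]  # illustrative input list
shards = [shard(files, rank, 3) for rank in range(3)]
```

With the defaults (rank=0, world_size=1) the helper returns every item, which matches the single-task case.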

Outputs

Name | Type | Description
DocumentsPipeline | Generator[Document] | Generator yielding Document objects to the next pipeline step
stats | Stats | Accumulated statistics for this pipeline step (accessible via self.stats)

Usage Examples

Basic Usage

from datatrove.data import DocumentsPipeline
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from datatrove.utils.typeshelper import StatHints


class MyCustomFilter(PipelineStep):
    type = "🔍 - FILTER"
    name = "my-custom-filter"

    def __init__(self, min_length: int = 100):
        super().__init__()
        self.min_length = min_length

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        for doc in data:
            self.stat_update(StatHints.total)
            # Time only the filtering decision. Yielding inside the timed block
            # would also count the time spent in downstream steps.
            with self.track_time():
                keep = len(doc.text) >= self.min_length
                if keep:
                    self.stat_update(StatHints.forwarded)
                    self.update_doc_stats(doc)
                else:
                    self.stat_update(StatHints.dropped)
            if keep:
                yield doc


# Use in a pipeline. In practice a reader step precedes the filter to supply
# the documents that arrive as `data`.
executor = LocalPipelineExecutor(
    pipeline=[MyCustomFilter(min_length=50)],
    tasks=1,
)

Related Pages
