Implementation:Huggingface Datatrove PipelineStep
| Knowledge Sources | |
|---|---|
| Domains | Software Architecture, Data Processing |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
Defines PipelineStep, the abstract base class that all pipeline processing blocks in datatrove must inherit from, providing dependency checking, statistics tracking, and the generator-based processing interface.
Description
PipelineStep is the most fundamental interface in the datatrove library. Every reader, filter, writer, extractor, deduplicator, and tokenizer extends this class. It uses Python's ABC mechanism to enforce that all subclasses implement a run method that receives a DocumentsPipeline generator and yields Document objects.
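The enforcement described above can be illustrated with a minimal, standard-library-only sketch. The classes `Step`, `Incomplete`, and `Upper` are hypothetical stand-ins, not datatrove classes, and plain strings take the place of Document objects.

```python
from abc import ABC, abstractmethod
from typing import Iterable, Iterator


class Step(ABC):
    """Hypothetical stand-in for an abstract pipeline step (not datatrove's class)."""

    @abstractmethod
    def run(self, data: Iterable[str], rank: int = 0, world_size: int = 1) -> Iterator[str]:
        ...


class Incomplete(Step):
    """Does not implement run, so it remains abstract."""


class Upper(Step):
    def run(self, data, rank=0, world_size=1):
        # Generator: consumes the upstream iterable lazily and yields results.
        for item in data:
            yield item.upper()


# ABC refuses to instantiate a subclass that leaves run unimplemented.
try:
    Incomplete()
    instantiated = True
except TypeError:
    instantiated = False

result = list(Upper().run(["a", "b"]))
```

Because `run` is a generator, nothing is processed until a downstream consumer iterates over it, which is what lets steps be chained without materializing intermediate results.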
The class performs automatic dependency checking at instantiation time through a custom __new__ method. When a PipelineStep subclass is instantiated, the __new__ method walks the entire Method Resolution Order (MRO) chain, collects all _requires_dependencies declarations, and verifies that the required Python packages are installed before the __init__ method runs. This provides clear, early error messages when optional dependencies are missing.
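A minimal sketch of this pattern, assuming a hypothetical `check_dependencies` helper built on `importlib.util.find_spec`; the helper, the class names, and the error message are illustrative and are not taken from datatrove's source.

```python
import importlib.util


def check_dependencies(step_name, dependencies):
    """Hypothetical helper: raise if any top-level package cannot be found."""
    missing = [dep for dep in dependencies if importlib.util.find_spec(dep) is None]
    if missing:
        raise ImportError(f"{step_name} requires missing packages: {', '.join(missing)}")


class Step:
    def __new__(cls, *args, **kwargs):
        # Walk the whole MRO so declarations on parent classes are honoured too.
        # vars(klass) reads only what each class declares itself, avoiding duplicates.
        required = []
        for klass in cls.mro():
            required.extend(vars(klass).get("_requires_dependencies", []))
        check_dependencies(cls.__name__, required)
        return super().__new__(cls)


class JsonStep(Step):
    _requires_dependencies = ["json"]  # standard library, always importable


class BrokenStep(JsonStep):
    _requires_dependencies = ["surely_missing_package_xyz"]  # assumed not installed

    def __init__(self):
        self.initialized = True  # never reached: __new__ raises first


ok_step = JsonStep()

try:
    BrokenStep()
    error_message = None
except ImportError as exc:
    error_message = str(exc)
```

Running the check in `__new__` rather than `__init__` means a subclass cannot accidentally skip it by forgetting to call `super().__init__()`.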
Each instance receives a Stats object for metrics tracking. The stat_update method provides a flexible counter/metric interface where single-value counters (e.g., "total", "dropped") can be incremented, and numeric metrics (e.g., "doc_len") accumulate values for computing mean, min, max, and standard deviation. The track_time method returns a context manager for timing execution blocks, while update_doc_stats and update_media_stats compute standard document-level and media-level metrics.
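The counter and metric behaviour can be sketched without datatrove. `Metric` and `StatsSketch` below are hypothetical classes written for illustration; the actual Stats object may store and report its values differently.

```python
import math
import time
from collections import defaultdict
from contextlib import contextmanager


class Metric:
    """Hypothetical running metric using Welford's online algorithm."""

    def __init__(self):
        self.n = 0
        self.total = 0
        self.min = math.inf
        self.max = -math.inf
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, value=1):
        self.n += 1
        self.total += value
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        delta = value - self.mean
        self.mean += delta / self.n
        self._m2 += delta * (value - self.mean)

    @property
    def std(self):
        # Population standard deviation of the values seen so far.
        return math.sqrt(self._m2 / self.n) if self.n else 0.0


class StatsSketch:
    def __init__(self):
        self.metrics = defaultdict(Metric)
        self.elapsed = 0.0

    def stat_update(self, *labels, value=1):
        # Every label passed in receives the same value.
        for label in labels:
            self.metrics[label].update(value)

    @contextmanager
    def track_time(self):
        start = time.perf_counter()
        try:
            yield
        finally:
            self.elapsed += time.perf_counter() - start


stats = StatsSketch()
for text in ["ab", "abcd", "abcdef"]:
    with stats.track_time():
        stats.stat_update("total")                     # counter: +1 per document
        stats.stat_update("doc_len", value=len(text))  # metric: accumulates lengths
```

In this sketch a counter is simply a metric whose `total` is read, while a numeric metric such as `doc_len` is read through `mean`, `min`, `max`, and `std`.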
The __call__ method delegates to run, allowing pipeline steps to be invoked as callables, which simplifies the executor's iteration logic.
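To show why callable steps simplify iteration, here is a hedged sketch of how an executor-like loop might chain them. `run_pipeline`, `Source`, and `MinLength` are hypothetical names; the real executors do more (logging, statistics, task management).

```python
class Step:
    def run(self, data, rank=0, world_size=1):
        raise NotImplementedError

    def __call__(self, data=None, rank=0, world_size=1):
        # Calling the step is the same as calling run.
        return self.run(data, rank, world_size)


class Source(Step):
    def run(self, data, rank=0, world_size=1):
        # Reader-like step: ignores upstream data and produces its own items.
        yield from ["a", "bb", "ccc"]


class MinLength(Step):
    def __init__(self, min_length):
        self.min_length = min_length

    def run(self, data, rank=0, world_size=1):
        for item in data:
            if len(item) >= self.min_length:
                yield item


def run_pipeline(steps, rank=0, world_size=1):
    """Hypothetical executor loop: feed each step's generator into the next."""
    data = None  # the first step receives no upstream data
    for step in steps:
        data = step(data, rank, world_size)
    # Nothing has executed yet; draining the final generator drives the whole chain.
    return list(data) if data is not None else []


output = run_pipeline([Source(), MinLength(2)])
```

The loop treats every element uniformly as a callable taking `(data, rank, world_size)`, so it never needs to know which method a given step implements.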
Usage
Inherit from PipelineStep when creating any custom pipeline component. Implement the run method as a generator, set name and type class attributes for identification, and declare _requires_dependencies as a class variable listing any optional package dependencies.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/pipeline/base.py
- Lines: 1-130
Signature
class PipelineStep(ABC):
    name: str = None
    type: str = None

    def __new__(cls, *args, **kwargs): ...
    def __init__(self): ...
    def stat_update(self, *labels, value: int = 1, unit: str = None): ...
    def update_media_stats(self, media: Media): ...
    def update_doc_stats(self, document: Document): ...
    def track_time(self, unit: str = None): ...

    @abstractmethod
    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: ...

    def __call__(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: ...
Import
from datatrove.pipeline.base import PipelineStep
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data | DocumentsPipeline | Yes for run; defaults to None in __call__ | Generator of Document objects from the previous pipeline step (None when the step is first in the pipeline, e.g. a reader) |
| rank | int | No | The current task rank (default: 0), used for data sharding |
| world_size | int | No | The total number of tasks (default: 1), used for data sharding |
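One common way to use rank and world_size for sharding is a strided slice over the list of inputs, so that each task takes every world_size-th item starting at its own rank. The `shard` function and file names below are hypothetical, and whether a particular datatrove reader shards exactly this way is an assumption.

```python
def shard(items, rank=0, world_size=1):
    """Hypothetical helper: return the items assigned to one task."""
    if not 0 <= rank < world_size:
        raise ValueError(f"rank must be in [0, {world_size}), got {rank}")
    # Task `rank` takes items rank, rank + world_size, rank + 2 * world_size, ...
    return items[rank::world_size]


files = [f"part-{i}.jsonl" for i in range(5)]
shards = [shard(files, rank, world_size=2) for rank in range(2)]
```

With the defaults (rank 0, world_size 1) the slice returns every item, which matches the single-task case.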
Outputs
| Name | Type | Description |
|---|---|---|
| return value | DocumentsPipeline (generator of Document) | Generator yielding Document objects to the next pipeline step |
| stats | Stats | Accumulated statistics for this pipeline step (accessible via self.stats) |
Usage Examples
Basic Usage
from datatrove.data import DocumentsPipeline
from datatrove.pipeline.base import PipelineStep
from datatrove.utils.typeshelper import StatHints


class MyCustomFilter(PipelineStep):
    type = "🔍 - FILTER"
    name = "my-custom-filter"

    def __init__(self, min_length: int = 100):
        super().__init__()
        self.min_length = min_length

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        for doc in data:
            self.stat_update(StatHints.total)
            # Time only the filtering decision; yielding inside the timed block
            # would also count the time spent in downstream steps.
            with self.track_time():
                keep = len(doc.text) >= self.min_length
            if keep:
                self.stat_update(StatHints.forwarded)
                self.update_doc_stats(doc)
                yield doc
            else:
                self.stat_update(StatHints.dropped)


# Use in a pipeline. A reader step would normally come first to supply documents.
from datatrove.executor.local import LocalPipelineExecutor

executor = LocalPipelineExecutor(
    pipeline=[MyCustomFilter(min_length=50)],
    tasks=1,
)