Implementation:Huggingface Datatrove BaseMediaFilter
| Knowledge Sources | |
|---|---|
| Domains | Media Processing, Data Filtering |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
BaseMediaContentFilter is an abstract base class that provides the framework for filtering individual media objects within documents in a Datatrove pipeline.
Description
The BaseMediaContentFilter class extends both PipelineStep and ABC to define the contract for media filtering operations. Subclasses must implement the filter method, which examines a single Media object and returns either a boolean indicating whether the media should be kept, or a tuple of (False, reason_string) to drop it with a specific reason annotation.
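A caller has to handle both return shapes of filter. The sketch below shows one way to normalize them into a (keep, reason) pair. The helper name normalize_filter_result is hypothetical and is not part of Datatrove; it only illustrates the contract described above.

```python
from typing import Optional, Tuple, Union

# Either a bare bool, or a (keep, reason) tuple as described in the text.
FilterResult = Union[bool, Tuple[bool, str]]


def normalize_filter_result(result: FilterResult) -> Tuple[bool, Optional[str]]:
    """Hypothetical helper: turn either return shape into (keep, reason)."""
    if isinstance(result, tuple):
        keep, reason = result
        return bool(keep), reason
    # A bare boolean carries no reason string.
    return bool(result), None
```

With this shape, a bare True or False yields a reason of None, while (False, "too_large") is passed through unchanged.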
The run method implements the pipeline execution logic by iterating over all documents in the data pipeline and applying the filter to each media object attached to a document. When a media item fails the filter, its media_bytes attribute is set to None (effectively removing the binary data) and a filter_reason is recorded in the media's metadata. The method tracks statistics for total items processed, items dropped, and items forwarded, providing observability into filtering effectiveness.
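The loop described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the Datatrove source: Media and Document are minimal stand-in dataclasses, the stat names ("total", "dropped", "forwarded") are hypothetical, and recording filter_reason only when a reason string is returned is an assumption.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Callable, Iterable, Iterator, Optional


@dataclass
class Media:  # stand-in for illustration, not datatrove.data.Media
    media_bytes: Optional[bytes] = None
    metadata: dict = field(default_factory=dict)


@dataclass
class Document:  # stand-in for illustration
    media: list = field(default_factory=list)


def run_sketch(
    docs: Iterable[Document], media_filter: Callable, stats: Counter
) -> Iterator[Document]:
    """Apply media_filter to every media item and yield every document."""
    for doc in docs:
        for media in doc.media:
            stats["total"] += 1  # hypothetical stat name
            result = media_filter(media)
            # Accept either a bare bool or a (keep, reason) tuple.
            keep, reason = result if isinstance(result, tuple) else (result, None)
            if keep:
                stats["forwarded"] += 1
                continue
            stats["dropped"] += 1
            media.media_bytes = None  # clear the payload, keep the object
            if reason is not None:  # assumption: annotate only when given
                media.metadata["filter_reason"] = reason
        # The document is yielded whether or not its media passed.
        yield doc
```

Because run_sketch is a generator, the statistics are only updated as the output is consumed.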
Filtered media objects stay attached to their document; only their bytes are cleared. Downstream steps can therefore still see which media was filtered and why, without carrying the binary payload forward. Documents are always yielded, whether or not their media passed the filter.
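As an illustration of that downstream inspection, the sketch below tallies filter reasons across documents. The function name count_filter_reasons is hypothetical, and the code assumes only that each document exposes a media list whose items carry a metadata dict with an optional "filter_reason" key, as described above.

```python
from collections import Counter
from typing import Iterable


def count_filter_reasons(docs: Iterable) -> Counter:
    """Hypothetical helper: count media items per recorded filter_reason."""
    reasons = Counter()
    for doc in docs:
        for media in doc.media:
            reason = media.metadata.get("filter_reason")
            if reason is not None:
                reasons[reason] += 1
    return reasons
```

Only media with a recorded reason are counted, since media_bytes may be None for other reasons, such as bytes that were never loaded.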
Usage
Use BaseMediaContentFilter as the base class when implementing custom media filters. Subclass it and implement the filter method with your specific filtering logic. The base class handles iteration, statistics tracking, and metadata annotation automatically.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/pipeline/media/filters/base_filter.py
- Lines: 1-50
Signature
class BaseMediaContentFilter(PipelineStep, ABC):
    type = "🔻️ - MEDIA FILTER"

    def __init__(self):
        ...

    @abstractmethod
    def filter(self, media: Media) -> bool | Tuple[bool, str]:
        ...

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        ...
Import
from datatrove.pipeline.media.filters.base_filter import BaseMediaContentFilter
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data | DocumentsPipeline | Yes | Generator of Document objects, each potentially containing media items |
| rank | int | No | Rank of the current worker (default: 0) |
| world_size | int | No | Total number of workers (default: 1) |
Outputs
| Name | Type | Description |
|---|---|---|
| documents | DocumentsPipeline | Generator yielding all input documents; filtered media have media_bytes set to None and filter_reason in metadata |
Usage Examples
Basic Usage
from datatrove.pipeline.media.filters.base_filter import BaseMediaContentFilter
from datatrove.data import Media


class SizeFilter(BaseMediaContentFilter):
    """Drop media items whose payload exceeds max_size_bytes."""

    def __init__(self, max_size_bytes: int = 10 * 1024 * 1024):
        super().__init__()
        self.max_size_bytes = max_size_bytes

    def filter(self, media: Media) -> bool | tuple[bool, str]:
        # Media without a payload cannot be measured, so drop it with a reason.
        if media.media_bytes is None:
            return False, "no_bytes"
        if len(media.media_bytes) > self.max_size_bytes:
            return False, "too_large"
        return True


# Use in a pipeline
pipeline = [
    # ... reader step ...
    SizeFilter(max_size_bytes=5 * 1024 * 1024),
    # ... writer step ...
]