Implementation:Datajuicer Data juicer Operator Base Classes
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Software_Architecture |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
Pattern documentation for the operator base class hierarchy that defines the interface contract for custom Data-Juicer operators.
Description
Data-Juicer defines its operator base classes in data_juicer/ops/base_op.py, and every custom operator must inherit from one of them. The OP base class holds the shared configuration (data keys such as text_key and image_key, plus num_proc, batch_size, and accelerator), and each subclass enforces a specific processing contract through methods that subclasses must override.
Usage
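Inherit from the base class that matches your operator's processing goal: Mapper for per-sample transformation, Filter for keep-or-remove decisions, Deduplicator for duplicate removal, and Selector for dataset-level selection. Then override the methods that class requires.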
Code Reference
Source Location
- Repository: data-juicer
- File: data_juicer/ops/base_op.py
- Lines: L289-553 (OP), L555-665 (Mapper), L666-815 (Filter), L816-881 (Deduplicator), L882-911 (Selector)
Interface Specification
class OP:
    """Base operator class."""

    def __init__(
        self,
        text_key='text',
        image_key='images',
        audio_key='audios',
        video_key='videos',
        query_key='query',
        response_key='response',
        num_proc=1,
        batch_size=1000,
        accelerator=None,
        **kwargs
    ):
        pass


class Mapper(OP):
    """1:1 sample transformation. Override process_single or process_batched."""

    def process_single(self, sample):
        raise NotImplementedError

    def process_batched(self, samples):
        raise NotImplementedError


class Filter(OP):
    """Quality-based filtering. Override compute_stats_single and process_single."""

    def compute_stats_single(self, sample, context=False):
        raise NotImplementedError

    def process_single(self, sample):
        """Return True to keep, False to remove."""
        raise NotImplementedError


class Deduplicator(OP):
    """Duplicate removal. Override compute_hash and process."""

    def compute_hash(self, sample):
        raise NotImplementedError

    def process(self, dataset, show_num=0):
        raise NotImplementedError


class Selector(OP):
    """Dataset-level selection. Override process."""

    def process(self, dataset):
        raise NotImplementedError
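The Filter contract splits work into two phases: compute a statistic, then decide whether to keep the sample. The following is a minimal sketch of that flow, assuming plain dicts as samples. `ToyLengthFilter`, `run_filter`, and the `"__stats__"` key are hypothetical names chosen for illustration; the block does not import Data-Juicer and does not reflect how the library schedules operators internally.

```python
class ToyLengthFilter:
    """Hypothetical stand-in mirroring the Filter method names; not Data-Juicer code."""

    def __init__(self, text_key="text", min_len=3):
        self.text_key = text_key
        self.min_len = min_len

    def compute_stats_single(self, sample, context=False):
        # Phase 1: attach the statistic to the sample and return the sample.
        stats = sample.setdefault("__stats__", {})
        stats["num_words"] = len(sample[self.text_key].split())
        return sample

    def process_single(self, sample):
        # Phase 2: return True to keep, False to remove.
        return sample["__stats__"]["num_words"] >= self.min_len


def run_filter(op, samples):
    """Apply both phases in order; keep samples for which process_single is True."""
    # dict(s) makes a shallow copy so the caller's samples are left untouched.
    with_stats = [op.compute_stats_single(dict(s)) for s in samples]
    return [s for s in with_stats if op.process_single(s)]


samples = [{"text": "too short"}, {"text": "this one has enough words"}]
kept = run_filter(ToyLengthFilter(min_len=3), samples)
```

Separating the statistic from the decision means the computed value stays on the sample, so it can be inspected or reused with a different threshold without recomputing it.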
Import
from data_juicer.ops.base_op import Filter, Mapper, Deduplicator, Selector
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| sample | dict | Yes (single) | Single data sample with text/media fields |
| samples | dict of lists | Yes (batched) | Batch of samples in columnar format |
| dataset | DJDataset | Yes (Dedup/Selector) | Full dataset for global operations |
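The difference between the single and batched input shapes can be shown with plain Python. This is a sketch only: `rows_to_columns` and `columns_to_rows` are hypothetical helpers, not Data-Juicer functions, and they assume every sample carries the same keys.

```python
def rows_to_columns(rows):
    """Turn a list of sample dicts into one dict of lists (columnar batch)."""
    if not rows:
        return {}
    return {key: [row[key] for row in rows] for key in rows[0]}


def columns_to_rows(batch):
    """Turn a columnar batch back into a list of per-sample dicts."""
    keys = list(batch)
    if not keys:
        return []
    columns = (batch[key] for key in keys)
    return [dict(zip(keys, values)) for values in zip(*columns)]


# Two single samples, as a process_single-style method would see them one by one.
rows = [{"text": "Hello", "images": []}, {"text": "World", "images": ["a.png"]}]

# The same data in the columnar shape a process_batched-style method receives.
batch = rows_to_columns(rows)
```

In the columnar form, index `i` of every list belongs to the same sample, so a batched operator has to keep all columns the same length.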
Outputs
| Name | Type | Description |
|---|---|---|
| sample (Mapper) | dict | Transformed sample |
| keep flag (Filter) | bool | True to keep, False to remove |
| dataset (Dedup/Selector) | DJDataset | Filtered/selected dataset |
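Dataset-level operators differ from Mapper and Filter in that they need to see every sample before producing output. A minimal sketch of the Deduplicator shape follows, assuming a list of dicts in place of a DJDataset. `ToyExactDeduplicator` is a hypothetical stand-in; the real base class may store hashes and return values differently.

```python
import hashlib


class ToyExactDeduplicator:
    """Hypothetical stand-in mirroring the Deduplicator method names."""

    def __init__(self, text_key="text"):
        self.text_key = text_key

    def compute_hash(self, sample):
        # Hash the normalized text so identical documents collide.
        text = sample[self.text_key].strip().lower()
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def process(self, dataset, show_num=0):
        # show_num is accepted for signature parity but ignored in this sketch.
        # Keep the first sample seen for each hash; this needs the whole dataset.
        seen = set()
        kept = []
        for sample in dataset:
            digest = self.compute_hash(sample)
            if digest not in seen:
                seen.add(digest)
                kept.append(sample)
        return kept


dataset = [{"text": "Hello"}, {"text": "hello "}, {"text": "World"}]
deduped = ToyExactDeduplicator().process(dataset)
```

Because the decision for one sample depends on all samples seen before it, this kind of operator cannot be expressed as a per-sample `process_single`.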
Usage Examples
Custom Filter
from data_juicer.ops.base_op import OPERATORS, Filter
from data_juicer.utils.constant import Fields, StatsKeys


@OPERATORS.register_module('my_custom_filter')
class MyCustomFilter(Filter):

    def __init__(self, min_score=0.5, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.min_score = min_score

    def compute_stats_single(self, sample, context=False):
        # Compute the ratio of unique words to total words and store it.
        # The value is stored under the word_rep_ratio key for brevity.
        words = sample[self.text_key].split()
        score = len(set(words)) / max(len(words), 1)
        sample[Fields.stats][StatsKeys.word_rep_ratio] = score
        return sample

    def process_single(self, sample):
        # Keep the sample only if its stored score reaches the threshold.
        score = sample[Fields.stats][StatsKeys.word_rep_ratio]
        return score >= self.min_score
Custom Mapper
from data_juicer.ops.base_op import OPERATORS, Mapper


@OPERATORS.register_module('my_text_cleaner')
class MyTextCleaner(Mapper):

    def process_single(self, sample):
        sample[self.text_key] = sample[self.text_key].strip().lower()
        return sample
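The Mapper contract also allows overriding process_batched instead of process_single. The sketch below shows what the same cleaning logic might look like on a columnar batch. It is written as a standalone function, `clean_batch`, a hypothetical name, so that it runs without Data-Juicer installed; inside a real operator the body would live in `process_batched` and read `self.text_key`.

```python
def clean_batch(samples, text_key="text"):
    """Batched counterpart of the strip-and-lowercase mapper.

    `samples` is a dict of lists; every column must keep the same length.
    """
    cleaned = dict(samples)  # shallow copy so the input batch is not mutated
    cleaned[text_key] = [text.strip().lower() for text in samples[text_key]]
    return cleaned


batch = {"text": ["  Hello ", "WORLD"], "id": [1, 2]}
result = clean_batch(batch)
```

Only the text column is rewritten; the other columns pass through unchanged, which keeps each index aligned with its original sample.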
Related Pages
Implements Principle
Requires Environment