Implementation:Datajuicer Data juicer Operator Base Classes

From Leeroopedia
Knowledge Sources
Domains Data_Engineering, Software_Architecture
Last Updated 2026-02-14 17:00 GMT

Overview

Pattern documentation for the operator base class hierarchy that defines the interface contract for custom Data-Juicer operators.

Description

Data-Juicer defines its operator base classes in data_juicer/ops/base_op.py, and every custom operator must inherit from one of them. The OP base class provides common configuration (text_key, image_key, num_proc, batch_size, accelerator), and each subclass (Mapper, Filter, Deduplicator, Selector) enforces a specific processing contract through abstract methods.
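To illustrate how the shared configuration reaches a custom operator, here is a minimal sketch. The OP class below is a simplified stand-in, not the library's implementation, and LengthTagger with its max_len option is hypothetical. The point is only the forwarding pattern: the subclass takes its own options and passes everything else up to the base class.

```python
class OP:
    """Simplified stand-in for the OP base class (assumption, not the real code)."""

    def __init__(self, text_key='text', num_proc=1, batch_size=1000, **kwargs):
        self.text_key = text_key
        self.num_proc = num_proc
        self.batch_size = batch_size


class LengthTagger(OP):
    """Hypothetical operator with one option of its own, max_len."""

    def __init__(self, max_len=100, *args, **kwargs):
        # Forward the remaining arguments so the base class sets the shared options.
        super().__init__(*args, **kwargs)
        self.max_len = max_len


op = LengthTagger(max_len=20, text_key='content', num_proc=4)
print(op.text_key, op.num_proc, op.batch_size, op.max_len)
```

Because the subclass never names text_key or num_proc itself, any option the base class understands can be set from the operator's configuration without changes to the subclass.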

Usage

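Inherit from the base class that matches your operator's processing goal and override its required methods: Mapper for per-sample transformation, Filter for keeping or dropping samples based on computed statistics, Deduplicator for duplicate removal, and Selector for dataset-level selection.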

Code Reference

Source Location

  • Repository: data-juicer
  • File: data_juicer/ops/base_op.py
  • Lines: L289-553 (OP), L555-665 (Mapper), L666-815 (Filter), L816-881 (Deduplicator), L882-911 (Selector)

Interface Specification

class OP:
    """Base operator class."""
    def __init__(
        self,
        text_key='text',
        image_key='images',
        audio_key='audios',
        video_key='videos',
        query_key='query',
        response_key='response',
        num_proc=1,
        batch_size=1000,
        accelerator=None,
        **kwargs
    ):
        pass

class Mapper(OP):
    """1:1 sample transformation. Override process_single or process_batched."""
    def process_single(self, sample):
        raise NotImplementedError

    def process_batched(self, samples):
        raise NotImplementedError

class Filter(OP):
    """Quality-based filtering. Override compute_stats_single and process_single."""
    def compute_stats_single(self, sample, context=False):
        raise NotImplementedError

    def process_single(self, sample):
        """Return True to keep, False to remove."""
        raise NotImplementedError

class Deduplicator(OP):
    """Duplicate removal. Override compute_hash and process."""
    def compute_hash(self, sample):
        raise NotImplementedError

    def process(self, dataset, show_num=0):
        raise NotImplementedError

class Selector(OP):
    """Dataset-level selection. Override process."""
    def process(self, dataset):
        raise NotImplementedError
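The Deduplicator and Selector contracts differ from Mapper and Filter in that process receives the whole dataset. The following sketch shows that shape with plain functions and a list of dicts standing in for DJDataset. The function names, the MD5 fingerprint, and the "longest text" criterion are illustrative assumptions, not what the library ships.

```python
import hashlib


def compute_hash(sample, text_key='text'):
    """Fingerprint one sample by the MD5 of its text field."""
    return hashlib.md5(sample[text_key].encode('utf-8')).hexdigest()


def deduplicate(dataset, text_key='text'):
    """Keep the first sample for each distinct hash, preserving order."""
    seen = set()
    kept = []
    for sample in dataset:
        digest = compute_hash(sample, text_key)
        if digest not in seen:
            seen.add(digest)
            kept.append(sample)
    return kept


def select_longest(dataset, top_k, text_key='text'):
    """Dataset-level selection: keep the top_k samples with the longest text."""
    return sorted(dataset, key=lambda s: len(s[text_key]), reverse=True)[:top_k]


dataset = [
    {'text': 'hello world'},
    {'text': 'hello world'},
    {'text': 'a much longer piece of text'},
    {'text': 'hi'},
]
unique = deduplicate(dataset)
longest = select_longest(unique, top_k=2)
```

Both steps need to see every sample before deciding, which is why these operators cannot be expressed through a per-sample method alone.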

Import

from data_juicer.ops.base_op import Filter, Mapper, Deduplicator, Selector

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| sample | dict | Yes (single) | Single data sample with text/media fields |
| samples | dict of lists | Yes (batched) | Batch of samples in columnar format |
| dataset | DJDataset | Yes (Dedup/Selector) | Full dataset for global operations |
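The difference between a single sample and a columnar batch is easiest to see side by side. This sketch converts between the two layouts; rows_to_columns and columns_to_rows are hypothetical helpers written for this illustration and are not part of Data-Juicer.

```python
def rows_to_columns(rows):
    """Turn a list of sample dicts into one dict of equal-length lists."""
    if not rows:
        return {}
    return {key: [row[key] for row in rows] for key in rows[0]}


def columns_to_rows(columns):
    """Turn a dict of equal-length lists back into a list of sample dicts."""
    keys = list(columns)
    return [dict(zip(keys, values)) for values in zip(*(columns[k] for k in keys))]


rows = [
    {'text': 'a', 'images': []},
    {'text': 'b', 'images': ['x.jpg']},
]
samples = rows_to_columns(rows)   # the shape a batched method receives
restored = columns_to_rows(samples)
```

A single-sample method sees one element of rows, while a batched method sees samples, where each key maps to a list with one entry per sample.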

Outputs

| Name | Type | Description |
|---|---|---|
| sample (Mapper) | dict | Transformed sample |
| bool (Filter) | bool | True to keep, False to remove |
| dataset (Dedup/Selector) | DJDataset | Filtered/selected dataset |

Usage Examples

Custom Filter

from data_juicer.ops.base_op import OPERATORS, Filter
from data_juicer.utils.constant import Fields, StatsKeys

@OPERATORS.register_module('my_custom_filter')
class MyCustomFilter(Filter):
    def __init__(self, min_score=0.5, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.min_score = min_score

    def compute_stats_single(self, sample, context=False):
        # Compute the ratio of distinct words to total words and store it
        # under the existing word_rep_ratio stats key.
        words = sample[self.text_key].split()
        score = len(set(words)) / max(len(words), 1)
        sample[Fields.stats][StatsKeys.word_rep_ratio] = score
        return sample

    def process_single(self, sample):
        # Keep the sample only if the stored score meets the threshold.
        score = sample[Fields.stats][StatsKeys.word_rep_ratio]
        return score >= self.min_score
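The filter above works in two phases: one method annotates the sample with a statistic, the other decides from that statistic alone. The standalone sketch below mirrors that flow without importing Data-Juicer so it can be run directly. The 'stats' field name and the 'distinct_ratio' key are placeholders chosen for this example, standing in for Fields.stats and a StatsKeys entry.

```python
STATS = 'stats'  # placeholder field name, standing in for Fields.stats


def compute_stats(sample, text_key='text'):
    """Phase 1: annotate the sample with its ratio of distinct words."""
    words = sample[text_key].split()
    ratio = len(set(words)) / max(len(words), 1)
    sample.setdefault(STATS, {})['distinct_ratio'] = ratio
    return sample


def keep(sample, min_score=0.5):
    """Phase 2: decide using only the stored statistic."""
    return sample[STATS]['distinct_ratio'] >= min_score


samples = [
    {'text': 'the cat sat on the mat'},
    {'text': 'spam spam spam spam'},
]
annotated = [compute_stats(s) for s in samples]
kept = [s for s in annotated if keep(s)]
```

Separating the two phases means the statistic stays attached to each sample, so a different threshold can be applied later without recomputing it.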

Custom Mapper

from data_juicer.ops.base_op import OPERATORS, Mapper

@OPERATORS.register_module('my_text_cleaner')
class MyTextCleaner(Mapper):
    def process_single(self, sample):
        sample[self.text_key] = sample[self.text_key].strip().lower()
        return sample
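The same cleaning step can also be written against the batched contract, where the method receives a dict of lists instead of one sample. This is a sketch only: the Mapper class here is a stand-in so the code runs without Data-Juicer installed, MyBatchedTextCleaner is a hypothetical name, and how the real library chooses between process_single and process_batched is not shown (consult base_op.py for that).

```python
class Mapper:
    """Stand-in base class so the sketch runs without Data-Juicer installed."""

    def __init__(self, text_key='text', **kwargs):
        self.text_key = text_key


class MyBatchedTextCleaner(Mapper):
    def process_batched(self, samples):
        # samples is a dict of lists; rewrite the whole text column at once.
        samples[self.text_key] = [
            text.strip().lower() for text in samples[self.text_key]
        ]
        return samples


batch = {'text': ['  Hello ', 'WORLD'], 'id': [1, 2]}
cleaned = MyBatchedTextCleaner().process_batched(batch)
```

Columns the operator does not touch, such as id here, pass through unchanged, and every column keeps the same length.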

Related Pages

Implements Principle

Requires Environment
