Heuristic:Datajuicer Data juicer Operator Fusion Rules

From Leeroopedia
Knowledge Sources
Domains Optimization, Data_Processing
Last Updated 2026-02-14 17:00 GMT

Overview

Operator fusion optimization rules that merge consecutive filters sharing intermediate variables to reduce redundant data loading and processing overhead, with speed-based reordering for maximum throughput.

Description

Data-Juicer's operator fusion system identifies consecutive filter operators in a pipeline that share the same intermediate variable category (lines, words, loaded_images, loaded_audios, loaded_videos, sampled_frames) and merges them into a single `FusedFilter` that processes data in one pass. Fused filters use AND semantics (all must pass) and are reordered by execution speed so the fastest filters run first, eliminating data early. Only filters are fusible; mappers and deduplicators execute independently.

Usage

Use this heuristic when designing data processing pipelines with multiple filter operators. Place filters that operate on the same data modality (e.g., word-based filters) consecutively in the pipeline config to maximize fusion opportunities. When operator fusion is enabled, the `fuse_operators` function applies the fusion during pipeline initialization, so no per-filter changes are needed.

The Insight (Rule of Thumb)

  • Rule 1 - Consecutive Filter Grouping: Only consecutive Filter operators are candidates for fusion. Any Mapper or non-Filter operator breaks the filter group.
  • Rule 2 - Intermediate Variable Matching: Filters are grouped by their registered intermediate variable category. Only filters in the same category can be fused.
    • `InterVars.words`: `words_num_filter`, `word_repetition_filter`, `stopwords_filter`, `flagged_words_filter`, `perplexity_filter`
    • `InterVars.lines`: `average_line_length_filter`, `maximum_line_length_filter`
    • `InterVars.loaded_images`: image processing filters
    • `InterVars.loaded_videos`: video processing filters
  • Rule 3 - Minimum Cardinality: Fusion only occurs when 2+ filters share an intermediate variable. Single filters skip fusion overhead.
  • Rule 4 - Speed-Based Reordering: Within a fused group, filters are reordered by descending speed (fastest first). Running the cheapest filters first means that samples they reject never reach the slower filters.
  • Rule 5 - Bottleneck Parallelism: Fused filters use the minimum `num_proc` across all constituent filters. The most constrained operator limits parallelism.
  • Rule 6 - Conservative GPU Rule: If ANY filter in a fused group uses CUDA, the entire fused group runs on CUDA.
  • Rule 7 - AND Semantics: A sample passes a fused filter only if ALL constituent filters return True (intersection semantics).
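  • Trade-off: Fusion reduces data loading overhead (shared intermediate vars) at the cost of wrapper complexity. The fused speed is the reciprocal of the summed per-sample times, `1 / sum(1/individual_speeds)`, which equals the harmonic mean of the individual speeds divided by the number of filters.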
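
Rules 1 to 3 can be illustrated with a small, self-contained sketch. It is a toy planner, not Data-Juicer's implementation: the `CATEGORY` table, the `plan_fusion` function, and the `(kind, name)` pipeline format are hypothetical names introduced for illustration only.

```python
from collections import defaultdict

# Hypothetical registry mapping filter names to their intermediate-variable category.
CATEGORY = {
    "words_num_filter": "words",
    "word_repetition_filter": "words",
    "stopwords_filter": "words",
    "average_line_length_filter": "lines",
    "maximum_line_length_filter": "lines",
}


def plan_fusion(pipeline):
    """Build a fusion plan from a list of (kind, name) operators.

    Consecutive filters form a group (Rule 1), are bucketed by category
    (Rule 2), and a bucket is fused only if it holds 2+ filters (Rule 3).
    """
    plan, group = [], []

    def flush():
        buckets = defaultdict(list)
        for name in group:
            category = CATEGORY.get(name)
            if category is None:
                plan.append(name)  # no shared variable: stays standalone
            else:
                buckets[category].append(name)
        for category, names in buckets.items():
            if len(names) >= 2:
                plan.append(("fused", category, tuple(names)))
            else:
                plan.extend(names)  # a lone filter is not wrapped
        group.clear()

    for kind, name in pipeline:
        if kind == "filter":
            group.append(name)
        else:
            flush()  # any non-filter operator closes the current group
            plan.append(name)
    flush()
    return plan


pipeline = [
    ("filter", "words_num_filter"),
    ("filter", "average_line_length_filter"),
    ("filter", "stopwords_filter"),
    ("mapper", "clean_links_mapper"),
    ("filter", "word_repetition_filter"),
]
plan = plan_fusion(pipeline)
```

In this toy pipeline the two word-based filters before the mapper are fused, while `word_repetition_filter` stays standalone because the mapper separates it from the other word-based filters. Moving it ahead of the mapper would let all three share one fused group.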

Reasoning

Many text-processing filters need to tokenize text into words (a relatively expensive operation). Without fusion, each filter would re-tokenize independently. By fusing word-based filters, tokenization happens once and the word list is shared. The speed-based reordering follows the principle that cheap, selective filters should run first to minimize the data volume processed by expensive downstream filters.
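
The saving from shared tokenization can be made concrete with a toy comparison. This is a minimal sketch under stated assumptions: `CountingTokenizer`, the two check functions, and both runner functions are hypothetical stand-ins, and whitespace splitting stands in for a real tokenizer.

```python
class CountingTokenizer:
    """Whitespace tokenizer that counts how often it is called."""

    def __init__(self):
        self.calls = 0

    def __call__(self, text):
        self.calls += 1
        return text.split()


def has_min_words(words, minimum=3):
    return len(words) >= minimum


def is_mostly_unique(words, min_ratio=0.5):
    return bool(words) and len(set(words)) / len(words) >= min_ratio


def run_unfused(texts, checks, tokenize):
    # Each filter re-tokenizes every sample it receives.
    kept = list(texts)
    for check in checks:
        kept = [t for t in kept if check(tokenize(t))]
    return kept


def run_fused(texts, checks, tokenize):
    # Tokenize once per sample and share the word list across all checks.
    kept = []
    for t in texts:
        words = tokenize(t)
        if all(check(words) for check in checks):
            kept.append(t)
    return kept


texts = ["a b c d", "a b", "x x x x", "one two three"]
checks = [has_min_words, is_mostly_unique]

unfused_tok, fused_tok = CountingTokenizer(), CountingTokenizer()
kept_unfused = run_unfused(texts, checks, unfused_tok)
kept_fused = run_fused(texts, checks, fused_tok)
```

Both runs keep the same samples, but the unfused run calls the tokenizer once per filter per surviving sample, while the fused run calls it once per sample.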

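The formula for combined speed models sequential execution: the time spent per sample is the sum of the individual per-sample times, `sum(1/individual_speeds)`, so the combined speed (samples/second) is its reciprocal. This equals the harmonic mean of the individual speeds divided by the number of filters, not the harmonic mean itself.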
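
A short numeric check of the combined-speed formula, using two hypothetical speeds. The function name `fused_speed` is introduced here for illustration; the comparison with `statistics.harmonic_mean` shows that the combined speed is the harmonic mean divided by the number of filters.

```python
from statistics import harmonic_mean


def fused_speed(speeds):
    """Combined throughput of filters run back to back on every sample."""
    return 1.0 / sum(1.0 / s for s in speeds)


speeds = [100.0, 50.0]  # hypothetical samples/second for two filters

combined = fused_speed(speeds)  # 1 / (0.01 + 0.02) seconds per sample
mean_speed = harmonic_mean(speeds)  # n / sum(1/s), twice the combined speed here
```

With these numbers a sample costs 0.01 s in the first filter and 0.02 s in the second, so the fused filter handles about 33 samples per second, which is slower than either filter alone.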

The AND semantics preserve the pipeline's results: a fused filter keeps exactly the samples that would survive running its constituent filters one after another, because a sample must pass every filter to survive.
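
The equivalence between a fused AND mask and sequential filtering can be checked with a toy example. The predicates and sample values below are hypothetical, and plain Python lists are used instead of NumPy arrays to keep the sketch dependency-free.

```python
def fused_mask(samples, predicates):
    # One boolean per sample: True only if every predicate accepts it.
    return [all(p(s) for p in predicates) for s in samples]


def sequential(samples, predicates):
    # Apply the predicates one after another, dropping rejects at each step.
    kept = list(samples)
    for p in predicates:
        kept = [s for s in kept if p(s)]
    return kept


samples = [3, 8, 12, 15, 20]
predicates = [lambda x: x > 5, lambda x: x % 2 == 0, lambda x: x < 20]

mask = fused_mask(samples, predicates)
kept_fused = [s for s, keep in zip(samples, mask) if keep]
kept_sequential = sequential(samples, predicates)
```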

Code Evidence

Fusion entry point from `op_fusion.py:34-60`:

def fuse_operators(ops, probe_res=None):
    fused_ops = []
    filter_group = []
    for i, op in enumerate(ops):
        if isinstance(op, Filter):
            filter_group.append((op, probe_res[i] if probe_res else None))
        else:
            if filter_group:
                fused_ops.extend(fuse_filter_group(filter_group))
                filter_group = []
            fused_ops.append(op)
    if filter_group:
        fused_ops.extend(fuse_filter_group(filter_group))
    return fused_ops

Intermediate variable grouping from `op_fusion.py:75-86`:

for inter_vars in all_intermediate_vars:
    if op_name in inter_vars.modules:
        all_fused_filters[inter_vars].append((op, probe_res))
        break
else:
    # first apply other filters to decrease the number of samples
    fused_group.append(op)

Speed-based reordering from `op_fusion.py:116-121`:

fused_group = [op for op, _ in sorted(
    zip(fused_group, group_speed),
    key=lambda it: it[1], reverse=True
)]
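
The effect of this ordering can be sketched with a simple cost model. The speeds and pass rates below are hypothetical, and the model assumes that each filter only processes samples that survived the filters before it; whether that holds inside a given fused operator depends on the implementation.

```python
# (name, speed in samples/second, fraction of samples that pass) - all values hypothetical
ops = [
    ("perplexity_filter", 40.0, 0.5),
    ("words_num_filter", 900.0, 0.5),
    ("stopwords_filter", 350.0, 0.5),
]


def reorder_by_speed(ops):
    """Fastest first, mirroring the descending sort in the excerpt above."""
    return sorted(ops, key=lambda op: op[1], reverse=True)


def expected_cost(ops):
    """Expected seconds per input sample when rejects skip later filters."""
    cost, surviving = 0.0, 1.0
    for _name, speed, pass_rate in ops:
        cost += surviving / speed  # only surviving samples pay for this filter
        surviving *= pass_rate
    return cost


ordered = reorder_by_speed(ops)
ordered_names = [name for name, _speed, _rate in ordered]
cost_original = expected_cost(ops)
cost_ordered = expected_cost(ordered)
```

Under these assumptions the fastest-first order costs roughly a third of the original order, because the slow `perplexity_filter` only sees a quarter of the samples.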

Conservative GPU rule and bottleneck parallelism from `op_fusion.py:142-147`:

accelerator_methods = set([op.accelerator for op in self.fused_filters])
if "cuda" in accelerator_methods:
    self.accelerator = "cuda"
self.num_proc = min([op.runtime_np() for op in self.fused_filters])
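
The same resource-merging logic as a standalone sketch. `ToyFilter` and `merge_resources` are hypothetical names, and the `"cpu"` fallback is an assumption, since the excerpt above only shows the CUDA branch.

```python
from dataclasses import dataclass


@dataclass
class ToyFilter:
    name: str
    accelerator: str  # "cpu" or "cuda"
    num_proc: int


def merge_resources(filters):
    """Return (accelerator, num_proc) for a fused group of filters."""
    accelerators = {f.accelerator for f in filters}
    # Assumption: the group falls back to "cpu" when no member needs CUDA.
    accelerator = "cuda" if "cuda" in accelerators else "cpu"
    # The most constrained member limits parallelism for the whole group.
    num_proc = min(f.num_proc for f in filters)
    return accelerator, num_proc


group = [
    ToyFilter("words_num_filter", "cpu", 8),
    ToyFilter("perplexity_filter", "cuda", 2),
    ToyFilter("stopwords_filter", "cpu", 4),
]
merged = merge_resources(group)
```

One consequence worth checking when planning a pipeline: a single GPU-bound, low-parallelism filter pulls the whole fused group down to its settings, so the CPU-only filters in that group lose some of their parallelism.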

AND filter logic from `op_fusion.py:172-180`:

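res = None
for op in self.fused_filters:
    # Assumption: this_res is op's per-sample boolean result; the excerpt omits how it is computed.
    this_res = np.array(list(op.process_batched(samples)))
    # The first filter seeds the mask; each later filter is AND-ed into it.
    res = this_res if res is None else np.logical_and(res, this_res)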
