Implementation:Datajuicer Data juicer Fuse Operators
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Performance_Optimization |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
Concrete tool, provided by the Data-Juicer framework, for merging consecutive compatible operators into fused execution units.
Description
The fuse_operators function takes a list of operator instances and groups consecutive Filters that can share intermediate variables into FusedFilter objects. It also supports speed-based reordering when probe results are available. The companion Adapter.adapt_workloads method probes operator performance on a small batch to calculate optimal batch sizes.
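The grouping step can be illustrated with a minimal, self-contained sketch. It does not use or reproduce Data-Juicer's code: `ToyOp`, its `inter_var` field, and `toy_fuse` are hypothetical names, and the rule "fuse adjacent filters that declare the same intermediate variable" is a simplifying assumption about what "compatible" means here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyOp:
    """Hypothetical stand-in for an operator; not a Data-Juicer class."""
    name: str
    is_filter: bool = False
    # Name of the intermediate variable this op could reuse (assumption).
    inter_var: Optional[str] = None


def toy_fuse(ops):
    """Group runs of adjacent filters that declare the same inter_var.

    A fused run is returned as a tuple of ToyOp; every other operator is
    kept as a single ToyOp. Pipeline order is preserved.
    """
    result = []
    run = []

    def flush():
        # A run of length one gains nothing from fusion, so keep it as-is.
        if len(run) > 1:
            result.append(tuple(run))
        else:
            result.extend(run)
        run.clear()

    for op in ops:
        fusible = op.is_filter and op.inter_var is not None
        if fusible and run and run[-1].inter_var == op.inter_var:
            run.append(op)
            continue
        flush()
        if fusible:
            run.append(op)
        else:
            result.append(op)
    flush()
    return result


pipeline = [
    ToyOp("toy_mapper"),
    ToyOp("toy_words_filter_a", is_filter=True, inter_var="words"),
    ToyOp("toy_words_filter_b", is_filter=True, inter_var="words"),
    ToyOp("toy_length_filter", is_filter=True),
    ToyOp("toy_deduplicator"),
]
fused_pipeline = toy_fuse(pipeline)
```

In this toy pipeline the two adjacent filters that both declare `words` collapse into one tuple, so five operators become four execution units. The real function returns FusedFilter objects rather than tuples.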
Usage
Call fuse_operators after load_ops and before NestedDataset.process to optimize the operator pipeline. This step is optional but recommended for pipelines with multiple filter operators.
Code Reference
Source Location
- Repository: data-juicer
- File: data_juicer/ops/op_fusion.py (fuse_operators), data_juicer/core/adapter.py (adapt_workloads)
- Lines: op_fusion.py:L34-60, adapter.py:L102-115
Signature
def fuse_operators(ops: list, probe_res=None) -> list:
    """
    Fuse consecutive compatible operators.

    Args:
        ops: List of operator instances.
        probe_res: Optional list of probed speed metrics per operator.

    Returns:
        Reordered/fused list of operator instances.
    """

class Adapter:
    def adapt_workloads(self, dataset, operators):
        """
        Probe operator performance and calculate optimal batch sizes.

        Args:
            dataset: The dataset for probing.
            operators: Operators in the data recipe.

        Returns:
            List of batch sizes, one per operator.
        """
Import
from data_juicer.ops.op_fusion import fuse_operators
from data_juicer.core.adapter import Adapter
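To make the idea of deriving a batch size from a small probe concrete, here is a hedged sketch of one possible heuristic. It is not the logic used by `Adapter.adapt_workloads`: the function name `batch_size_from_probe`, the throughput-times-target-duration rule, and the cap of 10000 are all assumptions chosen for illustration.

```python
def batch_size_from_probe(probe_size, elapsed_s, target_s=2.0, max_batch_size=10000):
    """Scale a probed throughput to a batch size (hypothetical heuristic).

    probe_size: number of samples processed during the probe.
    elapsed_s: wall-clock seconds the probe took.
    target_s: desired processing time per batch, in seconds (assumption).
    max_batch_size: upper bound on the result (assumption).
    """
    if probe_size <= 0 or elapsed_s <= 0:
        raise ValueError("probe_size and elapsed_s must be positive")
    throughput = probe_size / elapsed_s  # samples per second
    # Clamp to [1, max_batch_size] so very slow or very fast ops stay usable.
    return max(1, min(max_batch_size, int(throughput * target_s)))


# One entry per operator: (samples probed, seconds taken). Toy numbers.
toy_probes = [(100, 0.5), (100, 0.001), (100, 1000.0)]
toy_batch_sizes = [batch_size_from_probe(n, t) for n, t in toy_probes]
```

The result has one batch size per probed operator, which mirrors the shape of the return value described in the signature above. The clamping keeps a very slow operator at a batch size of at least one and a very fast operator below the cap.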
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ops | list | Yes | List of OP instances from load_ops |
| probe_res | list | No | Probed speed metrics per operator |
Outputs
| Name | Type | Description |
|---|---|---|
| fused_ops | list | Reordered/fused list of OP instances (FusedFilter for merged groups) |
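The role of `probe_res` in reordering can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: it assumes each `probe_res` entry is a dict with a `"speed"` key (samples per second) aligned by position with `ops`, and that faster operators should run first. The helper name `reorder_by_speed` is hypothetical.

```python
def reorder_by_speed(ops, probe_res=None):
    """Return ops sorted fastest-first; keep the input order if no probe data.

    ops: list of operators (any objects).
    probe_res: optional list of dicts with a "speed" entry, one per operator
        (the dict layout is an assumption made for this sketch).
    """
    if probe_res is None:
        return list(ops)
    if len(probe_res) != len(ops):
        raise ValueError("probe_res must have one entry per operator")
    # sorted() is stable, so operators with equal speed keep their order.
    paired = sorted(zip(ops, probe_res), key=lambda pair: pair[1]["speed"], reverse=True)
    return [op for op, _ in paired]


toy_ops = ["slow_filter", "fast_filter", "medium_filter"]
toy_probe = [{"speed": 10.0}, {"speed": 1000.0}, {"speed": 100.0}]
reordered = reorder_by_speed(toy_ops, toy_probe)
```

Running cheap filters first means that slower filters see fewer samples, which is the usual motivation for this kind of ordering. Without probe results the sketch leaves the order unchanged.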
Usage Examples
Basic Operator Fusion
from data_juicer.ops import load_ops
from data_juicer.ops.op_fusion import fuse_operators

# cfg is assumed to be an already-parsed Data-Juicer config whose
# `process` field lists the operators of the recipe.
operators = load_ops(cfg.process)
print(f"Before fusion: {len(operators)} operators")

fused = fuse_operators(operators)
print(f"After fusion: {len(fused)} operators")
# Consecutive filters that share intermediate variables are merged into a FusedFilter.
Related Pages
Implements Principle
Requires Environment
- Environment:Datajuicer_Data_juicer_Python_Runtime_Environment
- Environment:Datajuicer_Data_juicer_GPU_CUDA_Environment