Implementation: Data-Juicer NestedDataset.process
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Pipeline_Execution |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
Concrete method, provided by the Data-Juicer framework, for applying a chain of data-processing operators to a HuggingFace-backed dataset.
Description
The NestedDataset.process method is the core execution loop for single-machine Data-Juicer pipelines. It iterates over a list of operators, applying each to the dataset: Mappers run via dataset.map(), while Filters first compute per-sample stats via dataset.map() and then drop failing samples via dataset.filter(). It integrates with the Exporter for intermediate exports, the CheckpointManager for resumable processing, the Tracer for sample-level auditing, and the Adapter for adaptive batch sizing.
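The control flow described above can be sketched in plain Python. This is an illustrative model only: the dataset is a list of dicts instead of a HuggingFace Dataset, and the helpers (`apply_op`, the `op` dict layout) are hypothetical, not Data-Juicer API.

```python
# Minimal sketch of the sequential operator loop in NestedDataset.process.
# `op` dicts stand in for real OP instances; the real implementation calls
# dataset.map() / dataset.filter() and hooks in checkpointing and tracing.

def apply_op(samples, op):
    """Apply one operator (hypothetical helper)."""
    if op["type"] == "mapper":
        # Mappers transform each sample (dataset.map() in Data-Juicer).
        return [op["fn"](s) for s in samples]
    if op["type"] == "filter":
        # Filters first compute per-sample stats (a map() pass) ...
        samples = [dict(s, stats=op["stats"](s)) for s in samples]
        # ... then drop samples failing the check (a filter() pass).
        return [s for s in samples if op["keep"](s)]
    raise ValueError(f"unknown op type: {op['type']}")

def process(samples, operators):
    """Apply operators sequentially, as NestedDataset.process does."""
    for op in operators:
        samples = apply_op(samples, op)
        # The real loop would checkpoint, trace, and export here.
    return samples
```

For example, a whitespace-stripping Mapper followed by a minimum-length Filter keeps only samples whose stripped text passes the length check; each operator always sees the output of the previous one.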
Usage
This method is called by DefaultExecutor.run() after loading the dataset and instantiating operators. It is the primary processing entry point for non-distributed pipelines.
Code Reference
Source Location
- Repository: data-juicer
- File: data_juicer/core/data/dj_dataset.py
- Lines: L254-285
Signature
class NestedDataset(DJDataset):
    def process(
        self,
        operators,
        *,
        work_dir=None,
        exporter=None,
        checkpointer=None,
        tracer=None,
        adapter=None,
        open_monitor=True,
    ) -> 'NestedDataset':
        """
        Apply operators sequentially to the dataset.

        Args:
            operators: List of OP instances to apply.
            work_dir: Output directory for intermediate results.
            exporter: Exporter for intermediate exports.
            checkpointer: CheckpointManager for resumable processing.
            tracer: Tracer for sample-level auditing.
            adapter: Adapter for adaptive batch sizes.
            open_monitor: Enable resource monitoring.

        Returns:
            Processed NestedDataset with all operators applied.
        """
Import
from data_juicer.core.data import NestedDataset
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| operators | list | Yes | List of OP instances (Filter, Mapper, etc.) |
| work_dir | str | No | Output directory |
| exporter | Exporter | No | For intermediate dataset exports |
| checkpointer | CheckpointManager | No | For resumable processing |
| tracer | Tracer | No | For sample-level tracing |
| adapter | Adapter | No | For adaptive batch sizing |
| open_monitor | bool | No | Enable resource monitoring (default: True) |
Outputs
| Name | Type | Description |
|---|---|---|
| dataset | NestedDataset | Processed dataset with all operators applied sequentially |
Usage Examples
Direct Process Call
from data_juicer.config import init_configs
from data_juicer.core.data import DatasetBuilder
from data_juicer.ops import load_ops

# Parse the pipeline config and load the dataset
cfg = init_configs(args=['--config', 'pipeline.yaml'])
builder = DatasetBuilder(cfg)
dataset = builder.load_dataset()

# Instantiate the operator chain defined under `process` in the config
operators = load_ops(cfg.process)

# Apply all operators sequentially
result = dataset.process(operators, work_dir=cfg.work_dir)
print(f"Samples remaining: {len(result)}")
Via DefaultExecutor (Recommended)
from data_juicer.config import init_configs
from data_juicer.core.executor import DefaultExecutor
cfg = init_configs(args=['--config', 'pipeline.yaml'])
executor = DefaultExecutor(cfg)
dataset = executor.run()
# Internally calls dataset.process() with all orchestration
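Both examples assume a `pipeline.yaml` that is not shown here. A minimal config might look like the following sketch; the top-level keys follow Data-Juicer's config conventions and the two operators are built-in OPs, but paths and parameter values are placeholders to verify against your installed version.

```yaml
# Hypothetical minimal pipeline.yaml (paths are placeholders)
project_name: 'demo-pipeline'
dataset_path: 'data/raw.jsonl'        # input dataset
export_path: 'data/processed.jsonl'   # output after all operators
np: 4                                 # number of worker processes

# Operator chain consumed by load_ops(cfg.process)
process:
  - whitespace_normalization_mapper:
  - text_length_filter:
      min_len: 10
      max_len: 10000
```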