Implementation: Data-Juicer RayExecutor Run
| Knowledge Sources | |
|---|---|
| Domains | Distributed_Computing, Data_Engineering |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
A concrete executor, provided by the Data-Juicer framework, for running Data-Juicer pipelines on Ray clusters.
Description
RayExecutor provides basic distributed execution by streaming the dataset through operators on Ray. PartitionedRayExecutor extends this with data partitioning, convergence points for global operations (e.g., deduplication), DAG-based execution scheduling, and checkpointing. The partitioned executor splits data via ray.data.Dataset.split(), processes partitions independently, and merges them at convergence points.
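The split/process/merge flow described above can be illustrated with a plain-Python sketch. This is a simulation, not Data-Juicer's actual implementation: lists stand in for Ray datasets, `split` mimics ray.data.Dataset.split(), and the operator names are hypothetical.

```python
# Minimal sketch of partitioned execution with a convergence point.
# Plain Python stands in for Ray; partitions are lists, operators are
# functions. All names here are illustrative, not Data-Juicer's API.

def split(dataset, n):
    """Simulate ray.data.Dataset.split(): n roughly equal partitions."""
    return [dataset[i::n] for i in range(n)]

def per_partition_op(partition):
    """A local operator: runs independently on each partition."""
    return [s.strip().lower() for s in partition]

def global_dedup(partitions):
    """A global operator: needs a convergence point, so merge first."""
    merged = [s for p in partitions for s in p]
    seen, out = set(), []
    for s in merged:
        if s not in seen:
            seen.add(s)
            out.append(s)
    return out

data = ["  Hello ", "world", "HELLO", "data  "]
parts = [per_partition_op(p) for p in split(data, 2)]  # independent stages
result = global_dedup(parts)                           # convergence point
print(result)  # → ['hello', 'world', 'data']
```

The key design point this sketch captures: local operators never need to see other partitions, so they scale linearly, while global operators like deduplication force a merge barrier.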
Usage
Use when the pipeline config specifies executor_type: ray or executor_type: ray_partitioned. The executor is created automatically by tools/process_data.py based on config.
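The config-driven selection in tools/process_data.py can be sketched as a simple dispatch on executor_type. This is a simplification under stated assumptions: the real script may support additional executor types and performs fuller config validation.

```python
# Hedged sketch: pick an executor class name from executor_type.
# The mapping mirrors the two types documented here; the real
# tools/process_data.py may recognize more.

EXECUTORS = {
    "ray": "RayExecutor",
    "ray_partitioned": "PartitionedRayExecutor",
}

def select_executor(cfg: dict) -> str:
    """Return the executor class name for a parsed config dict."""
    etype = cfg.get("executor_type", "ray")  # assumed default
    try:
        return EXECUTORS[etype]
    except KeyError:
        raise ValueError(f"unknown executor_type: {etype!r}")

print(select_executor({"executor_type": "ray_partitioned"}))
# → PartitionedRayExecutor
```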
Code Reference
Source Location
- Repository: data-juicer
- File: data_juicer/core/executor/ray_executor.py (RayExecutor), data_juicer/core/executor/ray_executor_partitioned.py (PartitionedRayExecutor)
- Lines: ray_executor.py:L37-223, ray_executor_partitioned.py:L344-1004
Signature
class RayExecutor:
    def __init__(self, cfg=None):
        """
        Args:
            cfg: Parsed pipeline config namespace.
        """

    def run(
        self,
        load_data_np=None,
        skip_export=False,
        skip_return=False
    ) -> RayDataset:
        """
        Run the distributed pipeline.

        Args:
            load_data_np: Number of data loading workers.
            skip_export: Skip disk export.
            skip_return: Skip returning the dataset.

        Returns:
            Processed RayDataset.
        """

class PartitionedRayExecutor:
    def run(
        self,
        load_data_np=None,
        skip_return=False
    ) -> RayDataset:
        """
        Run the partitioned distributed pipeline.

        Args:
            load_data_np: Number of data loading workers.
            skip_return: Skip returning the dataset.

        Returns:
            Processed RayDataset.
        """
Import
from data_juicer.core.executor import RayExecutor
from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| cfg | Namespace | Yes (init) | Pipeline config with executor_type, dataset_path, process list |
| load_data_np | PositiveInt | No | Number of data loading workers |
| skip_export | bool | No | Skip exporting to disk (RayExecutor only) |
| skip_return | bool | No | Skip returning the processed dataset |
Outputs
| Name | Type | Description |
|---|---|---|
| dataset | RayDataset | Processed dataset on Ray cluster |
| exports | Files | Exported result files (unless skip_export=True) |
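The required init input above can be mirrored with a minimal namespace sketch. The field names beyond executor_type, dataset_path, and process, as well as the operator names, are assumptions for illustration, not the framework's verified schema.

```python
from types import SimpleNamespace

# Hypothetical minimal cfg carrying the fields the contract requires.
cfg = SimpleNamespace(
    executor_type="ray",              # 'ray' or 'ray_partitioned'
    dataset_path="data/input.jsonl",  # illustrative path
    process=[                         # operator list: name -> params
        {"whitespace_normalization_mapper": {}},      # assumed op name
        {"document_deduplicator": {"lowercase": True}},  # assumed op name
    ],
)

assert cfg.executor_type == "ray"
print(len(cfg.process))  # → 2
```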
Usage Examples
Basic Ray Execution
from data_juicer.config import init_configs
from data_juicer.core.executor import RayExecutor
cfg = init_configs(args=['--config', 'ray_pipeline.yaml'])
# ray_pipeline.yaml has: executor_type: ray
executor = RayExecutor(cfg)
dataset = executor.run()
Partitioned Execution
from data_juicer.config import init_configs
from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor
cfg = init_configs(args=['--config', 'partitioned_pipeline.yaml'])
# partitioned_pipeline.yaml has: executor_type: ray_partitioned
executor = PartitionedRayExecutor(cfg)
dataset = executor.run()
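For completeness, a minimal ray_pipeline.yaml sketch matching the basic example above. Paths, the ray_address field, and operator names are illustrative assumptions; consult the Data-Juicer config reference for the authoritative schema.

```yaml
# Illustrative pipeline config; adjust paths and operators to your data.
project_name: 'demo-ray'
dataset_path: 'data/input.jsonl'     # hypothetical input path
export_path: 'outputs/result.jsonl'
executor_type: ray                   # selects RayExecutor
ray_address: auto                    # assumed: join an existing cluster
process:
  - whitespace_normalization_mapper:
  - document_deduplicator:
      lowercase: true
```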