
Implementation: Data-Juicer RayExecutor Run

From Leeroopedia
Knowledge Sources
Domains Distributed_Computing, Data_Engineering
Last Updated 2026-02-14 17:00 GMT

Overview

A concrete tool, provided by the Data-Juicer framework, for executing Data-Juicer pipelines on Ray clusters.

Description

RayExecutor provides basic distributed execution by streaming the dataset through operators on Ray. PartitionedRayExecutor extends this with data partitioning, convergence points for global operations (e.g., deduplication), DAG-based execution scheduling, and checkpointing. The partitioned executor splits the data via ray.data.Dataset.split(), processes each partition independently, and merges partitions at convergence points.
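The partition-process-merge flow described above can be sketched in plain Python (a minimal illustration only, with no Ray dependency; the function names here are hypothetical, not Data-Juicer or Ray APIs):

```python
# Illustrative sketch of the partition -> process -> merge pattern
# used by PartitionedRayExecutor. In the real executor, partitions
# are ray.data.Dataset shards produced by ray.data.Dataset.split().

def split(records, n):
    """Split records into n roughly equal partitions."""
    return [records[i::n] for i in range(n)]

def process_partition(partition):
    """Per-partition operators run independently (here: a toy filter)."""
    return [r for r in partition if r % 2 == 0]

def deduplicate(records):
    """A global operator: all partitions must converge before it runs."""
    return sorted(set(records))

records = [4, 2, 2, 7, 8, 4, 10]
partitions = split(records, 2)
processed = [process_partition(p) for p in partitions]

# Convergence point: merge partitions, then apply the global operator.
merged = [r for p in processed for r in p]
result = deduplicate(merged)
print(result)  # [2, 4, 8, 10]
```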

Usage

Use this executor when the pipeline config specifies executor_type: ray or executor_type: ray_partitioned. tools/process_data.py creates the appropriate executor automatically based on the config.
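A minimal config sketch follows. The executor_type, dataset_path, and process fields are those named in the I/O Contract below; the operator entry in the process list is an illustrative placeholder, so check the Data-Juicer operator documentation for real operator names and parameters:

```yaml
# Minimal pipeline config sketch (operator entry is a placeholder).
executor_type: ray            # or: ray_partitioned
dataset_path: ./data/input.jsonl
process:
  - some_filter_op:           # hypothetical operator name
      lang: en
```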

Code Reference

Source Location

  • Repository: data-juicer
  • File: data_juicer/core/executor/ray_executor.py (RayExecutor), data_juicer/core/executor/ray_executor_partitioned.py (PartitionedRayExecutor)
  • Lines: ray_executor.py:L37-223, ray_executor_partitioned.py:L344-1004

Signature

class RayExecutor:
    def __init__(self, cfg=None):
        """
        Args:
            cfg: Parsed pipeline config namespace.
        """

    def run(
        self,
        load_data_np=None,
        skip_export=False,
        skip_return=False
    ) -> RayDataset:
        """
        Run the distributed pipeline.

        Args:
            load_data_np: Number of data loading workers.
            skip_export: Skip disk export.
            skip_return: Skip returning the dataset.

        Returns:
            Processed RayDataset.
        """

class PartitionedRayExecutor:
    def run(
        self,
        load_data_np=None,
        skip_return=False
    ) -> RayDataset:
        """
        Run the partitioned distributed pipeline.

        Args:
            load_data_np: Number of data loading workers.
            skip_return: Skip returning the dataset.

        Returns:
            Processed RayDataset.
        """

Import

from data_juicer.core.executor import RayExecutor
from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor

I/O Contract

Inputs

Name Type Required Description
cfg Namespace Yes (init) Pipeline config with executor_type, dataset_path, process list
load_data_np PositiveInt No Number of data loading workers
skip_export bool No Skip exporting to disk
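For quick experimentation, a config object carrying the fields listed above can be mocked with a plain namespace (a sketch only; real pipelines should build cfg via data_juicer.config.init_configs, which also performs validation):

```python
from types import SimpleNamespace

# Hypothetical minimal config object with the fields from the table above.
# Real runs should use data_juicer.config.init_configs instead.
cfg = SimpleNamespace(
    executor_type="ray",
    dataset_path="./data/input.jsonl",
    process=[],  # list of operator configs; empty means a pass-through run
)

print(cfg.executor_type)  # ray
```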

Outputs

Name Type Description
dataset RayDataset Processed dataset on Ray cluster
exports Files Exported result files (unless skip_export=True)

Usage Examples

Basic Ray Execution

from data_juicer.config import init_configs
from data_juicer.core.executor import RayExecutor

cfg = init_configs(args=['--config', 'ray_pipeline.yaml'])
# ray_pipeline.yaml has: executor_type: ray
executor = RayExecutor(cfg)
dataset = executor.run()

Partitioned Execution

from data_juicer.config import init_configs
from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor

cfg = init_configs(args=['--config', 'partitioned_pipeline.yaml'])
# partitioned_pipeline.yaml has: executor_type: ray_partitioned
executor = PartitionedRayExecutor(cfg)
dataset = executor.run()

