Workflow:Datajuicer Data juicer Distributed Ray Processing

From Leeroopedia
Knowledge Sources
Domains Data_Engineering, Distributed_Computing, LLM_Ops
Last Updated 2026-02-14 16:00 GMT

Overview

End-to-end process for running Data-Juicer data processing pipelines at scale across multi-node Ray clusters, including dataset partitioning, distributed deduplication, checkpointing, and event logging.

Description

This workflow extends the standard Data-Juicer processing pipeline to distributed execution using Ray. It covers two executor modes: ray (the basic Ray executor, built on Ray Datasets) and ray_partitioned (an advanced executor with automatic partition optimization, checkpointing, and DAG execution). Almost all operators implemented for single-machine mode run unchanged in Ray mode; deduplication is the exception and has dedicated distributed versions (ray_bts_minhash_deduplicator, ray_document_deduplicator, ray_image_deduplicator, ray_video_deduplicator). The pipeline supports automatic subset splitting for balanced data distribution, streaming JSON reading to avoid out-of-memory failures, and partition-level checkpointing for fault tolerance.

Usage

Execute this workflow when you need to process datasets that are too large for a single machine, or when you want to leverage a multi-node cluster for faster processing. Typical scenarios include cleaning web-scale pre-training corpora (billions of samples), deduplicating terabyte-sized datasets, or running compute-intensive operators (e.g., perplexity filtering, image/video processing) across many GPUs. The cluster should have Ray installed and a head node started.

Execution Steps

Step 1: Install Distributed Dependencies

Install Data-Juicer with the dist extras to include Ray and related distributed libraries. This adds ray, ray[data], and supporting packages to the environment.

Key considerations:

  • Use uv pip install -v -e ".[dist]" for the dist extras
  • Ray version compatibility is managed by Data-Juicer's dependency specifications
  • All nodes in the cluster need Data-Juicer and its dependencies installed
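Because every node needs the same packages, a small pre-flight check run on each node can confirm that the dependencies are importable before the cluster is started. The following is a minimal sketch using only the standard library. The helper name missing_modules is an illustration and not part of Data-Juicer, and data_juicer is assumed to be the package's import name.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found on this node."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # "ray" comes from the dist extras; "data_juicer" is an assumed import name.
    missing = missing_modules(["ray", "data_juicer"])
    print("missing:", missing if missing else "none")
```

Running the script on each node before Step 2 surfaces an incomplete installation early, rather than as an import error inside a Ray worker.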

Step 2: Start Ray Cluster

Initialize a Ray cluster by starting the head node, then optionally connecting worker nodes. The head node address is used in the configuration file to connect the executor to the cluster.

Key considerations:

  • Start the head node with ray start --head
  • Connect workers with ray start --address={head_ip}:6379
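  • The default Ray head port (6379) is also the default Redis port; if both run on the same host, start the head node on a different port (for example with --port) and use that port in the worker address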
  • Use ray_address: auto in config to auto-detect the local cluster
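The start-up commands above can be assembled programmatically, which keeps the port consistent between the head and worker invocations. This is a hedged sketch: the helper names and the IP address 10.0.0.1 are placeholders, and the script only prints the commands rather than executing them.

```python
import shlex


def head_command(port=6379):
    """Argument list that starts the Ray head node on the given port."""
    return ["ray", "start", "--head", f"--port={port}"]


def worker_command(head_ip, port=6379):
    """Argument list that joins a worker node to the head at head_ip:port."""
    return ["ray", "start", f"--address={head_ip}:{port}"]


if __name__ == "__main__":
    # Port 6380 is chosen here only to avoid a local Redis on 6379.
    print(shlex.join(head_command(6380)))
    print(shlex.join(worker_command("10.0.0.1", 6380)))  # placeholder head IP
```

The printed lines can be pasted into a shell on the respective nodes or passed to a process launcher.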

Step 3: Configure Distributed Pipeline

Create a YAML configuration file with executor_type set to ray or ray_partitioned. Specify the Ray cluster address, dataset path (must be accessible from all nodes, e.g., shared NAS or S3), and the operator process list. For the partitioned executor, configure partition settings (mode, count, target size), checkpoint strategy, and event logging.

Key considerations:

  • Dataset files must be on shared storage accessible by all nodes
  • Use ray_partitioned executor for large-scale jobs needing checkpointing
  • Partition mode can be auto (optimizer-driven) or manual (fixed count)
  • Checkpoint strategies include every_op, every_n_ops, and every_partition
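The allowed values listed above lend themselves to a quick sanity check before a job is submitted. The sketch below validates a configuration held as a Python dictionary. Only executor_type and ray_address are key names taken from this page; partition_mode, partition_count, and checkpoint_strategy are hypothetical flat names chosen for illustration, and the real configuration schema may nest or name them differently.

```python
ALLOWED = {
    "executor_type": {"ray", "ray_partitioned"},
    "partition_mode": {"auto", "manual"},  # hypothetical key name
    "checkpoint_strategy": {"every_op", "every_n_ops", "every_partition"},  # hypothetical key name
}


def validate(config):
    """Return a list of problems; an empty list means this sketch found none."""
    problems = []
    for key, allowed in ALLOWED.items():
        value = config.get(key)
        if value is not None and value not in allowed:
            problems.append(f"{key}={value!r} is not one of {sorted(allowed)}")
    if "executor_type" not in config:
        problems.append("executor_type is required")
    if config.get("partition_mode") == "manual" and not config.get("partition_count"):
        problems.append("manual partition mode needs a partition_count")
    return problems


example = {
    "executor_type": "ray_partitioned",
    "ray_address": "auto",
    "partition_mode": "manual",
    "partition_count": 64,  # hypothetical key name
    "checkpoint_strategy": "every_op",
}

if __name__ == "__main__":
    print(validate(example))
```

A check like this catches misspelled mode names locally instead of after the cluster has already been allocated.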

Step 4: Optimize Data Distribution

For large datasets stored in only a few files, the executor automatically splits the data into smaller sub-files so that work is balanced across nodes. The auto-split strategy targets about 128 MB per sub-file and ensures that the number of sub-files is at least twice the total number of CPU cores in the cluster. For the partitioned executor, a partition size optimizer chooses partition sizes by analyzing data characteristics and available resources.

Key considerations:

  • The tools/data_resplit.py utility can pre-split data manually
  • Streaming JSON reading avoids loading entire files into memory
  • Partition size affects checkpoint granularity and recovery speed
  • Arrow-based columnar storage enables efficient distributed processing
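The auto-split rule described in this step amounts to a simple calculation. The sketch below is one reading of that rule, not Data-Juicer's actual implementation: it assumes 128 MB means 128 × 1024 × 1024 bytes and takes the larger of the size-based and core-based counts.

```python
TARGET_BYTES = 128 * 1024 * 1024  # assumed interpretation of the 128 MB target


def sub_file_count(total_bytes, total_cpu_cores):
    """Number of sub-files that satisfies both the size target and the 2x-cores floor."""
    by_size = -(-total_bytes // TARGET_BYTES)  # ceiling division on integers
    by_cores = 2 * total_cpu_cores
    return max(by_size, by_cores, 1)


if __name__ == "__main__":
    ten_gib = 10 * 1024**3
    print(sub_file_count(ten_gib, 16))   # size target dominates
    print(sub_file_count(1024**3, 64))   # core floor dominates
```

For a 10 GiB dataset on 16 cores the size target gives 80 sub-files, which exceeds the floor of 32. For a 1 GiB dataset on 64 cores the floor of 128 sub-files applies, so each sub-file ends up smaller than the target.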

Step 5: Execute Distributed Pipeline

Run the pipeline with python tools/process_data.py --config config.yaml or dj-process --config config.yaml. Depending on the configured executor_type, the RayExecutor or PartitionedRayExecutor distributes operator execution across the cluster, and each operator processes data partitions in parallel using Ray Tasks or Actors. For deduplication, use the dedicated ray_bts_minhash_deduplicator, which implements a distributed Union-Find algorithm with load-balanced BTS merging.

Key considerations:

  • Standard operators work in Ray mode without modification
  • Deduplication operators have dedicated distributed implementations
  • The DAG execution mixin enables parallel scheduling of independent operators
  • Event logging tracks per-operator timing and resource usage across nodes
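To make the Union-Find idea behind the deduplicator concrete, the sketch below shows the single-process version of the data structure: near-duplicate pairs are merged into clusters and one representative per cluster is kept. It is an illustration of the general technique only and does not reproduce the distributed, load-balanced BTS merging used by ray_bts_minhash_deduplicator. All names are hypothetical.

```python
class UnionFind:
    """Disjoint-set structure over hashable ids, with path compression."""

    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        # Path compression: point every node on the path directly at the root.
        while self.parent[x] != root:
            self.parent[x], x = root, self.parent[x]
        return root

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            # The smaller id becomes the root, so the result is deterministic.
            self.parent[max(ra, rb)] = min(ra, rb)


def keep_one_per_cluster(sample_ids, duplicate_pairs):
    """Keep only the smallest id from each cluster of duplicates."""
    uf = UnionFind()
    for a, b in duplicate_pairs:
        uf.union(a, b)
    return [s for s in sample_ids if uf.find(s) == s]


if __name__ == "__main__":
    pairs = [(0, 1), (1, 2), (4, 5)]  # pairs flagged as near-duplicates
    print(keep_one_per_cluster(range(6), pairs))
```

In a distributed setting the difficulty is that the parent map no longer fits on, or lives on, a single node, which is the problem the dedicated Ray operator addresses.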

Step 6: Monitor and Resume

During execution, event logs record job progress, operator completion, and resource usage. If a job fails, the checkpoint system allows resumption from the last completed partition and operator. Use the --job_id flag to resume a failed job.

Key considerations:

  • Event logs are written to the configured event_log_dir
  • Resume with dj-process --config config.yaml --job_id {job_id}
  • The job management system tracks job state and metadata
  • Resource monitoring covers CPU, memory, and disk utilization
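Resuming from the last completed partition and operator can be pictured as computing, for each partition, which operators still have to run. The sketch below assumes a hypothetical checkpoint record of (partition index, operator name) pairs; the actual on-disk checkpoint format is not described on this page, and the operator names are placeholders.

```python
def remaining_work(op_names, num_partitions, completed):
    """Map each unfinished partition to the operators it still has to run.

    completed is a set of (partition_index, op_name) pairs (hypothetical format).
    """
    plan = {}
    for p in range(num_partitions):
        done = 0
        # Operators run in order, so only a completed prefix can be skipped.
        for op in op_names:
            if (p, op) not in completed:
                break
            done += 1
        if done < len(op_names):
            plan[p] = list(op_names[done:])
    return plan


if __name__ == "__main__":
    ops = ["clean", "filter", "dedup"]  # placeholder operator names
    finished = {(0, "clean"), (0, "filter"), (0, "dedup"), (1, "clean")}
    print(remaining_work(ops, 3, finished))
```

In this example partition 0 is skipped entirely, partition 1 resumes at its second operator, and partition 2 starts from the beginning, which is why smaller partitions and more frequent checkpoints shorten recovery.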

Step 7: Export Distributed Results

The Ray exporter collects processed partitions and writes them to the output path. It supports parallel export and S3 destinations. For the partitioned executor, results from all partitions are consolidated into the final output.

Key considerations:

  • Output path must be on shared storage for multi-node clusters
  • Parallel export speeds up writing of large result datasets
  • S3 export requires AWS credentials in the configuration
