
Environment: Data-Juicer Ray Cluster Environment

From Leeroopedia
Knowledge Sources
Domains: Infrastructure, Distributed_Computing
Last Updated: 2026-02-14 17:00 GMT

Overview

A distributed computing environment requiring Ray >= 2.51.0, with optional PySpark and S3 storage support, for multi-node data processing pipelines.

Description

This environment provides the distributed execution context for Data-Juicer's Ray-based processing mode. It extends the base Python runtime with Ray cluster capabilities, operator-level isolated environments via `OPEnvSpec`, partition-based data processing, and distributed checkpoint management. The environment supports both local multi-core execution and multi-node cluster deployments with automatic resource detection (CPU cores, GPU memory, available RAM) across all nodes.

Usage

Use this environment when processing datasets that exceed single-machine capacity or when parallel execution across multiple nodes is required. It is the mandatory prerequisite for running the RayExecutor, RayExporter, distributed deduplication operators, and the partition size optimizer. Activate by setting `executor_type: ray` in the Data-Juicer configuration.
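A minimal sketch of what the activating configuration might look like, expressed as a Python dict mirroring the YAML config. Only `executor_type: ray` is documented above; the `ray_address`, `dataset_path`, and `np` keys are illustrative assumptions.

```python
# Hedged sketch of a Data-Juicer config that activates Ray mode.
# Only `executor_type` is taken from the text above; the other keys
# are assumptions shown for context.
config = {
    "executor_type": "ray",            # switch from the local executor to RayExecutor
    "ray_address": "auto",             # connect to a running cluster (documented fallback)
    "dataset_path": "./data/demo.jsonl",
    "np": 8,                           # desired parallelism
}
```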

System Requirements

Category   Requirement                          Notes
OS         Linux (Ubuntu 20.04+ recommended)    Multi-node clusters require a shared filesystem or S3
Python     >= 3.10                              Same as the base environment
Network    Low-latency interconnect             Required for multi-node Ray clusters
RAM        16 GB+ per node                      Ray's object store uses shared memory
Disk       50 GB+ SSD                           For checkpointing, intermediate data, and the Ray spill directory
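The RAM and disk rows above can be checked on each node before joining the cluster. This is a standard-library sketch; the function name and thresholds simply mirror the table and are not part of Data-Juicer.

```python
import os
import shutil

def check_node(min_ram_gb=16, min_disk_gb=50, path="/tmp"):
    """Illustrative pre-flight check against the requirements table."""
    issues = []
    # Total physical RAM via sysconf (POSIX); Ray's object store lives
    # in shared memory, so total RAM is the relevant figure.
    ram_gb = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024**3
    if ram_gb < min_ram_gb:
        issues.append(f"RAM {ram_gb:.1f} GB < {min_ram_gb} GB")
    # Free disk for checkpoints, intermediate data, and the Ray spill dir.
    free_gb = shutil.disk_usage(path).free / 1024**3
    if free_gb < min_disk_gb:
        issues.append(f"disk {free_gb:.1f} GB free < {min_disk_gb} GB")
    return issues
```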

Dependencies

System Packages

  • Ray cluster runtime (head + worker nodes)

Python Packages

  • `ray[default]` >= 2.51.0
  • `uvloop` == 0.21.0
  • `pyspark` == 3.5.5
  • `bitarray`

Optional Cloud Storage

  • `s3fs` (for S3-based datasets)
  • `boto3` (for AWS S3 operations)
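The pinned versions above can be verified at runtime with `importlib.metadata`. The checker below is a sketch; it handles only the `>=` and `==` specifiers used in this list.

```python
from importlib import metadata

# Pins copied from the dependency list above.
REQUIRED = {
    "ray": ">=2.51.0",
    "uvloop": "==0.21.0",
    "pyspark": "==3.5.5",
}

def version_tuple(v):
    # "2.51.0" -> (2, 51, 0); stops at any non-numeric component
    parts = []
    for p in v.split("."):
        digits = "".join(ch for ch in p if ch.isdigit())
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)

def satisfies(installed, spec):
    # Supports only the ">=" and "==" specifiers that appear above.
    op, want = spec[:2], spec[2:]
    iv, wv = version_tuple(installed), version_tuple(want)
    return iv >= wv if op == ">=" else iv == wv

def check_installed(required=REQUIRED):
    problems = []
    for pkg, spec in required.items():
        try:
            installed = metadata.version(pkg)
        except metadata.PackageNotFoundError:
            problems.append(f"{pkg}: not installed")
            continue
        if not satisfies(installed, spec):
            problems.append(f"{pkg}: {installed} does not satisfy {spec}")
    return problems
```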

Credentials

The following environment variables are used:

  • `RAY_ADDRESS`: Ray cluster head node address (default: `auto`)
  • `RAY_JOB_ENV_VAR`: Indicates running within a Ray job context
  • `CUDA_VISIBLE_DEVICES`: Controls GPU visibility per worker
  • `AWS_ACCESS_KEY_ID`: For S3 dataset access (if using S3 storage)
  • `AWS_SECRET_ACCESS_KEY`: For S3 dataset access
  • `AWS_SESSION_TOKEN`: For temporary S3 credentials
  • `AWS_REGION` / `AWS_DEFAULT_REGION`: AWS region for S3
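The S3-related variables above can be resolved into a single credentials mapping. The helper name and return shape are illustrative, not a Data-Juicer API; note that `AWS_REGION` takes precedence over `AWS_DEFAULT_REGION`, matching the usual AWS SDK convention.

```python
import os

def resolve_s3_env(env=None):
    """Illustrative resolution of the S3 variables listed above."""
    env = os.environ if env is None else env
    return {
        "key": env.get("AWS_ACCESS_KEY_ID"),
        "secret": env.get("AWS_SECRET_ACCESS_KEY"),
        "token": env.get("AWS_SESSION_TOKEN"),  # only set for temporary credentials
        # AWS_REGION wins over AWS_DEFAULT_REGION when both are set.
        "region": env.get("AWS_REGION") or env.get("AWS_DEFAULT_REGION"),
    }
```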

Quick Install

# Install Data-Juicer with distributed extras
pip install "py-data-juicer[distributed]"

# Start Ray head node
ray start --head --port=6379

# Start Ray worker node (on additional machines)
ray start --address='<head-node-ip>:6379'

# Verify cluster
python -c "import ray; ray.init(); print(ray.cluster_resources())"
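The verification one-liner above can also be written as a small script that honors the `RAY_ADDRESS` variable from the Credentials section. The kwargs builder below is a sketch; `verify_cluster` requires a running cluster and is shown but not executed.

```python
import os

def ray_init_kwargs(env=None):
    # Mirror the documented fallback: use RAY_ADDRESS if set, else 'auto'.
    env = os.environ if env is None else env
    return {
        "address": env.get("RAY_ADDRESS", "auto"),
        "ignore_reinit_error": True,  # safe to call twice in one process
    }

def verify_cluster():
    """Connect and report resources; needs `ray` and a reachable cluster."""
    import ray
    ray.init(**ray_init_kwargs())
    return ray.cluster_resources()
```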

Code Evidence

Ray cluster initialization and resource detection from `resource_utils.py:66-96`:

def _cuda_device_count(cfg=None):
    _torch_available = _is_package_available("torch")
    if check_and_initialize_ray(cfg):
        return int(ray_gpu_count())
    if _torch_available:
        return torch.cuda.device_count()
    # Fallback to nvidia-smi
    try:
        nvidia_smi_output = subprocess.check_output(["nvidia-smi", "-L"], text=True)
        all_devices = nvidia_smi_output.strip().split("\n")
        return len(all_devices)
    except Exception:
        return 0
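The `nvidia-smi -L` fallback above counts output lines, one per device. A standalone sketch of that parsing step (with blank lines filtered out, so an empty output yields 0 rather than 1):

```python
def count_gpus_from_smi(output):
    """Count GPUs from `nvidia-smi -L` text, one device per line.

    Illustrative helper, not part of Data-Juicer; blank lines are
    filtered so empty output maps to zero devices.
    """
    return len([ln for ln in output.strip().split("\n") if ln.strip()])
```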

Default Ray address fallback from `ray_utils.py:29`:

logger.warning("No ray config provided, using default ray address 'auto'")

Operator environment spec for Ray isolation from `op_env.py:129-191`:

class OPEnvSpec:
    def __init__(self, pip_pkgs=None, env_vars=None, working_dir=None,
                 backend="uv", extra_env_params=None, parsed_requirements=None):
        self.pip_pkgs = pip_pkgs
        self.env_vars = env_vars
        self.working_dir = working_dir
        self.backend = backend
        assert self.backend in ["pip", "uv"], "Backend should be one of ['pip', 'uv']"

    def to_dict(self):
        runtime_env_dict = {}
        if self.pip_pkgs:
            runtime_env_dict[self.backend] = self.pip_pkgs
        if self.env_vars:
            runtime_env_dict["env_vars"] = self.env_vars
        if self.working_dir:
            runtime_env_dict["working_dir"] = self.working_dir
        return runtime_env_dict

Ray resource calculation from `process_utils.py:275-279`:

cuda_available = is_cuda_available()
total_cpu = ray_cpu_count()
total_gpu = ray_gpu_count()
available_mem = sum(ray_available_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024
available_gpu_mem = sum(ray_available_gpu_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024
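The totals computed above bound how many worker processes an operator can use: parallelism is capped by both CPU count and the memory budget. A sketch of that arithmetic, where the memory fraction and per-process requirement are assumptions for illustration:

```python
# Illustrative constant; the real value lives in process_utils.py.
_OPS_MEMORY_LIMIT_FRACTION = 0.9

def max_procs(total_cpu, available_mem_gb, mem_per_proc_gb):
    """Cap parallelism by CPU cores and by the memory budget."""
    by_mem = int(available_mem_gb // mem_per_proc_gb)
    # Always allow at least one process, even under memory pressure.
    return max(1, min(total_cpu, by_mem))
```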

Common Errors

  • `No ray config provided, using default ray address 'auto'`
    Cause: Ray address not specified in the config. Solution: set `ray_address` in the config or ensure a Ray cluster is running.
  • `Could not detect Ray cluster resources`
    Cause: Ray is not initialized or the cluster is unreachable. Solution: run `ray start --head` or check network connectivity.
  • `Unknown checkpoint strategy, defaulting to EVERY_OP`
    Cause: invalid checkpoint strategy in the config. Solution: use one of `every_op`, `every_n_ops`, `manual`, `disabled`.
  • `Dataset is empty (0 rows), skipping operator processing`
    Cause: an empty partition in distributed processing. Solution: check the data distribution and ensure the dataset has content.
  • `Dependency conflict ... fallback to unpinned version under LATEST strategy`
    Cause: two operators require conflicting package versions. Solution: use isolated environments (separate `OPEnvSpec`) or resolve the versions manually.
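The checkpoint-strategy error above has a documented fallback to `EVERY_OP`. A hedged sketch of how such a strategy dispatch might look; the function and its signature are illustrative, not Data-Juicer's implementation.

```python
def should_checkpoint(op_index, strategy, n=1, manual_ops=()):
    """Decide whether to checkpoint after the op at `op_index` (0-based)."""
    strategy = strategy.lower()
    if strategy == "disabled":
        return False
    if strategy == "every_op":
        return True
    if strategy == "every_n_ops":
        return (op_index + 1) % n == 0
    if strategy == "manual":
        return op_index in manual_ops
    # Unknown strategy: mirror the documented fallback to EVERY_OP.
    return True
```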

Compatibility Notes

  • Operator Isolation: Ray mode supports per-operator isolated environments via `OPEnvSpec`. Each operator can declare its own pip dependencies, and the `OPEnvManager` attempts to merge compatible environments to reduce overhead.
  • Conflict Resolution: Three strategies for version conflicts: `SPLIT` (separate environments), `OVERWRITE` (use second version), `LATEST` (find latest compatible).
  • Checkpoint Strategies: Supports `EVERY_OP`, `EVERY_N_OPS`, `MANUAL`, and `DISABLED` checkpoint strategies for fault tolerance.
  • Slurm Integration: Scripts provided for Slurm-based HPC cluster deployment (`scripts/run_slurm.sh`).
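The environment-merging and `LATEST` conflict resolution described above can be sketched as a pip-list merge: same-named packages collapse to one pin, keeping the later version. All names and semantics here are assumptions for illustration, not the `OPEnvManager` implementation.

```python
def _ver_key(v):
    # "3.5.5" -> (3, 5, 5); unpinned requirements sort lowest
    return tuple(int(x) for x in v.split(".") if x.isdigit())

def merge_pip_pkgs(a, b):
    """Merge two pip requirement lists, LATEST-style on conflicts."""
    def parse(req):
        for sep in ("==", ">=", "<="):
            if sep in req:
                name, ver = req.split(sep, 1)
                return name, sep, ver
        return req, "", ""  # unpinned requirement

    merged = {}
    for req in list(a) + list(b):
        name, sep, ver = parse(req)
        # Keep the entry with the higher version number.
        if name not in merged or _ver_key(ver) > _ver_key(merged[name][2]):
            merged[name] = (name, sep, ver)
    return [f"{n}{s}{v}" for n, s, v in merged.values()]
```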
