Environment: Data-Juicer Ray Cluster Environment
| Metadata | Value |
|---|---|
| Domains | Infrastructure, Distributed_Computing |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
Distributed computing environment requiring Ray >= 2.51.0 with optional PySpark and S3 storage support for multi-node data processing pipelines.
Description
This environment provides the distributed execution context for Data-Juicer's Ray-based processing mode. It extends the base Python runtime with Ray cluster capabilities, operator-level isolated environments via `OPEnvSpec`, partition-based data processing, and distributed checkpoint management. The environment supports both local multi-core execution and multi-node cluster deployments with automatic resource detection (CPU cores, GPU memory, available RAM) across all nodes.
Usage
Use this environment when processing datasets that exceed single-machine capacity or when parallel execution across multiple nodes is required. It is the mandatory prerequisite for running the RayExecutor, RayExporter, distributed deduplication operators, and the partition size optimizer. Activate by setting `executor_type: ray` in the Data-Juicer configuration.
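A minimal configuration sketch for enabling Ray mode follows; `executor_type: ray` and `ray_address` are the options documented on this page, while the paths and the example operator are placeholders to show where a pipeline would go:

```yaml
# Hedged sketch of a Data-Juicer config enabling Ray mode.
# Paths and the operator entry are illustrative placeholders.
executor_type: ray
ray_address: auto            # or '<head-node-ip>:6379' for an existing cluster
dataset_path: /data/input.jsonl
export_path: /data/output.jsonl
process:
  - language_id_score_filter:   # example operator; replace with your pipeline
      lang: en
      min_score: 0.8
```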
System Requirements
| Category | Requirement | Notes |
|---|---|---|
| OS | Linux (Ubuntu 20.04+ recommended) | Multi-node clusters require shared filesystem or S3 |
| Python | >= 3.10 | Same as base environment |
| Network | Low-latency interconnect | Required for multi-node Ray clusters |
| RAM | 16GB+ per node | Ray object store uses shared memory |
| Disk | 50GB+ SSD | For checkpointing, intermediate data, and Ray spill directory |
Dependencies
System Packages
- Ray cluster runtime (head + worker nodes)
Python Packages
- `ray[default]` >= 2.51.0
- `uvloop` == 0.21.0
- `pyspark` == 3.5.5
- `bitarray`
Optional Cloud Storage
- `s3fs` (for S3-based datasets)
- `boto3` (for AWS S3 operations)
Credentials
The following environment variables are used:
- `RAY_ADDRESS`: Ray cluster head node address (default: `auto`)
- `RAY_JOB_ENV_VAR`: Indicates running within a Ray job context
- `CUDA_VISIBLE_DEVICES`: Controls GPU visibility per worker
- `AWS_ACCESS_KEY_ID`: For S3 dataset access (if using S3 storage)
- `AWS_SECRET_ACCESS_KEY`: For S3 dataset access
- `AWS_SESSION_TOKEN`: For temporary S3 credentials
- `AWS_REGION` / `AWS_DEFAULT_REGION`: AWS region for S3
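The variables above can be gathered up front before initializing Ray. The helper below is illustrative (not part of Data-Juicer); it only demonstrates reading the listed variables with the documented `RAY_ADDRESS` default of `auto`:

```python
import os

def collect_ray_env(defaults=None):
    """Illustrative helper: gather the environment variables listed above,
    falling back to documented defaults (RAY_ADDRESS defaults to 'auto')."""
    defaults = defaults or {"RAY_ADDRESS": "auto"}
    keys = [
        "RAY_ADDRESS", "RAY_JOB_ENV_VAR", "CUDA_VISIBLE_DEVICES",
        "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN",
        "AWS_REGION", "AWS_DEFAULT_REGION",
    ]
    env = {k: os.environ.get(k, defaults.get(k)) for k in keys}
    # Drop unset variables so only meaningful values are forwarded.
    return {k: v for k, v in env.items() if v is not None}
```

Unset variables are dropped rather than forwarded as empty strings, which keeps the dict safe to splice into a Ray `runtime_env`'s `env_vars`.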
Quick Install
```bash
# Install Data-Juicer with distributed extras
pip install "py-data-juicer[distributed]"

# Start the Ray head node
ray start --head --port=6379

# Start a Ray worker node (on each additional machine)
ray start --address='<head-node-ip>:6379'

# Verify the cluster
python -c "import ray; ray.init(); print(ray.cluster_resources())"
```
Code Evidence
Ray cluster initialization and resource detection from `resource_utils.py:66-96`:
```python
def _cuda_device_count(cfg=None):
    _torch_available = _is_package_available("torch")
    if check_and_initialize_ray(cfg):
        return int(ray_gpu_count())
    if _torch_available:
        return torch.cuda.device_count()
    # Fallback to nvidia-smi
    try:
        nvidia_smi_output = subprocess.check_output(["nvidia-smi", "-L"], text=True)
        all_devices = nvidia_smi_output.strip().split("\n")
        return len(all_devices)
    except Exception:
        return 0
```
Default Ray address fallback from `ray_utils.py:29`:
```python
logger.warning("No ray config provided, using default ray address 'auto'")
```
Operator environment spec for Ray isolation from `op_env.py:129-191`:
```python
class OPEnvSpec:
    def __init__(self, pip_pkgs=None, env_vars=None, working_dir=None,
                 backend="uv", extra_env_params=None, parsed_requirements=None):
        self.pip_pkgs = pip_pkgs
        self.env_vars = env_vars
        self.working_dir = working_dir
        self.backend = backend
        assert self.backend in ["pip", "uv"], "Backend should be one of ['pip', 'uv']"

    def to_dict(self):
        runtime_env_dict = {}
        if self.pip_pkgs:
            runtime_env_dict[self.backend] = self.pip_pkgs
        if self.env_vars:
            runtime_env_dict["env_vars"] = self.env_vars
        if self.working_dir:
            runtime_env_dict["working_dir"] = self.working_dir
        return runtime_env_dict
```
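Concretely, this mapping produces a Ray `runtime_env` dictionary. The sketch below is a standalone stand-in (the real class lives in `op_env.py`, and its full constructor is elided above); the package names and variable values are placeholders:

```python
# Illustrative: build the runtime_env dict the way `to_dict` does,
# then show where Ray would consume it. Values are placeholders.
def build_runtime_env(pip_pkgs=None, env_vars=None, working_dir=None, backend="uv"):
    assert backend in ["pip", "uv"]
    env = {}
    if pip_pkgs:
        env[backend] = pip_pkgs      # keyed by backend: "pip" or "uv"
    if env_vars:
        env["env_vars"] = env_vars
    if working_dir:
        env["working_dir"] = working_dir
    return env

runtime_env = build_runtime_env(pip_pkgs=["bitarray", "s3fs"],
                                env_vars={"AWS_REGION": "us-east-1"})
# Ray would receive this as, e.g.:
#   ray.remote(runtime_env=runtime_env)(some_operator_fn)
print(runtime_env)
```

Keying the package list by the backend name lets the same spec drive either `pip`- or `uv`-based dependency installation in the Ray runtime environment.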
Ray resource calculation from `process_utils.py:275-279`:
```python
cuda_available = is_cuda_available()
total_cpu = ray_cpu_count()
total_gpu = ray_gpu_count()
available_mem = sum(ray_available_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024
available_gpu_mem = sum(ray_available_gpu_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024
```
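The two memory lines sum per-node available memory (in MB), apply a safety fraction, and convert to GB. The value of `_OPS_MEMORY_LIMIT_FRACTION` is not shown on this page, so the sketch below uses an assumed 0.9 purely for illustration:

```python
# Hedged sketch of the memory-budget arithmetic above.
# 0.9 is an assumed value, not the library's actual constant.
OPS_MEMORY_LIMIT_FRACTION = 0.9

def available_mem_gb(per_node_mem_mb, fraction=OPS_MEMORY_LIMIT_FRACTION):
    """Sum per-node available memory (MB), apply the safety fraction,
    and convert to GB, mirroring `sum(...) * fraction / 1024`."""
    return sum(per_node_mem_mb) * fraction / 1024

print(available_mem_gb([16384, 16384]))  # two 16 GB nodes -> roughly a 28.8 GB budget
```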
Common Errors
| Error Message | Cause | Solution |
|---|---|---|
| `No ray config provided, using default ray address 'auto'` | Ray address not specified in config | Set `ray_address` in config or ensure Ray cluster is running |
| `Could not detect Ray cluster resources` | Ray not initialized or cluster unreachable | Run `ray start --head` or check network connectivity |
| `Unknown checkpoint strategy, defaulting to EVERY_OP` | Invalid checkpoint strategy in config | Use one of: `every_op`, `every_n_ops`, `manual`, `disabled` |
| `Dataset is empty (0 rows), skipping operator processing` | Empty partition in distributed processing | Check data distribution; ensure dataset has content |
| `Dependency conflict ... fallback to unpinned version under LATEST strategy` | Two operators need conflicting package versions | Use isolated environments (separate OPEnvSpec) or resolve versions manually |
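The "Unknown checkpoint strategy" row implies a normalization step over the valid strategy names. A minimal sketch, assuming the names from the Compatibility Notes and the default from the error message (the real parsing code is not shown on this page):

```python
# Hypothetical normalization mirroring the documented warning:
# "Unknown checkpoint strategy, defaulting to EVERY_OP"
VALID_STRATEGIES = {"every_op", "every_n_ops", "manual", "disabled"}

def normalize_strategy(value: str) -> str:
    value = value.strip().lower()
    if value not in VALID_STRATEGIES:
        return "every_op"  # documented fallback
    return value
```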
Compatibility Notes
- Operator Isolation: Ray mode supports per-operator isolated environments via `OPEnvSpec`. Each operator can declare its own pip dependencies, and the `OPEnvManager` attempts to merge compatible environments to reduce overhead.
- Conflict Resolution: Three strategies for version conflicts: `SPLIT` (separate environments), `OVERWRITE` (use second version), `LATEST` (find latest compatible).
- Checkpoint Strategies: Supports `EVERY_OP`, `EVERY_N_OPS`, `MANUAL`, and `DISABLED` checkpoint strategies for fault tolerance.
- Slurm Integration: Scripts provided for Slurm-based HPC cluster deployment (`scripts/run_slurm.sh`).
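The three conflict-resolution strategies can be sketched as a merge over pinned requirements. This simplification handles only exact pins (`pkg==x.y.z`); the real `OPEnvManager` works with full requirement specifiers:

```python
# Simplified illustration of SPLIT / OVERWRITE / LATEST for exact pins.
# Not the library's resolver; real specifiers (>=, ~=, extras) are richer.
def resolve(pin_a: str, pin_b: str, strategy: str):
    name_a, ver_a = pin_a.split("==")
    name_b, ver_b = pin_b.split("==")
    assert name_a == name_b, "only same-package conflicts are resolved here"
    if ver_a == ver_b:
        return pin_a                      # no conflict
    if strategy == "SPLIT":
        return None                       # keep separate environments
    if strategy == "OVERWRITE":
        return pin_b                      # second operator's version wins
    if strategy == "LATEST":
        # naive "latest": compare dotted version components numerically
        key = lambda v: tuple(int(p) for p in v.split("."))
        return f"{name_a}=={max(ver_a, ver_b, key=key)}"
    raise ValueError(f"unknown strategy: {strategy}")

print(resolve("pyspark==3.5.5", "pyspark==3.4.1", "LATEST"))     # pyspark==3.5.5
print(resolve("pyspark==3.5.5", "pyspark==3.4.1", "OVERWRITE"))  # pyspark==3.4.1
```

`SPLIT` trades merge failures for extra environment overhead, which is why the `OPEnvManager` first tries to merge compatible environments before falling back to isolation.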
Related Pages
- Implementation:Datajuicer_Data_juicer_Pip_Install_Ray_Extras
- Implementation:Datajuicer_Data_juicer_Ray_Start_Cluster
- Implementation:Datajuicer_Data_juicer_RayExecutor_Run
- Implementation:Datajuicer_Data_juicer_RayExporter_Export
- Implementation:Datajuicer_Data_juicer_PartitionSizeOptimizer_Calculate
- Implementation:Datajuicer_Data_juicer_EventLoggingMixin_CheckpointManager