

Environment:Eventual Inc Daft Ray Distributed Runner

From Leeroopedia


Knowledge Sources
Domains: Distributed Computing, Ray, Cluster Execution
Last Updated: 2026-02-08 15:30 GMT

Overview

The Ray_Distributed_Runner environment provides optional distributed execution capabilities for Daft by integrating with the Ray framework, enabling dataframe operations to scale across multi-node clusters.

Description

Daft supports two execution runners: the default native runner (local multithreaded) and the optional Ray runner (distributed). When using the Ray runner, Daft leverages Ray's task scheduling, actor model, and distributed object store to parallelize dataframe operations across a cluster of machines.

The Ray integration includes:

  • Automatic Ray detection -- Daft checks whether Ray is already initialized via ray.is_initialized() and whether the RAY_JOB_ID environment variable is set, allowing seamless integration within existing Ray jobs.
  • Flotilla worker actors -- Daft spawns Ray actors (called "Swordfish" actors via the Flotilla system) that execute query plans on worker nodes. These actors set DAFT_FLOTILLA_WORKER=1 and configure CUDA_VISIBLE_DEVICES when GPUs are allocated.
  • Configurable execution parameters -- including minimum CPU allocation per task, actor UDF ready timeout, task backlog limits, and default shuffle partition counts.

The ray extra installs both ray[data] (for dataset integration) and ray[client] (for remote cluster connectivity).

Usage

Use the Ray distributed runner when:

  • Your data exceeds single-machine memory capacity
  • You need to distribute computation across a multi-node cluster
  • You are already running within a Ray cluster or Ray job
  • You need GPU-accelerated UDFs across multiple GPU nodes

System Requirements

  • Python -- >= 3.10; inherited from the core environment.
  • Ray (non-Windows) -- >= 2.0.0, < 2.53.0 (pyproject.toml line 54).
  • Ray (Windows) -- >= 2.10.0, < 2.53.0; Ray 2.10+ removed the PyArrow upper pin on Windows (pyproject.toml line 55).
  • Operating System -- Linux, macOS, Windows; full support on Linux/macOS, while Windows requires Ray >= 2.10.0.
  • Network -- cluster connectivity is required for multi-node Ray clusters; local mode works without a network.
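The version bounds above can be checked programmatically before activating the runner. A minimal stdlib-only sketch (the helper names are illustrative, not part of Daft; pre-release suffixes are ignored for brevity):

```python
def _parse(version: str) -> tuple[int, ...]:
    # "2.30.0" -> (2, 30, 0)
    return tuple(int(part) for part in version.split(".")[:3])

def ray_version_ok(version: str, windows: bool = False) -> bool:
    # Mirrors the bounds above: >= 2.0.0 (>= 2.10.0 on Windows), < 2.53.0
    lower = (2, 10, 0) if windows else (2, 0, 0)
    return lower <= _parse(version) < (2, 53, 0)

print(ray_version_ok("2.30.0"))               # within bounds on all platforms
print(ray_version_ok("2.5.0", windows=True))  # below the Windows floor
```

Tuple comparison keeps the check dependency-free; a real pre-flight check would more likely use packaging.version, which Daft already depends on.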

Dependencies

System Packages

  • All core environment system packages (Rust toolchain for source builds, etc.)
  • Ray cluster infrastructure -- either a local Ray instance or a remote Ray cluster

Python Packages

  • All core environment Python packages (pyarrow, fsspec, tqdm, packaging)
  • ray[data, client] >= 2.0.0, < 2.53.0 (non-Windows) or >= 2.10.0, < 2.53.0 (Windows)

Credentials

  • No additional credentials required for local Ray clusters.
  • Remote Ray clusters may require cluster-specific authentication (e.g., Ray client address, TLS certificates).
  • Cloud storage credentials are handled separately by the Cloud Storage Credentials environment.

Environment Variables

  • DAFT_RUNNER -- values "ray" / "native"; default "native". Set to "ray" to activate the Ray distributed runner.
  • RAY_JOB_ID -- any string; default unset. Its presence is used as a heuristic to detect that Daft is running inside a Ray job.
  • DAFT_FLOTILLA_WORKER -- "1"; default unset. Set automatically by Daft inside Ray worker actors; used internally to disable progress bars and dashboard events on workers.
  • CUDA_VISIBLE_DEVICES -- comma-separated GPU indices; default unset. Set automatically by the Flotilla worker when num_gpus > 0; controls which GPUs are visible to each worker actor.
  • DAFT_MIN_CPU_PER_TASK -- float; default 0.5. Minimum CPU resources allocated per task in the Ray runner.
  • DAFT_ACTOR_UDF_READY_TIMEOUT -- integer (seconds); default 120. Timeout for actor UDFs to become ready.
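Daft reads these variables at runtime. A hedged sketch of equivalent parsing logic with the documented defaults (the reader function is hypothetical, stdlib only; it takes an environment mapping so the defaults are easy to exercise):

```python
def read_runner_config(env: dict[str, str]) -> dict[str, object]:
    # Hypothetical helper mirroring the variables and defaults listed above.
    return {
        "runner": env.get("DAFT_RUNNER", "native"),
        "min_cpu_per_task": float(env.get("DAFT_MIN_CPU_PER_TASK", "0.5")),
        "actor_udf_ready_timeout": int(env.get("DAFT_ACTOR_UDF_READY_TIMEOUT", "120")),
        "in_ray_job": "RAY_JOB_ID" in env,
    }

cfg = read_runner_config({"DAFT_RUNNER": "ray", "RAY_JOB_ID": "01000000"})
print(cfg["runner"], cfg["min_cpu_per_task"], cfg["in_ray_job"])  # ray 0.5 True
```

Pass os.environ to read the real process environment; unset variables fall back to the defaults shown.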

Quick Install

# Install Daft with Ray support
pip install "daft[ray]"

# Or install Ray separately
pip install daft
pip install "ray[data,client]>=2.0.0,<2.53.0"

Activation

The Ray runner can be activated in two ways:

# Option 1: Environment variable (set before importing daft)
# export DAFT_RUNNER=ray

# Option 2: Programmatic API
import daft
daft.set_runner_ray()

# With custom configuration
daft.set_runner_ray(
    address="ray://cluster-address:10001",
    noop_if_initialized=True,
    max_task_backlog=100,
    force_client_mode=False,
)

Code Evidence

Ray dependency specification from pyproject.toml lines 52-56:

ray = [
  'ray[data, client]>=2.0.0,<2.53.0; platform_system != "Windows"',
  'ray[data, client]>=2.10.0,<2.53.0; platform_system == "Windows"'
]

Ray auto-detection from daft/utils.py lines 138-157:

import os  # used by the RAY_JOB_ID check below

def detect_ray_state() -> tuple[bool, bool]:
    ray_is_initialized = False
    ray_is_in_job = False
    in_ray_worker = False
    try:
        import ray

        if ray.is_initialized():
            ray_is_initialized = True
            if ray._private.worker.global_worker.mode == ray.WORKER_MODE:
                in_ray_worker = True
        elif os.getenv("RAY_JOB_ID") is not None:
            ray_is_in_job = True

    except ImportError:
        pass

    return ray_is_initialized or ray_is_in_job, in_ray_worker
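The decision table behind detect_ray_state's first return value can be exercised without Ray installed. A dependency-free sketch (worker-mode detection omitted; the boolean flags stand in for the real import and ray.is_initialized() calls):

```python
def should_use_existing_ray(
    ray_importable: bool,
    ray_initialized: bool,
    environ: dict[str, str],
) -> bool:
    # Attach to Ray when it is already initialized in-process, or when
    # RAY_JOB_ID marks a Ray job; an unimportable ray means neither applies.
    if not ray_importable:  # the ImportError branch
        return False
    if ray_initialized:
        return True
    return environ.get("RAY_JOB_ID") is not None

print(should_use_existing_ray(True, False, {"RAY_JOB_ID": "01000000"}))  # True
print(should_use_existing_ray(False, False, {"RAY_JOB_ID": "01000000"}))  # False
```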

Runner configuration API from daft/runners/__init__.py lines 51-76:

def set_runner_ray(
    address: str | None = None,
    noop_if_initialized: bool = False,
    max_task_backlog: int | None = None,
    force_client_mode: bool = False,
) -> Runner[PartitionT]:
    """Configure Daft to execute dataframes using the Ray distributed computing framework."""
    return _set_runner_ray(
        address=address,
        noop_if_initialized=noop_if_initialized,
        max_task_backlog=max_task_backlog,
        force_client_mode=force_client_mode,
    )

Flotilla worker actor initialization from daft/runners/flotilla.py lines 57-60:

def __init__(self, num_cpus: int, num_gpus: int) -> None:
    os.environ["DAFT_FLOTILLA_WORKER"] = "1"
    if num_gpus > 0:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in range(num_gpus))
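For example, a worker granted num_gpus = 2 is restricted to GPUs 0 and 1; the string the join above produces:

```python
# Reproduce the GPU-index string built in __init__ above for num_gpus = 2.
num_gpus = 2
visible = ",".join(str(i) for i in range(num_gpus))
print(visible)  # 0,1
```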

Execution config defaults from src/common/daft-config/src/lib.rs lines 168-178:

shuffle_aggregation_default_partitions: 200,
min_cpu_per_task: 0.5,
actor_udf_ready_timeout: 120,

Common Errors

  • ImportError: No module named 'ray' -- Ray is not installed. Run pip install "daft[ray]" to install Daft with Ray support.
  • ConnectionError: Could not connect to Ray cluster -- the specified cluster address is unreachable or the cluster is not running. Verify the address and ensure the Ray head node is running; for local usage, omit the address parameter.
  • RuntimeError: Ray is not initialized -- Daft attempted to use the Ray runner but Ray was not started. Call ray.init() before daft.set_runner_ray(), or let Daft auto-initialize a local Ray instance.
  • Actor UDF ready timeout exceeded -- an actor-based UDF did not initialize within the configured timeout (default 120 s). Increase the timeout via the DAFT_ACTOR_UDF_READY_TIMEOUT environment variable, or investigate slow actor initialization (e.g., large model loading).

Compatibility Notes

  • Ray >= 2.10.0 is required on Windows because earlier versions imposed a PyArrow upper pin that conflicted with Daft's requirements.
  • Ray < 2.53.0 is the upper bound to ensure compatibility with Daft's internal Ray APIs.
  • The shuffle_aggregation_default_partitions defaults to 200 partitions, which is appropriate for most workloads. For very large datasets, this value may need tuning.
  • The min_cpu_per_task defaults to 0.5, meaning each task requests half a CPU core. This allows multiple tasks to run concurrently on a single core for I/O-bound workloads.
  • When running inside a Ray worker (detected via ray.WORKER_MODE), Daft disables progress bars automatically to avoid cluttering worker logs.
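The min_cpu_per_task note above implies a simple per-node concurrency bound; a worked example:

```python
# With 8 physical cores and the 0.5-CPU default request, Ray's scheduler can
# co-locate up to 16 Daft tasks on one node (memory and other resources
# permitting).
cores = 8
min_cpu_per_task = 0.5
max_concurrent_tasks = int(cores / min_cpu_per_task)
print(max_concurrent_tasks)  # 16
```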
