Environment: Eventual Inc Daft Ray Distributed Runner
| Knowledge Sources | |
|---|---|
| Domains | Distributed Computing, Ray, Cluster Execution |
| Last Updated | 2026-02-08 15:30 GMT |
Overview
The Ray_Distributed_Runner environment provides optional distributed execution capabilities for Daft by integrating with the Ray framework, enabling dataframe operations to scale across multi-node clusters.
Description
Daft supports two execution runners: the default native runner (local multithreaded) and the optional Ray runner (distributed). When using the Ray runner, Daft leverages Ray's task scheduling, actor model, and distributed object store to parallelize dataframe operations across a cluster of machines.
The Ray integration includes:
- Automatic Ray detection -- Daft checks whether Ray is already initialized via `ray.is_initialized()` and whether the `RAY_JOB_ID` environment variable is set, allowing seamless integration within existing Ray jobs.
- Flotilla worker actors -- Daft spawns Ray actors (called "Swordfish" actors via the Flotilla system) that execute query plans on worker nodes. These actors set `DAFT_FLOTILLA_WORKER=1` and configure `CUDA_VISIBLE_DEVICES` when GPUs are allocated.
- Configurable execution parameters -- including minimum CPU allocation per task, actor UDF ready timeout, task backlog limits, and default shuffle partition counts.
The Ray extras install both `ray[data]` (for dataset integration) and `ray[client]` (for remote cluster connectivity).
Usage
Use the Ray distributed runner when:
- Your data exceeds single-machine memory capacity
- You need to distribute computation across a multi-node cluster
- You are already running within a Ray cluster or Ray job
- You need GPU-accelerated UDFs across multiple GPU nodes
System Requirements
| Category | Requirement | Notes |
|---|---|---|
| Python | >= 3.10 | Inherited from the core environment |
| Ray (non-Windows) | >= 2.0.0, < 2.53.0 | From pyproject.toml line 54 |
| Ray (Windows) | >= 2.10.0, < 2.53.0 | Ray 2.10+ removed the PyArrow upper pin on Windows (pyproject.toml line 55) |
| Operating System | Linux, macOS, Windows | Full support on Linux/macOS; Windows requires Ray >= 2.10.0 |
| Network | Cluster connectivity | Required for multi-node Ray clusters; local mode works without network |
Dependencies
System Packages
- All core environment system packages (Rust toolchain for source builds, etc.)
- Ray cluster infrastructure -- either a local Ray instance or a remote Ray cluster
Python Packages
- All core environment Python packages (pyarrow, fsspec, tqdm, packaging)
- `ray[data, client]` >= 2.0.0, < 2.53.0 (non-Windows) or >= 2.10.0, < 2.53.0 (Windows)
Credentials
- No additional credentials required for local Ray clusters.
- Remote Ray clusters may require cluster-specific authentication (e.g., Ray client address, TLS certificates).
- Cloud storage credentials are handled separately by the Cloud Storage Credentials environment.
Environment Variables
| Variable | Values | Default | Description |
|---|---|---|---|
| `DAFT_RUNNER` | `"ray"` | `"native"` | Set to `"ray"` to activate the Ray distributed runner. |
| `RAY_JOB_ID` | any string | unset | Presence of this variable is used as a heuristic to detect that Daft is running inside a Ray job. |
| `DAFT_FLOTILLA_WORKER` | `"1"` | unset | Set automatically by Daft inside Ray worker actors. Used internally to disable progress bars and dashboard events on workers. |
| `CUDA_VISIBLE_DEVICES` | Comma-separated GPU indices | unset | Set automatically by the Flotilla worker when `num_gpus > 0`. Controls which GPUs are visible to each worker actor. |
| `DAFT_MIN_CPU_PER_TASK` | Float | `0.5` | Minimum CPU resources allocated per task in the Ray runner. |
| `DAFT_ACTOR_UDF_READY_TIMEOUT` | Integer (seconds) | `120` | Timeout in seconds for actor UDFs to become ready. |
Quick Install
```shell
# Install Daft with Ray support
pip install "daft[ray]"

# Or install Ray separately
pip install daft
pip install "ray[data,client]>=2.0.0,<2.53.0"
```
Activation
The Ray runner can be activated in two ways:
```python
# Option 1: Environment variable (set before importing daft)
# export DAFT_RUNNER=ray

# Option 2: Programmatic API
import daft

daft.set_runner_ray()

# With custom configuration
daft.set_runner_ray(
    address="ray://cluster-address:10001",
    noop_if_initialized=True,
    max_task_backlog=100,
    force_client_mode=False,
)
```
Code Evidence
Ray dependency specification from pyproject.toml lines 52-56:
```toml
ray = [
    'ray[data, client]>=2.0.0,<2.53.0; platform_system != "Windows"',
    'ray[data, client]>=2.10.0,<2.53.0; platform_system == "Windows"'
]
```
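The two `platform_system` environment markers select exactly one requirement per platform. A small illustrative helper (not part of Daft) mirrors that branching so you can see which spec applies on the current machine:

```python
import platform


def ray_requirement() -> str:
    """Return the Ray requirement string that the pyproject.toml environment
    markers would select on this machine. Illustrative helper mirroring the
    two platform_system branches; not Daft code."""
    if platform.system() == "Windows":
        # Ray 2.10+ dropped the PyArrow upper pin on Windows.
        return 'ray[data, client]>=2.10.0,<2.53.0'
    return 'ray[data, client]>=2.0.0,<2.53.0'


print(ray_requirement())
```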
Ray auto-detection from daft/utils.py lines 138-157:
```python
def detect_ray_state() -> tuple[bool, bool]:
    ray_is_initialized = False
    ray_is_in_job = False
    in_ray_worker = False
    try:
        import ray

        if ray.is_initialized():
            ray_is_initialized = True
            if ray._private.worker.global_worker.mode == ray.WORKER_MODE:
                in_ray_worker = True
        elif os.getenv("RAY_JOB_ID") is not None:
            ray_is_in_job = True
    except ImportError:
        pass
    return ray_is_initialized or ray_is_in_job, in_ray_worker
```
Runner configuration API from daft/runners/__init__.py lines 51-76:
```python
def set_runner_ray(
    address: str | None = None,
    noop_if_initialized: bool = False,
    max_task_backlog: int | None = None,
    force_client_mode: bool = False,
) -> Runner[PartitionT]:
    """Configure Daft to execute dataframes using the Ray distributed computing framework."""
    return _set_runner_ray(
        address=address,
        noop_if_initialized=noop_if_initialized,
        max_task_backlog=max_task_backlog,
        force_client_mode=force_client_mode,
    )
```
Flotilla worker actor initialization from daft/runners/flotilla.py lines 57-60:
```python
def __init__(self, num_cpus: int, num_gpus: int) -> None:
    os.environ["DAFT_FLOTILLA_WORKER"] = "1"
    if num_gpus > 0:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in range(num_gpus))
```
Execution config defaults from src/common/daft-config/src/lib.rs lines 168-178:
```rust
shuffle_aggregation_default_partitions: 200,
min_cpu_per_task: 0.5,
actor_udf_ready_timeout: 120,
```
Common Errors
| Error Message | Cause | Solution |
|---|---|---|
| `ImportError: No module named 'ray'` | Ray is not installed. | Run `pip install "daft[ray]"` to install Daft with Ray support. |
| `ConnectionError: Could not connect to Ray cluster` | The specified Ray cluster address is unreachable or the cluster is not running. | Verify the cluster address and ensure the Ray head node is running. For local usage, omit the `address` parameter. |
| `RuntimeError: Ray is not initialized` | Daft attempted to use the Ray runner but Ray was not started. | Call `ray.init()` before `daft.set_runner_ray()`, or let Daft auto-initialize a local Ray instance. |
| Actor UDF ready timeout exceeded | An actor-based UDF did not initialize within the configured timeout (default 120s). | Increase the timeout via the `DAFT_ACTOR_UDF_READY_TIMEOUT` environment variable, or investigate slow actor initialization (e.g., large model loading). |
Compatibility Notes
- Ray >= 2.10.0 is required on Windows because earlier versions imposed a PyArrow upper pin that conflicted with Daft's requirements.
- Ray < 2.53.0 is the upper bound to ensure compatibility with Daft's internal Ray APIs.
- The `shuffle_aggregation_default_partitions` setting defaults to 200 partitions, which is appropriate for most workloads. For very large datasets, this value may need tuning.
- The `min_cpu_per_task` setting defaults to 0.5, meaning each task requests half a CPU core. This allows multiple tasks to run concurrently on a single core for I/O-bound workloads.
- When running inside a Ray worker (detected via `ray.WORKER_MODE`), Daft disables progress bars automatically to avoid cluttering worker logs.
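The fractional-CPU note above implies a simple capacity bound: a node with N cores can co-schedule up to N / min_cpu_per_task tasks. A hedged sketch of that arithmetic (not Ray's actual scheduler, which also weighs memory, GPUs, and backlog limits):

```python
def max_concurrent_tasks(node_cpus: float, cpu_per_task: float = 0.5) -> int:
    """Upper bound on tasks Ray could co-schedule on one node when each task
    requests cpu_per_task CPUs. Illustrative arithmetic only; the real
    scheduler considers other resources as well."""
    if cpu_per_task <= 0:
        raise ValueError("cpu_per_task must be positive")
    return int(node_cpus // cpu_per_task)


print(max_concurrent_tasks(8))       # 16 tasks at the 0.5-CPU default
print(max_concurrent_tasks(8, 2.0))  # 4 tasks for CPU-heavy UDFs
```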