
Workflow:Eventual Inc Daft Distributed UDF Processing

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, ML_Ops, Distributed_Computing
Last Updated: 2026-02-08 14:00 GMT

Overview

End-to-end process for defining custom Python processing logic as User-Defined Functions (UDFs), applying them to large datasets, and scaling execution across distributed compute resources with optional GPU acceleration.

Description

This workflow covers the pattern of extending Daft's built-in operations with custom Python code packaged as UDFs. Daft provides three UDF interfaces: row-wise functions (@daft.func) that process individual rows, batch functions (@daft.func.batch) that process entire Series/columns for efficiency, and stateful class-based UDFs (@daft.cls) that maintain state across batches (ideal for ML model loading). UDFs declare their resource requirements (CPU, memory, GPU), return types, and concurrency settings. Daft automatically handles batching, serialization, parallelization, and scheduling across available compute resources. When combined with the Ray runner, UDFs execute across distributed clusters with automatic GPU assignment.

Usage

Execute this workflow when Daft's built-in expression functions are insufficient for your processing needs. Typical triggers include:

  • Running custom ML model inference (PyTorch, TensorFlow, ONNX) on tabular or multimodal data
  • Applying domain-specific business logic that cannot be expressed as standard DataFrame operations
  • Processing data with external libraries (NLP toolkits, computer vision, audio processing)
  • Building data enrichment pipelines that call external APIs or services
  • GPU-accelerated transformations (embedding generation, image processing, video analysis)

Execution Steps

Step 1: Define UDF Functions

Create UDF functions using Daft's decorator-based API. Choose the appropriate UDF type based on your processing pattern: @daft.func for row-wise scalar operations, @daft.func.batch for vectorized operations on entire columns, or @daft.cls for stateful processing that requires one-time initialization (model loading, connection pooling).

Key considerations:

  • Use @daft.func for simple per-row transformations with Python type annotations for return type
  • Use @daft.func.batch with return_dtype parameter for vectorized operations receiving Series objects
  • Use @daft.cls for stateful UDFs where __init__ loads models or establishes connections
  • Specify num_gpus for GPU-accelerated UDFs (Daft assigns CUDA_VISIBLE_DEVICES automatically)
  • Set batch_size to control how many rows each UDF invocation processes
  • Set concurrency to control parallelism (number of concurrent UDF instances)

Step 2: Configure Resource Requirements

Declare the compute resources each UDF needs. Daft uses these declarations to schedule work across available resources, ensuring GPU UDFs land on GPU-equipped nodes and memory-intensive UDFs get adequate allocation.

Key considerations:

  • Use num_cpus parameter for CPU-bound processing
  • Use num_gpus parameter for GPU workloads (supports fractional GPUs like 0.5)
  • Use memory_bytes parameter for memory-intensive operations
  • On Ray, resource requests map to Ray resource scheduling
  • GPU UDFs automatically receive isolated CUDA_VISIBLE_DEVICES assignments

Step 3: Build the DataFrame Pipeline

Construct the data processing pipeline by reading source data and chaining DataFrame operations with UDF applications. UDFs are applied via with_column() or select() just like built-in expressions. All operations remain lazy until materialization.

Key considerations:

  • Use with_column("output", my_udf(col("input"))) to apply UDFs
  • Chain multiple UDF applications in sequence for multi-stage pipelines
  • Apply filter (where) before UDFs to reduce the volume of data processed
  • Use select() to project only needed columns before expensive UDF operations
  • Generator UDFs (yield multiple rows per input) work with the same with_column pattern
  • UDFs that return complex types use Pydantic models or explicit DataType specifications

Step 4: Enable Distributed Execution

For datasets or workloads that exceed single-machine capacity, switch to the Ray distributed runner. The same pipeline code executes without modification across a Ray cluster.

Key considerations:

  • Call daft.set_runner_ray() before building the pipeline
  • Ensure Ray cluster has the required resources (GPUs, memory)
  • Class-based UDFs run as Ray actor pools, maintaining state across batches
  • Daft handles data partitioning and work distribution automatically
  • Use Ray-specific options (label selectors, scheduling strategies) via the UDF decorator

Step 5: Execute and Collect Results

Trigger pipeline execution by calling a materialization action. Daft optimizes the query plan, parallelizes execution, and manages backpressure between pipeline stages.

Key considerations:

  • Use collect() to materialize the entire result into memory
  • Use write_parquet() or write_csv() to stream results directly to storage
  • Use show() for interactive development to verify a small sample
  • Use iter_rows() or iter_partitions() for streaming consumption
  • Monitor UDF metrics (custom counters) via the metrics API for observability

Step 6: Monitor and Debug

Use Daft's observability tools to understand pipeline performance. Review query plans with explain(), monitor custom UDF metrics, and use the dashboard for runtime visibility.

Key considerations:

  • Use explain() to see the optimized physical plan before execution
  • Use @daft.metrics to emit custom counters from UDFs (tokens processed, API calls, etc.)
  • On Ray, use the Ray dashboard to monitor task execution and resource utilization
  • Use Daft's OpenTelemetry integration for distributed tracing

Execution Diagram

GitHub URL

Workflow Repository