
Principle:ArroyoSystems Arroyo Worker Scheduling

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Distributed_Systems, Scheduling
Last Updated 2026-02-08 12:00 GMT

Overview

Scheduling and distributing dataflow tasks across worker processes in a distributed stream processing engine. The scheduler allocates workers, assigns subtasks using a slot-filling strategy, establishes gRPC-based communication channels, and performs checkpoint-aware scheduling for pipeline recovery.

Description

In Arroyo's distributed stream processing architecture, each streaming pipeline is decomposed into a set of logical operators, each of which may run at a configurable parallelism level. The scheduling subsystem is responsible for mapping these logical operator instances (subtasks) onto physical worker processes. This involves four core responsibilities:

Worker allocation: The scheduler determines how many worker processes are needed based on the maximum parallelism of any operator in the pipeline. It requests the underlying scheduler backend (embedded, Kubernetes, etc.) to start workers with enough task slots to accommodate the pipeline's requirements. The number of required slots is computed as the maximum parallelism across all nodes in the logical dataflow graph.
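The slot arithmetic above can be sketched in a few lines. This is a minimal illustration, not Arroyo's actual API; the function names (`required_slots`, `workers_needed`) and the `slots_per_worker` parameter are hypothetical:

```python
import math

def required_slots(parallelisms):
    """Slots a pipeline needs: the maximum parallelism of any node
    in the logical dataflow graph."""
    return max(parallelisms)

def workers_needed(parallelisms, slots_per_worker):
    """Worker processes to request so that total slots cover the
    pipeline's requirement."""
    return math.ceil(required_slots(parallelisms) / slots_per_worker)
```

For example, a pipeline whose operators run at parallelisms 4, 2, and 8 needs 8 slots; with 3 slots per worker, the scheduler would request 3 workers.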

Task assignment: Once workers are running and have connected via gRPC, the controller computes a mapping from subtasks to workers using a slot-filling strategy. Subtasks are assigned to workers sequentially, filling each worker's available slots before moving to the next. This produces even distribution when operator parallelism values are similar across the graph.

gRPC-based worker communication: After workers start, they connect back to the controller and the controller establishes outbound gRPC connections to each worker. Task assignments (including node ID, subtask index, worker ID, and data address) are sent to all workers via a StartExecutionReq gRPC call, enabling each worker to know the full topology and establish inter-worker data channels.
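The shape of an assignment record can be sketched as follows. This is a simplified Python stand-in for the fields listed above, not Arroyo's actual protobuf definitions; the names `TaskAssignment`, `worker_addr`, and `start_execution_req` are illustrative:

```python
from dataclasses import dataclass, asdict
from typing import List, Optional

@dataclass
class TaskAssignment:
    node_id: str      # logical operator this subtask belongs to
    subtask_idx: int  # index within the operator's parallelism
    worker_id: int    # worker process hosting the subtask
    worker_addr: str  # data address for inter-worker channels

def start_execution_req(assignments: List[TaskAssignment],
                        restore_epoch: Optional[int] = None) -> dict:
    """Payload broadcast to every worker: the full topology plus an
    optional checkpoint epoch to restore from."""
    return {"tasks": [asdict(a) for a in assignments],
            "restore_epoch": restore_epoch}
```

Because every worker receives the complete assignment list, each one can locate the peers hosting upstream and downstream subtasks and open data channels to them.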

Checkpoint-aware scheduling for recovery: When a pipeline restarts, the scheduler consults the state backend to find the last successful checkpoint. If a valid checkpoint exists (and passes any ignore_state_before_epoch threshold), the checkpoint epoch is included in the execution start request so that workers can restore operator state. The scheduler also handles committing state from checkpoints that were in the committing phase at the time of failure.
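The restore-epoch decision can be sketched as below, assuming `ignore_state_before_epoch` means checkpoints earlier than that epoch are treated as absent; the function name is illustrative, not Arroyo's API:

```python
def restore_epoch(last_checkpoint_epoch, ignore_state_before_epoch=None):
    """Decide which checkpoint epoch, if any, workers should restore.

    Returns None (fresh start) when no checkpoint exists or the last
    checkpoint predates the ignore threshold."""
    if last_checkpoint_epoch is None:
        return None
    if (ignore_state_before_epoch is not None
            and last_checkpoint_epoch < ignore_state_before_epoch):
        return None
    return last_checkpoint_epoch
```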

Usage

Worker scheduling is invoked each time a pipeline transitions into the Scheduling state. This occurs on initial pipeline start, after recovery from a failure, and after rescaling operations. The scheduling state is a transient phase: once all workers are started, connected, assigned tasks, and all tasks report as running, the pipeline transitions to the Running state.
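The transient nature of the Scheduling state can be illustrated with a deliberately reduced two-state sketch (Arroyo's real state machine has more states; the names here are illustrative):

```python
from enum import Enum, auto

class JobState(Enum):
    SCHEDULING = auto()
    RUNNING = auto()

def next_state(state, tasks_running, tasks_total):
    """Scheduling is transient: once every assigned task reports as
    running, the pipeline transitions to Running."""
    if state is JobState.SCHEDULING and tasks_running == tasks_total:
        return JobState.RUNNING
    return state
```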

Theoretical Basis

Task scheduling in distributed stream processing requires balancing several competing concerns:

Resource allocation: Each pipeline requires a number of task slots equal to the maximum operator parallelism. The scheduler must ensure enough physical worker processes are available to provide these slots. If insufficient resources exist, the scheduler retries with backoff until either resources become available or a configurable timeout expires.
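A retry-with-backoff loop of this shape might look like the following sketch; `available_slots` stands in for a query to the scheduler backend, and the timeout and backoff parameters are illustrative defaults, not Arroyo's:

```python
import time

def wait_for_slots(available_slots, needed, timeout_s=60.0, backoff_s=1.0):
    """Poll until enough slots exist, backing off between attempts.

    `available_slots` is a callable returning the current slot count."""
    deadline = time.monotonic() + timeout_s
    delay = backoff_s
    while time.monotonic() < deadline:
        if available_slots() >= needed:
            return True
        time.sleep(delay)
        delay = min(delay * 2, 10.0)  # exponential backoff, capped
    return False
```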

Load balancing: Arroyo uses a slot-filling assignment strategy where subtasks are assigned to workers by iterating through nodes and distributing subtasks across workers proportional to their slot capacity:

def assign_tasks(dataflow_graph, workers):
    # Fill each worker's slots before advancing to the next worker.
    assignments = {}
    for node in dataflow_graph:
        worker_idx = 0
        current_count = 0
        for subtask_idx in range(node.parallelism):
            assignments[(node.id, subtask_idx)] = workers[worker_idx].id
            current_count += 1
            if current_count == workers[worker_idx].slots:
                worker_idx += 1
                current_count = 0
    return assignments

This produces even distribution when workers have equal slot counts. The approach is simpler than cost-based scheduling and still avoids hotspots for homogeneous workloads.

Fault tolerance: If a worker fails during scheduling (e.g., cannot be reached via gRPC), the entire scheduling attempt can be retried. The scheduler cleans up any existing workers for the job before each scheduling attempt, ensuring a clean slate. Workers that fail to connect within the configured worker_startup_time timeout cause a retryable error.
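The retry discipline described above can be sketched as follows; `RetryableError`, `cleanup`, and `schedule_once` are hypothetical stand-ins for the controller's cleanup and scheduling steps:

```python
class RetryableError(Exception):
    """A scheduling attempt failed but may succeed on retry
    (e.g. a worker missed the worker_startup_time deadline)."""

def schedule_with_retries(cleanup, schedule_once, attempts=3):
    """Retry scheduling, cleaning up leftover workers before each try
    so every attempt starts from a clean slate."""
    for _ in range(attempts):
        cleanup()  # remove any workers left over from a prior attempt
        try:
            return schedule_once()
        except RetryableError:
            continue
    raise RuntimeError(f"scheduling failed after {attempts} attempts")
```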

State-aware scheduling: On restart, the scheduler loads checkpoint metadata and prepares the state backend for restoration. This involves:

  1. Querying the database for the last successful checkpoint
  2. Loading checkpoint metadata from the state backend
  3. Preparing the checkpoint for loading (cleaning up epochs after the restore point)
  4. Detecting if the checkpoint was in a committing phase and preparing commit state
  5. Passing the restore epoch to workers so they can reconstruct operator state

This approach follows the model described in the Naiad timely dataflow system, where checkpoints provide globally consistent snapshots that enable deterministic recovery to a known-good state.
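The five restore steps can be sketched as one orchestration function. The `db` and `backend` interfaces here are hypothetical stand-ins for the database and state backend, not Arroyo's actual API:

```python
def prepare_restore(db, backend):
    """Prepare checkpoint restoration before workers start."""
    epoch = db.last_successful_checkpoint()   # step 1
    if epoch is None:
        return None                           # no checkpoint: fresh start
    meta = backend.load_metadata(epoch)       # step 2
    backend.cleanup_after(epoch)              # step 3: drop later epochs
    if meta.get("committing"):                # step 4: resume commit phase
        backend.prepare_commit(epoch)
    return epoch                              # step 5: hand epoch to workers
```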
