Principle: ArroyoSystems Arroyo Execution Engine
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Distributed_Systems, Dataflow_Processing |
| Last Updated | 2026-02-08 12:00 GMT |
Overview
The execution engine materializes a logical dataflow graph into physical operator instances and executes them: it transforms logical operator descriptions into concrete operator instances connected by typed message channels, then spawns an asynchronous task for each operator to process streaming data.
Description
The execution engine in Arroyo bridges the gap between the logical plan (a directed graph of operator descriptions) and the physical runtime (a set of concurrent async tasks exchanging data through channels). This transformation involves two major phases:
Program construction (Program::from_logical): The logical dataflow graph is expanded into a physical graph where each logical node is replicated according to its parallelism level. For a node with parallelism p, the engine creates p SubtaskNode instances, each containing a fully constructed operator with its own state backend context, task info, and schema configuration. Edges between logical nodes are expanded according to their type:
- Forward edges create one-to-one connections between matching subtask indices
- Shuffle, LeftJoin, and RightJoin edges create all-to-all connections, enabling data repartitioning
Each edge is backed by a bounded async channel (BatchSender/BatchReceiver) that carries Arrow record batches between operators.
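The edge expansion described above can be sketched as follows. This is an illustrative model, not Arroyo's actual code: the enum, function name, and pairing logic are assumptions (join edges expand like shuffles, with the left/right distinction omitted here).

```rust
// Sketch: expanding a logical edge into physical subtask-to-subtask
// connections, given the parallelism of both sides.
#[derive(Clone, Copy, PartialEq, Debug)]
enum EdgeType {
    Forward,
    Shuffle, // LeftJoin/RightJoin expand the same way (all-to-all)
}

/// Returns the (upstream_subtask, downstream_subtask) pairs that one
/// logical edge expands into; each pair gets its own bounded channel.
fn expand_edge(edge: EdgeType, from_p: usize, to_p: usize) -> Vec<(usize, usize)> {
    match edge {
        // Forward: one-to-one between matching indices.
        EdgeType::Forward => {
            assert_eq!(from_p, to_p, "forward edges require equal parallelism");
            (0..from_p).map(|i| (i, i)).collect()
        }
        // Shuffle: all-to-all, enabling repartitioning.
        EdgeType::Shuffle => (0..from_p)
            .flat_map(|i| (0..to_p).map(move |j| (i, j)))
            .collect(),
    }
}
```

A forward edge between two nodes of parallelism 2 yields exactly 2 channels, while a 2-to-3 shuffle yields all 6 combinations.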
Engine execution (Engine::start): The engine iterates through all nodes in the physical graph, taking each SubtaskNode and scheduling it for execution. For nodes assigned to the local worker, the engine spawns a tokio task that runs the operator's processing loop. For nodes assigned to remote workers, the engine connects the local channel endpoints to the network manager for cross-worker communication. A Barrier synchronization primitive ensures all local tasks are ready before data begins flowing. After all nodes are scheduled, the network manager is started to handle inter-worker message passing.
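A minimal sketch of the ready-barrier, with std threads standing in for tokio tasks (the function name and return value are hypothetical): every local subtask finishes its setup, then waits on a shared barrier before entering its processing loop, so no task sends data to a peer that is not yet listening.

```rust
use std::sync::{Arc, Barrier};
use std::thread;

// Illustrative scheduling loop: spawn one task per local subtask and gate
// them all on a barrier before any data flows.
fn start_local_tasks(num_tasks: usize) -> Vec<thread::JoinHandle<usize>> {
    let barrier = Arc::new(Barrier::new(num_tasks));
    (0..num_tasks)
        .map(|idx| {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                // ... per-task setup: construct operator, wire channels ...
                barrier.wait(); // all local tasks ready before data flows
                idx // stand-in for the task's processing loop
            })
        })
        .collect()
}
```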
Usage
The execution engine is invoked on each worker process after the controller sends a StartExecutionReq. The worker constructs a Program from the received logical graph and task assignments, creates an Engine with the program and network manager, and calls start() to begin execution. The resulting RunningEngine provides access to source and sink control channels for forwarding checkpoint barriers, watermarks, and stop signals from the controller.
Theoretical Basis
Dataflow execution engines convert logical plans into physical execution graphs through a series of well-defined transformations:
Operator construction: Each logical operator description (an OperatorChain containing one or more operator configurations) is instantiated into a concrete operator. The engine uses a registry pattern to map operator names to constructor functions:
for each logical_node in logical_graph:
    for subtask_idx in 0..logical_node.parallelism:
        operator = construct_operator(logical_node.operator_chain, registry)
        physical_graph.add_node(SubtaskNode {
            node_id, subtask_idx, parallelism, operator
        })
Source operators (e.g., Kafka consumers, file readers) are wrapped as SourceNode instances, while processing operators are chained into ChainedOperator linked lists that execute sequentially within a single task, reducing channel overhead for fusible operators.
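The registry pattern can be sketched like this; the trait, operator names, and constructor signatures are assumptions for illustration, not Arroyo's actual definitions:

```rust
use std::collections::HashMap;

// Illustrative operator trait; real operators would process record batches.
trait Operator {
    fn name(&self) -> &'static str;
}

struct FilterOp;
impl Operator for FilterOp {
    fn name(&self) -> &'static str { "filter" }
}

struct ProjectOp;
impl Operator for ProjectOp {
    fn name(&self) -> &'static str { "project" }
}

// Constructor functions produce boxed operator trait objects.
type Constructor = fn() -> Box<dyn Operator>;

fn make_filter() -> Box<dyn Operator> { Box::new(FilterOp) }
fn make_project() -> Box<dyn Operator> { Box::new(ProjectOp) }

/// The registry maps logical operator names to constructors.
fn registry() -> HashMap<&'static str, Constructor> {
    let mut r: HashMap<&'static str, Constructor> = HashMap::new();
    r.insert("filter", make_filter);
    r.insert("project", make_project);
    r
}

/// Instantiate a fresh operator for one subtask by name.
fn construct_operator(name: &str) -> Box<dyn Operator> {
    let ctor = registry()[name];
    ctor()
}
```

Because each subtask calls the constructor independently, every SubtaskNode gets its own operator instance and state, with no sharing between parallel copies.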
Channel-based message passing: Operators communicate through typed bounded channels. The channel type and topology depend on the edge type:
Forward edge: node_A[i] -> node_B[i] (1:1 mapping)
Shuffle edge: node_A[i] -> node_B[0..p] (1:all mapping, hash partitioning)
Join edges: node_A[i] -> node_B[0..p] (1:all, left/right distinction)
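One common way a shuffle edge picks the downstream subtask is hashing the key and reducing it modulo the downstream parallelism; this sketch uses that scheme for illustration (Arroyo's actual partitioning may differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route a keyed batch to one of `downstream_parallelism` subtasks.
/// Equal keys always route to the same subtask, which is what makes
/// keyed state partitioning correct under a shuffle.
fn route<K: Hash>(key: &K, downstream_parallelism: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % downstream_parallelism
}
```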
Bounded channels provide natural backpressure: when a downstream operator cannot keep up, the channel fills and the upstream operator blocks, propagating flow control through the graph.
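This backpressure behavior can be demonstrated in miniature with a bounded std channel standing in for the async BatchSender/BatchReceiver (the demo function is hypothetical):

```rust
use std::sync::mpsc::sync_channel;

// A bounded channel of capacity `cap` accepts that many batches, then
// refuses further sends (here via try_send; a blocking send() would park
// the upstream task) until the consumer drains it.
fn demo_backpressure(cap: usize) -> (bool, bool, bool) {
    let (tx, rx) = sync_channel::<&str>(cap);
    for _ in 0..cap {
        tx.try_send("batch").unwrap(); // fill the channel
    }
    // Channel full: an upstream operator would now block on send(),
    // propagating flow control toward the sources.
    let full = tx.try_send("extra").is_err();
    // Downstream consumes one batch, freeing capacity upstream.
    let drained = rx.recv().is_ok();
    let resumed = tx.try_send("extra").is_ok();
    (full, drained, resumed)
}
```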
Task spawning: Each operator subtask runs as an independent tokio async task. The task's main loop receives batches from input channels, processes them through the operator chain, and sends results to output channels. A barrier synchronization ensures all tasks on a worker are initialized before any begins processing data.
Control plane integration: Beyond data flow, each task has a control channel (Receiver<ControlMessage>) through which the controller can send:
- Checkpoint barriers -- triggering state snapshot and propagation
- Watermark advances -- tracking event-time progress
- Stop signals -- initiating graceful shutdown
- Commit messages -- confirming two-phase commit completion
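A hypothetical shape for these control messages and their handling; the variant payloads are assumptions for illustration, not Arroyo's actual definitions:

```rust
// Illustrative control-plane message enum, mirroring the four message
// kinds listed above.
#[derive(Debug, PartialEq)]
enum ControlMessage {
    CheckpointBarrier { epoch: u64 }, // snapshot state, forward the barrier
    Watermark(u64),                   // event-time progress
    Stop,                             // graceful shutdown
    Commit { epoch: u64 },            // two-phase commit confirmation
}

/// Handle one control message; returns false once the task should
/// exit its processing loop.
fn handle_control(msg: ControlMessage) -> bool {
    match msg {
        ControlMessage::CheckpointBarrier { epoch } => {
            // ... snapshot operator state for `epoch`, emit barrier downstream
            let _ = epoch;
            true
        }
        ControlMessage::Watermark(ts) => {
            // ... advance event-time clock, fire eligible timers
            let _ = ts;
            true
        }
        ControlMessage::Commit { epoch } => {
            // ... finalize the two-phase commit for `epoch`
            let _ = epoch;
            true
        }
        ControlMessage::Stop => false, // begin graceful shutdown
    }
}
```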
The RunningEngine exposes control channel handles organized by node ID, allowing the controller to target specific operator groups or only source/sink operators as needed.