Implementation: ArroyoSystems Arroyo Execution Engine Start
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Distributed_Systems, Dataflow_Processing |
| Last Updated | 2026-02-08 12:00 GMT |
Overview
Implements the core execution engine that transforms a logical dataflow graph into a running physical execution graph. Program::from_logical builds the physical graph of SubtaskNode instances connected by bounded channels, and Engine::start spawns async tasks for each operator and starts the network manager for inter-worker communication.
Description
The execution engine implementation spans two primary components in engine.rs:
Program::from_logical (lines 206-359) constructs the physical execution graph from a logical dataflow description:
- Optionally loads checkpoint metadata from the state backend if a restore epoch is provided
- Builds a parallelism map from the task assignments to determine how many subtasks each node should have
- For each logical node, creates `parallelism` instances of `SubtaskNode` by calling `construct_node` to instantiate the operator chain with proper task info, schemas, and state restoration context
- For each logical edge, creates physical edges based on the edge type:
  - Forward edges: one-to-one mapping from source subtask i to destination subtask i, with a single bounded channel per pair
  - Shuffle/LeftJoin/RightJoin edges: all-to-all mapping, creating a bounded channel from every source subtask to every destination subtask
- Each physical edge wraps a `BatchSender`/`BatchReceiver` pair with the edge's Arrow schema and logical indices
- Returns a `Program` containing the physical graph wrapped in an `Arc<RwLock<...>>`
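The edge-wiring rules above can be sketched as follows. This is a simplified, hypothetical model: `std::sync::mpsc::sync_channel` stands in for Arroyo's `batch_bounded` (which returns a `BatchSender`/`BatchReceiver` pair), the edge-type enum is reduced to two variants, and the buffer size of 128 is illustrative.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

// Simplified stand-ins for Arroyo's edge types and batch channels.
#[derive(Clone, Copy, PartialEq)]
enum EdgeType { Forward, Shuffle }

struct PhysicalEdge {
    from_subtask: usize,
    to_subtask: usize,
    tx: SyncSender<u64>,
    rx: Receiver<u64>,
}

// Create the physical channels for one logical edge, following the mapping
// described above: Forward is one-to-one, Shuffle is all-to-all.
fn wire_edge(
    edge_type: EdgeType,
    src_parallelism: usize,
    dst_parallelism: usize,
) -> Vec<PhysicalEdge> {
    let mut edges = Vec::new();
    match edge_type {
        EdgeType::Forward => {
            assert_eq!(src_parallelism, dst_parallelism,
                "forward edges require equal parallelism");
            for i in 0..src_parallelism {
                // one bounded channel per (i -> i) pair
                let (tx, rx) = sync_channel(128);
                edges.push(PhysicalEdge { from_subtask: i, to_subtask: i, tx, rx });
            }
        }
        EdgeType::Shuffle => {
            // all-to-all: a bounded channel from every source to every destination
            for s in 0..src_parallelism {
                for d in 0..dst_parallelism {
                    let (tx, rx) = sync_channel(128);
                    edges.push(PhysicalEdge { from_subtask: s, to_subtask: d, tx, rx });
                }
            }
        }
    }
    edges
}
```

A forward edge between two nodes with parallelism 4 yields 4 channels, while a shuffle between parallelism 3 and 2 yields 6.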
Engine::start (lines 515-557) launches execution of the physical graph:
- Collects all node indices from the physical graph
- Creates a `Barrier` sized to the number of local tasks (tasks assigned to this worker)
- Creates a stop barrier for coordinated shutdown synchronization
- Iterates through all node indices and calls `schedule_node` for each, which:
  - For local tasks: extracts input/output channels, spawns a tokio task that calls `operator.start()` with the channels, and sets up a panic-catching wrapper that reports `TaskFailed` on the control channel
  - For remote tasks: connects local channel endpoints to the `NetworkManager` for cross-worker data transfer
- Starts the network manager to begin accepting and sending inter-worker data
- Clears all remaining sender references from graph edges to avoid dangling channels
- Returns a `RunningEngine` that provides access to source, sink, and operator control channels
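The panic-catching wrapper pattern can be sketched in miniature. This is not the real `schedule_node`: it uses OS threads and `std::sync::mpsc` where the engine uses tokio tasks, and the `ControlResp` variants here are a simplified stand-in for `arroyo_rpc::ControlResp`.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Simplified control response, standing in for arroyo_rpc::ControlResp.
#[derive(Debug, PartialEq)]
enum ControlResp {
    TaskFinished { node_id: u32 },
    TaskFailed { node_id: u32, error: String },
}

// Spawn a task wrapped so that a panic is reported as TaskFailed on the
// control channel instead of silently killing the task.
fn spawn_task<F>(node_id: u32, control_tx: Sender<ControlResp>, task: F) -> thread::JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(move || {
        match catch_unwind(AssertUnwindSafe(task)) {
            Ok(()) => {
                let _ = control_tx.send(ControlResp::TaskFinished { node_id });
            }
            Err(panic) => {
                // Recover the panic message when it is a &str payload.
                let error = panic
                    .downcast_ref::<&str>()
                    .map(|s| s.to_string())
                    .unwrap_or_else(|| "task panicked".to_string());
                let _ = control_tx.send(ControlResp::TaskFailed { node_id, error });
            }
        }
    })
}
```

Reporting failure through the control channel lets the worker's main loop surface the error to the controller rather than losing it inside the spawned task.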
construct_node (lines 784-874) builds a concrete OperatorNode from an OperatorChain:
- For source chains: constructs a single `SourceNode` with an `OperatorContext` containing task info, state restoration context, and output schema
- For non-source chains: builds a linked list of `ChainedOperator` instances, each with its own `OperatorContext`, allowing multiple operators to be fused into a single task
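The linked-list structure for fused chains can be sketched as below. The struct is a deliberately minimal stand-in for Arroyo's `ChainedOperator` (which also carries its `OperatorContext`); the `name` field and helper functions are illustrative only.

```rust
// Each link owns its operator plus an optional next link, so a whole fused
// chain of operators runs inside a single task.
struct ChainedOperator {
    name: String,
    next: Option<Box<ChainedOperator>>,
}

// Build the linked list from an ordered slice of operator names, mirroring
// how a non-source OperatorChain is walked front to back.
fn build_chain(names: &[&str]) -> Option<Box<ChainedOperator>> {
    names.iter().rev().fold(None, |next, name| {
        Some(Box::new(ChainedOperator { name: name.to_string(), next }))
    })
}

// Walk the chain to count its links.
fn chain_len(mut node: &Option<Box<ChainedOperator>>) -> usize {
    let mut n = 0;
    while let Some(op) = node {
        n += 1;
        node = &op.next;
    }
    n
}
```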
construct_operator (lines 876-912) maps OperatorName enum values to concrete constructor implementations using a match statement covering all built-in operators (ArrowValue, ArrowKey, Projection, AsyncUdf, windowing operators, join operators, watermark generators) and connector-based sources/sinks.
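The shape of that dispatch can be illustrated with a toy version. The variant names, trait, and constructors below are hypothetical, not Arroyo's real `OperatorName` variants or `ConstructedOperator` API; the point is only the enum-to-constructor match.

```rust
// Illustrative subset of operator names (not the real OperatorName enum).
enum OperatorName {
    Projection,
    Watermark,
    TumblingWindow,
}

trait Operator {
    fn name(&self) -> &'static str;
}

struct ProjectionOperator;
struct WatermarkGenerator;
struct TumblingWindowOperator;

impl Operator for ProjectionOperator { fn name(&self) -> &'static str { "projection" } }
impl Operator for WatermarkGenerator { fn name(&self) -> &'static str { "watermark" } }
impl Operator for TumblingWindowOperator { fn name(&self) -> &'static str { "tumbling_window" } }

// Exhaustive match from enum value to concrete constructor; adding a new
// variant forces a compile error until a constructor arm is added.
fn construct_operator(op: OperatorName) -> Box<dyn Operator> {
    match op {
        OperatorName::Projection => Box::new(ProjectionOperator),
        OperatorName::Watermark => Box::new(WatermarkGenerator),
        OperatorName::TumblingWindow => Box::new(TumblingWindowOperator),
    }
}
```

An exhaustive match is a natural fit here: the compiler guarantees every built-in operator kind has a constructor arm.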
Supporting types include:
- `SubtaskNode` -- holds node ID, subtask index, parallelism, input/output schemas, and the constructed `OperatorNode`
- `QueueNode` -- replaces a `SubtaskNode` after extraction, retaining only the control channel sender and task info
- `PhysicalGraphEdge` -- carries the channel pair, edge type, schema, and logical graph indices
- `RunningEngine` -- provides methods to access control channels organized by topology (sources, sinks, all operators)
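The `SubtaskNode`-to-`QueueNode` swap can be sketched as an enum transition. The field sets here are heavily simplified guesses at the real structs; only the pattern (take the full node out, leave a lightweight placeholder behind) is taken from the description above.

```rust
// Two states for a graph node: the full subtask before scheduling, and the
// residual queue node left behind once the operator has been extracted.
enum SubtaskOrQueueNode {
    Subtask { node_id: u32, subtask_idx: usize, parallelism: usize },
    Queue { node_id: u32, subtask_idx: usize },
}

impl SubtaskOrQueueNode {
    // Replace self with a Queue node, returning the subtask's fields
    // (analogous to extracting the OperatorNode for spawning). Returns None
    // if the node was already extracted.
    fn take_subtask(&mut self) -> Option<(u32, usize, usize)> {
        match *self {
            SubtaskOrQueueNode::Subtask { node_id, subtask_idx, parallelism } => {
                *self = SubtaskOrQueueNode::Queue { node_id, subtask_idx };
                Some((node_id, subtask_idx, parallelism))
            }
            SubtaskOrQueueNode::Queue { .. } => None,
        }
    }
}
```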
Usage
Program::from_logical is called by the worker when it receives a StartExecutionReq from the controller, passing the deserialized logical graph, task assignments, operator registry, and optional restore epoch. The resulting Program is then wrapped in an Engine along with the NetworkManager and started. The RunningEngine is used by the worker's main loop to forward control messages from the controller to the appropriate operator tasks.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: `crates/arroyo-worker/src/engine.rs` (lines 206-557)
Signature
pub struct Program {
pub name: String,
pub graph: Arc<RwLock<DiGraph<SubtaskOrQueueNode, PhysicalGraphEdge>>>,
pub control_tx: Option<Sender<ControlResp>>,
}
impl Program {
pub async fn from_logical(
name: String,
job_id: &str,
logical: &LogicalGraph,
assignments: &Vec<TaskAssignment>,
registry: Registry,
restore_epoch: Option<u32>,
control_tx: Sender<ControlResp>,
) -> Program
}
pub struct Engine {
program: Program,
worker_id: WorkerId,
run_id: u64,
job_id: String,
network_manager: NetworkManager,
assignments: HashMap<(u32, usize), TaskAssignment>,
}
impl Engine {
pub fn new(
program: Program,
worker_id: WorkerId,
job_id: String,
run_id: u64,
network_manager: NetworkManager,
assignments: Vec<TaskAssignment>,
) -> Self
pub async fn start(mut self) -> RunningEngine
}
pub struct RunningEngine {
program: Program,
assignments: HashMap<(u32, usize), TaskAssignment>,
worker_id: WorkerId,
}
impl RunningEngine {
pub fn source_controls(&self) -> Vec<Sender<ControlMessage>>
pub fn sink_controls(&self) -> Vec<Sender<ControlMessage>>
pub fn operator_controls(&self) -> HashMap<u32, Vec<Sender<ControlMessage>>>
pub fn operator_to_node(&self) -> HashMap<String, u32>
}
Import
use arroyo_datastream::logical::{
LogicalEdge, LogicalEdgeType, LogicalGraph, LogicalNode, OperatorChain, OperatorName,
};
use arroyo_operator::operator::{Registry, ChainedOperator, ConstructedOperator, OperatorNode, SourceNode};
use arroyo_operator::context::{BatchReceiver, BatchSender, OperatorContext, batch_bounded};
use arroyo_rpc::grpc::rpc::TaskAssignment;
use arroyo_rpc::{ControlMessage, ControlResp};
use arroyo_state::{BackingStore, StateBackend};
use arroyo_types::{TaskInfo, WorkerId};
use petgraph::graph::DiGraph;
I/O Contract
Program::from_logical
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | String | Yes | Name for the program instance (e.g., worker identifier) |
| job_id | &str | Yes | Unique job identifier used for state backend operations |
| logical | &LogicalGraph | Yes | The logical dataflow graph (DiGraph<LogicalNode, LogicalEdge>) with operator nodes and typed edges |
| assignments | &Vec<TaskAssignment> | Yes | Task-to-worker mapping from the controller, determining parallelism and placement |
| registry | Registry | Yes | Operator and UDF registry providing constructors for all operator types |
| restore_epoch | Option<u32> | No | If present, the checkpoint epoch from which to restore operator state |
| control_tx | Sender<ControlResp> | Yes | Channel sender for operators to report control responses (task started, task failed, checkpoint completed) back to the worker |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | Program | Program containing graph: Arc<RwLock<DiGraph<SubtaskOrQueueNode, PhysicalGraphEdge>>> with concrete operator instances and bounded channels connecting them |
Engine::start
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| self | Engine | Yes | Constructed engine with physical graph, worker ID, network manager, and task assignments |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | RunningEngine | Handle providing access to source, sink, and operator control channels for the running pipeline |
| Side effect | tokio tasks | One tokio task spawned per local subtask, running the operator's processing loop |
| Side effect | network connections | Network manager started with sender connections to remote workers |