Implementation:ArroyoSystems Arroyo Execution Engine Start

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Distributed_Systems, Dataflow_Processing
Last Updated 2026-02-08 12:00 GMT

Overview

Implements the core execution engine that transforms a logical dataflow graph into a running physical execution graph. Program::from_logical builds the physical graph of SubtaskNode instances connected by bounded channels, and Engine::start spawns async tasks for each operator and starts the network manager for inter-worker communication.

Description

The execution engine implementation centers on two primary components in engine.rs, Program::from_logical and Engine::start, together with the helper functions construct_node and construct_operator:

Program::from_logical (lines 206-359) constructs the physical execution graph from a logical dataflow description:

  1. Optionally loads checkpoint metadata from the state backend if a restore epoch is provided
  2. Builds a parallelism map from the task assignments to determine how many subtasks each node should have
  3. For each logical node, creates one SubtaskNode per subtask (parallelism instances in total) by calling construct_node to instantiate the operator chain with the proper task info, schemas, and state restoration context
  4. For each logical edge, creates physical edges based on the edge type (see the sketch after this list):
    • Forward edges: one-to-one mapping from source subtask i to destination subtask i, with a single bounded channel per pair
    • Shuffle/LeftJoin/RightJoin edges: all-to-all mapping, creating a bounded channel from every source subtask to every destination subtask
  5. Each physical edge wraps a BatchSender/BatchReceiver pair with the edge's Arrow schema and logical indices
  6. Returns a Program containing the physical graph wrapped in an Arc<RwLock<...>>
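
The channel wiring in step 4 can be illustrated with a minimal sketch, assuming tokio's bounded mpsc channels stand in for Arroyo's BatchSender/BatchReceiver; QUEUE_SIZE, EdgeType, Edge, and wire_edges below are illustrative names, not the engine's actual types:

use tokio::sync::mpsc;

const QUEUE_SIZE: usize = 1024; // hypothetical channel bound

enum EdgeType {
    Forward,
    Shuffle, // also stands in for LeftJoin/RightJoin, which wire identically
}

// Stand-in for PhysicalGraphEdge: just the channel pair.
struct Edge<T> {
    tx: mpsc::Sender<T>,
    rx: mpsc::Receiver<T>,
}

// One bounded channel per (source, dest) subtask pair: one-to-one for
// Forward edges, all-to-all for Shuffle-like edges.
fn wire_edges<T>(edge_type: EdgeType, src: usize, dst: usize) -> Vec<Edge<T>> {
    let pairs = match edge_type {
        EdgeType::Forward => {
            assert_eq!(src, dst, "forward edges require equal parallelism");
            src // subtask i connects only to subtask i
        }
        EdgeType::Shuffle => src * dst, // every source to every destination
    };
    (0..pairs)
        .map(|_| {
            let (tx, rx) = mpsc::channel(QUEUE_SIZE);
            Edge { tx, rx }
        })
        .collect()
}

In the real engine, each edge additionally carries the edge type, Arrow schema, and logical graph indices noted in step 5.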

Engine::start (lines 515-557) launches execution of the physical graph:

  1. Collects all node indices from the physical graph
  2. Creates a Barrier sized to the number of local tasks (tasks assigned to this worker)
  3. Creates a stop barrier for coordinated shutdown synchronization
  4. Iterates through all node indices and calls schedule_node for each (see the sketch after this list), which:
    • For local tasks: extracts input/output channels, spawns a tokio task that calls operator.start() with the channels, and sets up a panic-catching wrapper that reports TaskFailed on the control channel
    • For remote tasks: connects local channel endpoints to the NetworkManager for cross-worker data transfer
  5. Starts the network manager to begin accepting and sending inter-worker data
  6. Clears all remaining sender references from graph edges to avoid dangling channels
  7. Returns a RunningEngine that provides access to source, sink, and operator control channels
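
The local-task branch of schedule_node can be sketched as follows. The ControlResp::TaskFailed report matches the description above; LocalTask, its fields, and the way the Barrier is consumed are simplifying assumptions:

use std::sync::Arc;
use tokio::sync::{mpsc, Barrier};

enum ControlResp {
    TaskFailed { node_id: u32, subtask: usize, reason: String },
}

// Simplified stand-in for an extracted SubtaskNode's operator and channels.
struct LocalTask {
    node_id: u32,
    subtask: usize,
}

impl LocalTask {
    async fn run(self, ready: Arc<Barrier>) {
        // Wait until every local task has been spawned, then run the
        // operator's processing loop (elided here).
        ready.wait().await;
    }
}

fn schedule_local(task: LocalTask, ready: Arc<Barrier>, control_tx: mpsc::Sender<ControlResp>) {
    let (node_id, subtask) = (task.node_id, task.subtask);
    // Spawn the operator loop, then watch its JoinHandle from a second
    // task so a panic is reported as TaskFailed rather than lost.
    let handle = tokio::spawn(task.run(ready));
    tokio::spawn(async move {
        if let Err(e) = handle.await {
            if e.is_panic() {
                let _ = control_tx
                    .send(ControlResp::TaskFailed {
                        node_id,
                        subtask,
                        reason: format!("operator task panicked: {e}"),
                    })
                    .await;
            }
        }
    });
}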

construct_node (lines 784-874) builds a concrete OperatorNode from an OperatorChain:

  • For source chains: constructs a single SourceNode with an OperatorContext containing task info, state restoration context, and output schema
  • For non-source chains: builds a linked list of ChainedOperator instances, each with its own OperatorContext, allowing multiple operators to be fused into a single task
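
The chained path amounts to folding the chain into a linked list whose head is the chain's first operator. A minimal sketch, using a bare Op where the real code carries a constructed operator plus its own OperatorContext:

// Stand-in for one constructed operator together with its OperatorContext.
struct Op;

// Simplified shape of ChainedOperator: a singly linked list of fused operators.
struct Chained {
    op: Op,
    next: Option<Box<Chained>>,
}

// Fold back to front so the returned head is the first operator in the
// chain; records then flow op -> next -> ... within a single task.
fn build_chain(mut ops: Vec<Op>) -> Chained {
    let mut node = Chained {
        op: ops.pop().expect("chain is non-empty"),
        next: None,
    };
    while let Some(op) = ops.pop() {
        node = Chained { op, next: Some(Box::new(node)) };
    }
    node
}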

construct_operator (lines 876-912) maps OperatorName enum values to concrete constructor implementations using a match statement covering all built-in operators (ArrowValue, ArrowKey, Projection, AsyncUdf, windowing operators, join operators, watermark generators) and connector-based sources/sinks.
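
The dispatch is a plain exhaustive match from OperatorName to constructor calls. A self-contained sketch of the shape, with two illustrative variants rather than the full Arroyo set:

trait Operator {}

struct Projection;
struct TumblingWindow;
impl Operator for Projection {}
impl Operator for TumblingWindow {}

enum OperatorName {
    Projection,
    TumblingWindow,
    // ... ArrowValue, ArrowKey, AsyncUdf, joins, watermarks, connector source/sink ...
}

// Each arm decodes the node's serialized config and calls the concrete
// constructor; config decoding is elided in this sketch.
fn construct_operator(name: OperatorName) -> Box<dyn Operator> {
    match name {
        OperatorName::Projection => Box::new(Projection),
        OperatorName::TumblingWindow => Box::new(TumblingWindow),
    }
}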

Supporting types include:

  • SubtaskNode -- holds node ID, subtask index, parallelism, input/output schemas, and the constructed OperatorNode
  • QueueNode -- replaces a SubtaskNode after extraction, retaining only the control channel sender and task info
  • PhysicalGraphEdge -- carries the channel pair, edge type, schema, and logical graph indices
  • RunningEngine -- provides methods to access control channels organized by topology (sources, sinks, all operators)
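
The extraction step implies a two-state node type. The field sets below are abbreviated from the descriptions above, not copied from the source:

use tokio::sync::mpsc::Sender;

struct TaskInfo;       // stand-in
enum ControlMessage {} // stand-in

// Full node, as built by Program::from_logical (operator and schemas elided).
struct SubtaskNode {
    node_id: u32,
    subtask_idx: usize,
    parallelism: usize,
}

// What remains in the graph after schedule_node extracts and spawns the operator.
struct QueueNode {
    task_info: TaskInfo,
    control_tx: Sender<ControlMessage>,
}

// Each slot in the physical graph holds one or the other.
enum SubtaskOrQueueNode {
    SubtaskNode(SubtaskNode),
    QueueNode(QueueNode),
}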

Usage

Program::from_logical is called by the worker when it receives a StartExecutionReq from the controller, passing the deserialized logical graph, task assignments, operator registry, and optional restore epoch. The resulting Program is then wrapped in an Engine along with the NetworkManager and started. The RunningEngine is used by the worker's main loop to forward control messages from the controller to the appropriate operator tasks.
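
Putting the pieces together, the worker-side call sequence looks roughly like the sketch below. It follows the signatures in the Code Reference section and assumes those imports; the StartExecutionReq field names, the channel capacity, and the program name are illustrative assumptions:

async fn handle_start_execution(
    req: StartExecutionReq, // deserialized controller request (assumed fields)
    worker_id: WorkerId,
    network_manager: NetworkManager,
    registry: Registry,
) -> RunningEngine {
    // Operators report task-started/failed and checkpoint events on this channel.
    let (control_tx, _control_rx) = tokio::sync::mpsc::channel(128);

    // Build the physical graph, restoring state if a checkpoint epoch was given.
    let program = Program::from_logical(
        "worker-program".to_string(), // assumed naming convention
        &req.job_id,
        &req.logical_graph,
        &req.assignments,
        registry,
        req.restore_epoch,
        control_tx,
    )
    .await;

    // Spawns one tokio task per local subtask and starts the network manager.
    Engine::new(
        program,
        worker_id,
        req.job_id.clone(),
        req.run_id,
        network_manager,
        req.assignments.clone(),
    )
    .start()
    .await
}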

Code Reference

Source Location

engine.rs

Signature

pub struct Program {
    pub name: String,
    pub graph: Arc<RwLock<DiGraph<SubtaskOrQueueNode, PhysicalGraphEdge>>>,
    pub control_tx: Option<Sender<ControlResp>>,
}

impl Program {
    pub async fn from_logical(
        name: String,
        job_id: &str,
        logical: &LogicalGraph,
        assignments: &Vec<TaskAssignment>,
        registry: Registry,
        restore_epoch: Option<u32>,
        control_tx: Sender<ControlResp>,
    ) -> Program
}

pub struct Engine {
    program: Program,
    worker_id: WorkerId,
    run_id: u64,
    job_id: String,
    network_manager: NetworkManager,
    assignments: HashMap<(u32, usize), TaskAssignment>,
}

impl Engine {
    pub fn new(
        program: Program,
        worker_id: WorkerId,
        job_id: String,
        run_id: u64,
        network_manager: NetworkManager,
        assignments: Vec<TaskAssignment>,
    ) -> Self

    pub async fn start(mut self) -> RunningEngine
}

pub struct RunningEngine {
    program: Program,
    assignments: HashMap<(u32, usize), TaskAssignment>,
    worker_id: WorkerId,
}

impl RunningEngine {
    pub fn source_controls(&self) -> Vec<Sender<ControlMessage>>
    pub fn sink_controls(&self) -> Vec<Sender<ControlMessage>>
    pub fn operator_controls(&self) -> HashMap<u32, Vec<Sender<ControlMessage>>>
    pub fn operator_to_node(&self) -> HashMap<String, u32>
}

Import

use arroyo_datastream::logical::{
    LogicalEdge, LogicalEdgeType, LogicalGraph, LogicalNode, OperatorChain, OperatorName,
};
use arroyo_operator::operator::{Registry, ChainedOperator, ConstructedOperator, OperatorNode, SourceNode};
use arroyo_operator::context::{BatchReceiver, BatchSender, OperatorContext, batch_bounded};
use arroyo_rpc::grpc::rpc::TaskAssignment;
use arroyo_rpc::{ControlMessage, ControlResp};
use arroyo_state::{BackingStore, StateBackend};
use arroyo_types::{TaskInfo, WorkerId};
use petgraph::graph::DiGraph;

I/O Contract

Program::from_logical

Inputs

Name Type Required Description
name String Yes Name for the program instance (e.g., worker identifier)
job_id &str Yes Unique job identifier used for state backend operations
logical &LogicalGraph Yes The logical dataflow graph (DiGraph<LogicalNode, LogicalEdge>) with operator nodes and typed edges
assignments &Vec<TaskAssignment> Yes Task-to-worker mapping from the controller, determining parallelism and placement
registry Registry Yes Operator and UDF registry providing constructors for all operator types
restore_epoch Option<u32> No If present, the checkpoint epoch from which to restore operator state
control_tx Sender<ControlResp> Yes Channel sender for operators to report control responses (task started, task failed, checkpoint completed) back to the worker

Outputs

Name Type Description
(return) Program Program containing graph: Arc<RwLock<DiGraph<SubtaskOrQueueNode, PhysicalGraphEdge>>> with concrete operator instances and bounded channels connecting them

Engine::start

Inputs

Name Type Required Description
self Engine Yes Constructed engine with physical graph, worker ID, network manager, and task assignments

Outputs

Name Type Description
(return) RunningEngine Handle providing access to source, sink, and operator control channels for the running pipeline
Side effect tokio tasks One tokio task spawned per local subtask, running the operator's processing loop
Side effect network connections Network manager started with sender connections to remote workers
