Implementation:ArroyoSystems Arroyo Job Metrics

From Leeroopedia
Revision as of 14:27, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/ArroyoSystems_Arroyo_Job_Metrics.md)


Overview

JobMetrics is a metrics collection and aggregation system within the Arroyo controller that tracks per-operator, per-subtask runtime metrics from worker nodes. It computes exponentially-weighted moving averages (EWMA) for rate-based metrics and maintains backpressure measurements in fixed-size circular buffers.

Description

The JobMetrics struct holds a reference to the logical program graph and a concurrent map of per-task metric accumulators. Each task is identified by a TaskKey containing a node ID and subtask index. The system tracks four rate metrics (bytes received, bytes sent, messages received, messages sent) using the RateMetric EWMA calculator, plus a backpressure gauge derived from transmit queue utilization.
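The keying scheme and the backpressure gauge can be sketched as follows. This is an illustration only: the `TaskKey` field names and the `backpressure` helper are hypothetical, and the formula (one minus the free fraction of the transmit queue) is an assumption about what "derived from transmit queue utilization" means, not a copy of the Arroyo code.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the TaskKey described above:
/// one entry per (operator node, subtask) pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TaskKey {
    node_id: u32,
    subtask_idx: u32,
}

/// Assumed definition: the fraction of the transmit queue currently in use.
/// `queue_size` is the total capacity, `queue_remaining` the free slots.
fn backpressure(queue_size: u64, queue_remaining: u64) -> f64 {
    if queue_size == 0 {
        // No queue information yet; report no backpressure.
        return 0.0;
    }
    // Clamp so a stale reading can never produce a negative gauge.
    let remaining = queue_remaining.min(queue_size) as f64;
    1.0 - remaining / queue_size as f64
}

fn main() {
    let mut gauges: HashMap<TaskKey, f64> = HashMap::new();
    // Subtask 1 of node 0 has 25 of 100 queue slots free.
    let key = TaskKey { node_id: 0, subtask_idx: 1 };
    gauges.insert(key, backpressure(100, 25));
    for (key, value) in &gauges {
        println!("{key:?} -> backpressure {value:.2}");
    }
}
```

A gauge near 1.0 means the queue is almost full and the downstream operator is not keeping up; a gauge near 0.0 means the queue is mostly empty.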

RateMetric computes smoothed per-second rates using an EWMA with alpha=0.1 and stores the results in a CircularBuffer sized to hold 5 minutes of data at 2-second collection intervals (150 samples). The CircularBuffer is a ring buffer that is generic over its element type and takes its capacity as a const generic parameter; it supports push, iteration, and last-element access.
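A minimal sketch of how these two pieces could fit together is shown below. It reuses the type names from this page, but the bodies are assumptions: the constant names, seeding the average with the first observed rate, skipping samples with non-increasing timestamps, and treating a decreasing counter as zero growth are illustrative choices, not confirmed Arroyo behavior.

```rust
use std::time::{Duration, SystemTime};

// 5 minutes of history at one sample every 2 seconds = 150 buckets.
const COLLECTION_INTERVAL_SECS: u64 = 2;
const RETENTION_SECS: u64 = 5 * 60;
const NUM_BUCKETS: usize = (RETENTION_SECS / COLLECTION_INTERVAL_SECS) as usize;
const ALPHA: f64 = 0.1;

/// Fixed-capacity ring buffer; the oldest entry is overwritten once full.
struct CircularBuffer<T: Copy, const N: usize> {
    values: [T; N],
    next_idx: usize,
    size: usize,
}

impl<T: Copy, const N: usize> CircularBuffer<T, N> {
    fn new(fill: T) -> Self {
        Self { values: [fill; N], next_idx: 0, size: 0 }
    }

    fn push(&mut self, value: T) {
        self.values[self.next_idx] = value;
        self.next_idx = (self.next_idx + 1) % N;
        self.size = (self.size + 1).min(N);
    }

    fn last(&self) -> Option<T> {
        if self.size == 0 {
            None
        } else {
            Some(self.values[(self.next_idx + N - 1) % N])
        }
    }

    /// Iterates from the oldest stored element to the newest.
    fn iter(&self) -> impl Iterator<Item = T> + '_ {
        let start = (self.next_idx + N - self.size) % N;
        (0..self.size).map(move |i| self.values[(start + i) % N])
    }
}

struct RateMetric {
    values: CircularBuffer<(SystemTime, f64), NUM_BUCKETS>,
    prev_value: Option<(SystemTime, u64)>,
}

impl RateMetric {
    fn new() -> Self {
        Self {
            values: CircularBuffer::new((SystemTime::UNIX_EPOCH, 0.0)),
            prev_value: None,
        }
    }

    /// Takes a raw cumulative counter and records a smoothed per-second rate.
    fn add(&mut self, time: SystemTime, value: u64) {
        if let Some((prev_time, prev)) = self.prev_value {
            let elapsed = time
                .duration_since(prev_time)
                .unwrap_or(Duration::ZERO)
                .as_secs_f64();
            if elapsed > 0.0 {
                // Assumption: a counter that went backwards counts as zero growth.
                let rate = value.saturating_sub(prev) as f64 / elapsed;
                // Assumption: the first observed rate seeds the average.
                let smoothed = match self.values.last() {
                    Some((_, last)) => ALPHA * rate + (1.0 - ALPHA) * last,
                    None => rate,
                };
                self.values.push((time, smoothed));
            }
        }
        self.prev_value = Some((time, value));
    }
}

fn main() {
    let t0 = SystemTime::UNIX_EPOCH;
    let mut metric = RateMetric::new();
    metric.add(t0, 0); // first sample only records the counter
    metric.add(t0 + Duration::from_secs(2), 200); // raw rate 100/s seeds the average
    metric.add(t0 + Duration::from_secs(4), 800); // raw 300/s: 0.1 * 300 + 0.9 * 100 = 120
    for (time, rate) in metric.values.iter() {
        println!("{time:?}: {rate:.1} per second");
    }
}
```

With alpha=0.1 each new sample contributes only 10% of the smoothed value, so a sudden jump in the raw rate shows up gradually over many 2-second intervals rather than as a spike.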

Usage

JobMetrics is instantiated when a job starts running and is updated periodically as the controller receives heartbeat data from workers. The get_groups method returns aggregated metrics organized by operator for the API layer.
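The "organized by operator" step can be pictured as regrouping a per-task map by node ID. The sketch below assumes a simplified data model: `OperatorGroup`, `group_by_operator`, and the single `f64` rate per task are hypothetical stand-ins for the real OperatorMetricGroup and TaskMetrics types, whose fields are not shown on this page.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for the TaskKey described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TaskKey {
    node_id: u32,
    subtask_idx: u32,
}

/// Hypothetical simplified result: one group per operator node,
/// holding (subtask index, latest rate) pairs.
#[derive(Debug)]
struct OperatorGroup {
    node_id: u32,
    subtasks: Vec<(u32, f64)>,
}

fn group_by_operator(tasks: &HashMap<TaskKey, f64>) -> Vec<OperatorGroup> {
    // A BTreeMap keeps the operators ordered by node ID.
    let mut by_node: BTreeMap<u32, Vec<(u32, f64)>> = BTreeMap::new();
    for (key, &rate) in tasks {
        by_node
            .entry(key.node_id)
            .or_default()
            .push((key.subtask_idx, rate));
    }
    by_node
        .into_iter()
        .map(|(node_id, mut subtasks)| {
            // HashMap iteration order is arbitrary, so sort by subtask index.
            subtasks.sort_by_key(|(idx, _)| *idx);
            OperatorGroup { node_id, subtasks }
        })
        .collect()
}

fn main() {
    let mut tasks = HashMap::new();
    tasks.insert(TaskKey { node_id: 1, subtask_idx: 0 }, 5.0);
    tasks.insert(TaskKey { node_id: 0, subtask_idx: 1 }, 20.0);
    tasks.insert(TaskKey { node_id: 0, subtask_idx: 0 }, 10.0);
    for group in group_by_operator(&tasks) {
        println!("{group:?}");
    }
}
```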

Code Reference

Source Location

crates/arroyo-controller/src/job_controller/job_metrics.rs

Signature

pub struct JobMetrics {
    program: Arc<LogicalProgram>,
    tasks: Arc<RwLock<HashMap<TaskKey, TaskMetrics>>>,
}

impl JobMetrics {
    pub fn new(program: Arc<LogicalProgram>) -> Self
    pub async fn update(&self, node_id: u32, subtask_idx: u32, values: &HashMap<MetricName, u64>)
    pub async fn get_groups(&self) -> Vec<OperatorMetricGroup>
}

pub struct RateMetric {
    values: CircularBuffer<(SystemTime, f64), NUM_BUCKETS>,
    prev_value: Option<(SystemTime, u64)>,
}

impl RateMetric {
    pub fn new() -> Self
    pub fn add(&mut self, time: SystemTime, value: u64)
    pub fn is_empty(&self) -> bool
    pub fn iter(&self) -> impl Iterator<Item = (SystemTime, f64)> + '_
}

pub struct CircularBuffer<T: Copy, const N: usize> {
    values: [T; N],
    next_idx: usize,
    size: u64,
}

Import

use crate::job_controller::job_metrics::JobMetrics;

I/O Contract

Inputs

Name         Type                      Description
program      Arc<LogicalProgram>       The logical program graph defining operators and parallelism
node_id      u32                       The operator node index for metric updates
subtask_idx  u32                       The subtask index within the operator
values       HashMap<MetricName, u64>  Raw counter values from a worker heartbeat

Outputs

Name           Type                      Description
metric_groups  Vec<OperatorMetricGroup>  Per-operator metric groups with subtask-level rate and backpressure data

Usage Examples

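use std::collections::HashMap;

// Create a JobMetrics instance for a running pipeline.
// `program` is an existing Arc<LogicalProgram>.
let metrics = JobMetrics::new(program.clone());

// Update metrics from a worker heartbeat for node 0, subtask 1.
// The values are raw counters, not rates.
let mut values = HashMap::new();
values.insert(MetricName::BytesRecv, 1024);
values.insert(MetricName::MessagesSent, 50);
// `update` is async, so this must run inside an async context.
metrics.update(0, 1, &values).await;

// Retrieve the aggregated per-operator metrics for the API.
let groups = metrics.get_groups().await;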
