Implementation:ArroyoSystems Arroyo Job Metrics
Overview
JobMetrics is a metrics collection and aggregation system within the Arroyo controller. It tracks the runtime metrics that worker nodes report for each subtask of each operator. It computes exponentially weighted moving averages (EWMA) for rate-based metrics and maintains backpressure measurements in fixed-size circular buffers.
Description
The JobMetrics struct holds a reference to the logical program graph and a shared map from TaskKey to per-task metric accumulators (TaskMetrics), guarded by a read-write lock. Each task is identified by a TaskKey containing a node ID and a subtask index. The system tracks four rate metrics (bytes received, bytes sent, messages received, messages sent) using the RateMetric EWMA calculator, plus a backpressure gauge derived from transmit queue utilization.
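The per-task keying and the backpressure gauge can be sketched as follows. This is a minimal illustration, not Arroyo's code. `TaskKey` is a simplified stand-in. The formula `1 - tx_queue_rem / tx_queue_size` is an assumption about how "transmit queue utilization" becomes a 0-to-1 gauge. The names `tx_queue_size`, `tx_queue_rem`, and `backpressure` are hypothetical.

```rust
use std::collections::HashMap;

/// Simplified stand-in for TaskKey: one entry per (operator node, subtask).
/// Deriving Hash + Eq lets it serve as a HashMap key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TaskKey {
    node_id: u32,
    subtask_idx: u32,
}

/// Assumption: backpressure is the occupied fraction of the transmit queue,
/// i.e. 1 - remaining / capacity, clamped to [0, 1].
/// `tx_queue_size` and `tx_queue_rem` are hypothetical parameter names.
fn backpressure(tx_queue_size: u64, tx_queue_rem: u64) -> f64 {
    if tx_queue_size == 0 {
        return 0.0; // no queue reported, so no measurable pressure
    }
    let used = tx_queue_size.saturating_sub(tx_queue_rem) as f64;
    (used / tx_queue_size as f64).clamp(0.0, 1.0)
}

fn main() {
    let mut gauges: HashMap<TaskKey, f64> = HashMap::new();
    // Subtask 1 of operator 0 has 25 of 100 queue slots free: 75% utilized.
    gauges.insert(TaskKey { node_id: 0, subtask_idx: 1 }, backpressure(100, 25));
    // Subtask 0 has an empty queue and reports no backpressure.
    gauges.insert(TaskKey { node_id: 0, subtask_idx: 0 }, backpressure(100, 100));

    let k = TaskKey { node_id: 0, subtask_idx: 1 };
    println!("{:?} -> {:.2}", k, gauges[&k]);
}
```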
RateMetric computes smoothed per-second rates using an EWMA with alpha = 0.1 and stores the results in a CircularBuffer sized to hold 5 minutes of data at 2-second collection intervals (300 s / 2 s = 150 entries). CircularBuffer is a fixed-capacity ring buffer, generic over its element type T and its capacity N (a const generic parameter), that supports push, iteration, and access to the last element.
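The following is a minimal sketch of the EWMA rate calculation and its ring buffer. It assumes three things: alpha weights the newest instantaneous rate (r = 0.1 * x + 0.9 * r_prev), the first observed rate seeds the average, and counter decreases are clamped to zero with `saturating_sub`. The structs mirror the `RateMetric` and `CircularBuffer` fields in the signature. The method bodies, the `ALPHA` constant, and the `CircularBuffer::new(fill)` constructor are illustrative assumptions, not Arroyo's implementation.

```rust
use std::time::{Duration, SystemTime};

// 5 minutes of history at one sample every 2 seconds = 150 buckets.
const NUM_BUCKETS: usize = 5 * 60 / 2;
// Assumption: alpha is the weight given to the newest instantaneous rate.
const ALPHA: f64 = 0.1;

/// Fixed-capacity ring buffer; once full, each push overwrites the oldest entry.
struct CircularBuffer<T: Copy, const N: usize> {
    values: [T; N],
    next_idx: usize,
    size: usize, // usize here (the documented field is u64) for simpler indexing
}

impl<T: Copy, const N: usize> CircularBuffer<T, N> {
    // Hypothetical constructor: `fill` is a placeholder for not-yet-used slots.
    fn new(fill: T) -> Self {
        Self { values: [fill; N], next_idx: 0, size: 0 }
    }

    fn push(&mut self, v: T) {
        self.values[self.next_idx] = v;
        self.next_idx = (self.next_idx + 1) % N;
        self.size = (self.size + 1).min(N);
    }

    /// Most recently pushed element, if any.
    fn last(&self) -> Option<T> {
        (self.size > 0).then(|| self.values[(self.next_idx + N - 1) % N])
    }

    /// Iterates from oldest to newest.
    fn iter(&self) -> impl Iterator<Item = T> + '_ {
        let start = (self.next_idx + N - self.size) % N;
        (0..self.size).map(move |i| self.values[(start + i) % N])
    }
}

struct RateMetric {
    values: CircularBuffer<(SystemTime, f64), NUM_BUCKETS>,
    prev_value: Option<(SystemTime, u64)>,
}

impl RateMetric {
    fn new() -> Self {
        Self { values: CircularBuffer::new((SystemTime::UNIX_EPOCH, 0.0)), prev_value: None }
    }

    /// `value` is a cumulative counter (e.g. total bytes received so far).
    fn add(&mut self, time: SystemTime, value: u64) {
        if let Some((prev_time, prev)) = self.prev_value {
            if let Ok(elapsed) = time.duration_since(prev_time) {
                let secs = elapsed.as_secs_f64();
                if secs > 0.0 {
                    // Instantaneous rate over the last interval.
                    let x = value.saturating_sub(prev) as f64 / secs;
                    // Assumption: seed with the first rate, then blend with EWMA.
                    let smoothed = match self.values.last() {
                        Some((_, r_prev)) => ALPHA * x + (1.0 - ALPHA) * r_prev,
                        None => x,
                    };
                    self.values.push((time, smoothed));
                }
            }
        }
        self.prev_value = Some((time, value));
    }

    fn is_empty(&self) -> bool {
        self.values.size == 0
    }

    fn iter(&self) -> impl Iterator<Item = (SystemTime, f64)> + '_ {
        self.values.iter()
    }
}

fn main() {
    let mut rate = RateMetric::new();
    let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000);
    rate.add(t0, 0); // first sample only records the baseline
    rate.add(t0 + Duration::from_secs(2), 200); // 100/s, seeds the average
    rate.add(t0 + Duration::from_secs(4), 600); // 200/s, smoothed toward it
    for (_, r) in rate.iter() {
        println!("{r:.1}");
    }
}
```

In this sketch, a counter whose rate doubles from 100/s to 200/s moves the smoothed value only to 110/s on the next 2-second sample. This illustrates how a small alpha damps sudden heartbeat-to-heartbeat swings.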
Usage
JobMetrics is instantiated when a job starts running and is updated periodically as the controller receives heartbeat data from workers. The get_groups method returns aggregated metrics organized by operator for the API layer.
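The pattern of periodic `update` calls followed by `get_groups` can be sketched with standard-library types only. This is a simplified, hypothetical model. It stores only the latest raw counter per task instead of `TaskMetrics`. It uses a synchronous `std::sync::RwLock`, whereas the documented methods are `async`. It returns a made-up `OperatorGroup` instead of `OperatorMetricGroup`. `JobMetricsSketch` and `OperatorGroup` are illustrative names.

```rust
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, RwLock};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TaskKey {
    node_id: u32,
    subtask_idx: u32,
}

/// Hypothetical, simplified stand-in for OperatorMetricGroup.
#[derive(Debug, PartialEq)]
struct OperatorGroup {
    node_id: u32,
    /// (subtask_idx, latest counter value), sorted by subtask index.
    subtasks: Vec<(u32, u64)>,
}

/// Hypothetical simplified JobMetrics: a shared, lock-protected map keyed by task.
#[derive(Clone, Default)]
struct JobMetricsSketch {
    tasks: Arc<RwLock<HashMap<TaskKey, u64>>>,
}

impl JobMetricsSketch {
    /// Called once per worker heartbeat; overwrites that subtask's latest value.
    fn update(&self, node_id: u32, subtask_idx: u32, messages_sent: u64) {
        let mut tasks = self.tasks.write().unwrap();
        tasks.insert(TaskKey { node_id, subtask_idx }, messages_sent);
    }

    /// Regroups per-subtask entries into one group per operator, ordered by node ID.
    fn get_groups(&self) -> Vec<OperatorGroup> {
        let tasks = self.tasks.read().unwrap();
        let mut by_node: BTreeMap<u32, Vec<(u32, u64)>> = BTreeMap::new();
        for (key, value) in tasks.iter() {
            by_node.entry(key.node_id).or_default().push((key.subtask_idx, *value));
        }
        by_node
            .into_iter()
            .map(|(node_id, mut subtasks)| {
                subtasks.sort_by_key(|(idx, _)| *idx);
                OperatorGroup { node_id, subtasks }
            })
            .collect()
    }
}

fn main() {
    let metrics = JobMetricsSketch::default();
    metrics.update(0, 1, 50);
    metrics.update(0, 0, 40);
    metrics.update(1, 0, 90);
    for group in metrics.get_groups() {
        println!("{group:?}");
    }
}
```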
Code Reference
Source Location
crates/arroyo-controller/src/job_controller/job_metrics.rs
Signature
```rust
pub struct JobMetrics {
    program: Arc<LogicalProgram>,
    tasks: Arc<RwLock<HashMap<TaskKey, TaskMetrics>>>,
}

impl JobMetrics {
    pub fn new(program: Arc<LogicalProgram>) -> Self
    pub async fn update(&self, node_id: u32, subtask_idx: u32, values: &HashMap<MetricName, u64>)
    pub async fn get_groups(&self) -> Vec<OperatorMetricGroup>
}

pub struct RateMetric {
    values: CircularBuffer<(SystemTime, f64), NUM_BUCKETS>,
    prev_value: Option<(SystemTime, u64)>,
}

impl RateMetric {
    pub fn new() -> Self
    pub fn add(&mut self, time: SystemTime, value: u64)
    pub fn is_empty(&self) -> bool
    pub fn iter(&self) -> impl Iterator<Item = (SystemTime, f64)> + '_
}

pub struct CircularBuffer<T: Copy, const N: usize> {
    values: [T; N],
    next_idx: usize,
    size: u64,
}
```
Import
```rust
use crate::job_controller::job_metrics::JobMetrics;
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| program | `Arc<LogicalProgram>` | The logical program graph defining operators and parallelism (passed to `new`) |
| node_id | `u32` | The operator node index for metric updates |
| subtask_idx | `u32` | The subtask index within the operator |
| values | `&HashMap<MetricName, u64>` | Raw counter values from a worker heartbeat |
Outputs
| Name | Type | Description |
|---|---|---|
| metric_groups | `Vec<OperatorMetricGroup>` | Per-operator metric groups with subtask-level rate and backpressure data (returned by `get_groups`) |
Usage Examples
```rust
use std::collections::HashMap;

// Inside an async fn, with `program: Arc<LogicalProgram>` in scope.

// Create a JobMetrics instance for a running pipeline
let metrics = JobMetrics::new(program.clone());

// Record raw counter values from a worker heartbeat for operator node 0, subtask 1
let mut values = HashMap::new();
values.insert(MetricName::BytesRecv, 1024);
values.insert(MetricName::MessagesSent, 50);
metrics.update(0, 1, &values).await;

// Retrieve per-operator metric groups for the API layer
let groups = metrics.get_groups().await;
```
Related Pages
- ArroyoSystems_Arroyo_Controller_Server - The controller server that owns JobMetrics instances
- ArroyoSystems_Arroyo_Scheduler_Trait - Scheduler trait used to manage workers that report metrics