Implementation:ArroyoSystems Arroyo Metrics Endpoint
| Field | Value |
|---|---|
| Sources | ArroyoSystems/arroyo |
| Domains | Stream_Processing, Observability |
| Last Updated | 2026-02-08 |
Overview
Metrics_Endpoint implements the REST API endpoint for retrieving operator-level metric groups for a running job, providing throughput and backpressure data to the Arroyo web UI's pipeline monitoring dashboard.
Description
The crates/arroyo-api/src/metrics.rs module contains a single async handler, get_operator_metric_groups, that serves the GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups endpoint.
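The helper below shows how the request path is assembled for a given pair of public IDs. It is a minimal sketch. The name operator_metric_groups_path is hypothetical. The /api prefix comes from the curl example on this page. The IDs are assumed to be URL-safe, so no percent-encoding is applied.

```rust
/// Hypothetical helper: builds the path served by get_operator_metric_groups.
/// `base` is assumed to include any prefix the API is mounted under
/// (the curl example on this page uses ".../api").
/// Public IDs are assumed URL-safe; they are not percent-encoded here.
fn operator_metric_groups_path(base: &str, pipeline_id: &str, job_id: &str) -> String {
    format!(
        "{}/v1/pipelines/{}/jobs/{}/operator_metric_groups",
        // Avoid a double slash if the base already ends with '/'.
        base.trim_end_matches('/'),
        pipeline_id,
        job_id
    )
}
```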
The handler follows this sequence:
- Authentication: Validates the bearer token and retrieves the AuthData context.
- Job validation: Calls query_job_by_pub_id to verify that the pipeline and job exist and belong to the authenticated organization. This also retrieves the internal job record, including the database id field needed for the gRPC call.
- gRPC connection: Establishes a connection to the controller service using arroyo_rpc::connect_grpc with the controller endpoint from configuration. The connection uses zstd compression for both sending and receiving via CompressionEncoding::Zstd.
- Metrics retrieval: Sends a JobMetricsReq with the internal job ID to the controller's job_metrics gRPC method.
- Response handling:
  - On success, deserializes the JSON metrics string from the gRPC response into a Vec<OperatorMetricGroup>.
  - If the controller returns Code::NotFound (meaning the job is not currently running), an empty metric collection is returned rather than an error.
  - Other gRPC errors are mapped to HTTP 500 errors via log_and_map.
If the connection to the controller service fails, the error is mapped to a 503 Service Unavailable response using service_unavailable("controller-service").
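The handler sequence can be sketched as plain, std-only Rust. This is not the code in metrics.rs. The Backend trait, GrpcStatus, HandlerError and operator_metric_groups_flow are hypothetical stand-ins for the axum extractors, the tonic client (with zstd compression) and Arroyo's error helpers. The controller's JSON payload is assumed to be already decoded into a Vec<String>. The sketch shows three things: the order of the steps, the early returns, and the special case that turns NotFound into an empty result.

```rust
// Hypothetical, std-only model of the handler's control flow.
#[derive(Debug, PartialEq)]
enum GrpcStatus {
    NotFound,
    Other(String),
}

#[derive(Debug, PartialEq)]
enum HandlerError {
    Unauthorized,                     // HTTP 401
    NotFound,                         // HTTP 404
    ServiceUnavailable(&'static str), // HTTP 503
    Internal(String),                 // HTTP 500 (the real handler logs via log_and_map)
}

/// Dependencies of the handler, abstracted so the flow runs without a server.
trait Backend {
    /// Authentication: returns the caller's organization ID.
    fn authenticate(&self, token: &str) -> Result<String, HandlerError>;
    /// Job validation: maps public IDs to the internal (database) job ID.
    fn query_job_by_pub_id(&self, org: &str, pipeline: &str, job: &str)
        -> Result<i64, HandlerError>;
    /// gRPC connection to the controller; the error detail is discarded.
    fn connect_controller(&self) -> Result<(), String>;
    /// Metrics retrieval: Ok carries the already-decoded metric groups.
    fn job_metrics(&self, job_id: i64) -> Result<Vec<String>, GrpcStatus>;
}

fn operator_metric_groups_flow(
    backend: &impl Backend,
    token: &str,
    pipeline_pub_id: &str,
    job_pub_id: &str,
) -> Result<Vec<String>, HandlerError> {
    // Auth and job-lookup failures propagate unchanged (401 / 404).
    let org = backend.authenticate(token)?;
    let job_id = backend.query_job_by_pub_id(&org, pipeline_pub_id, job_pub_id)?;
    // A failed connection becomes 503 Service Unavailable.
    backend
        .connect_controller()
        .map_err(|_| HandlerError::ServiceUnavailable("controller-service"))?;
    match backend.job_metrics(job_id) {
        Ok(groups) => Ok(groups),
        // The job is not running: return an empty collection, not an error.
        Err(GrpcStatus::NotFound) => Ok(Vec::new()),
        // Any other controller error becomes HTTP 500.
        Err(GrpcStatus::Other(msg)) => Err(HandlerError::Internal(msg)),
    }
}
```

Treating NotFound as success means a caller polling a stopped job sees an empty list instead of an error. HTTP error statuses remain available for problems the caller might act on.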
Usage
This endpoint is polled periodically by the Arroyo web UI to display real-time per-operator metrics (messages sent/received, bytes sent/received, backpressure) on the pipeline detail dashboard. It is only meaningful while a job is actively running.
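A client that polls this endpoint might look roughly like the following. This is a hypothetical sketch, not the web UI's code. Two things are assumptions: the name poll_until_idle and its stop-on-empty policy. The fetch closure stands in for the HTTP GET and returns either the decoded data array or an error message.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical poller: calls `fetch` up to `max_polls` times, `interval` apart,
/// and keeps every non-empty snapshot of the `data` array.
fn poll_until_idle<T, F>(mut fetch: F, interval: Duration, max_polls: usize) -> Vec<Vec<T>>
where
    F: FnMut() -> Result<Vec<T>, String>,
{
    let mut snapshots = Vec::new();
    for _ in 0..max_polls {
        match fetch() {
            Ok(groups) => {
                // An empty collection means the job is not running, so stop.
                if groups.is_empty() {
                    break;
                }
                snapshots.push(groups);
            }
            // Errors (for example a 503 while the controller is unreachable)
            // are skipped and the next poll is attempted.
            Err(_) => {}
        }
        sleep(interval);
    }
    snapshots
}
```

Because the endpoint returns an empty collection rather than an error for a stopped job, the client can tell "no longer running" apart from "temporarily unreachable" without inspecting status codes.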
Code Reference
Source Location
- Repository: ArroyoSystems/arroyo (GitHub)
- File: crates/arroyo-api/src/metrics.rs
- Lines: L1-L80
Signature
```rust
/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups
/// Retrieves operator-level metric groups for a running job.
pub async fn get_operator_metric_groups(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
) -> Result<Json<OperatorMetricGroupCollection>, ErrorResp>
```
Import
```rust
// Crate-internal: used within arroyo-api
use crate::metrics::get_operator_metric_groups;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| Bearer token | Header | Yes | Authentication credential |
| pipeline_id | Path (String) | Yes | Public ID of the pipeline |
| job_id | Path (String) | Yes | Public ID of the job |
Outputs
| Condition | Type | Description |
|---|---|---|
| Success | Json<OperatorMetricGroupCollection> | HTTP 200 with a collection of per-operator metric groups containing subtask-level metrics |
| Job not running | Json<OperatorMetricGroupCollection> | HTTP 200 with an empty collection (data: []) when the controller returns NOT_FOUND |
| Auth failure | ErrorResp | HTTP 401 for invalid or missing credentials |
| Job not found | ErrorResp | HTTP 404 if the pipeline or job does not exist |
| Controller unavailable | ErrorResp | HTTP 503 if unable to connect to the controller gRPC service |
| gRPC error | ErrorResp | HTTP 500 for unexpected controller errors |
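The Outputs table can be restated as a small mapping from outcome to HTTP status. The names Outcome, http_status and is_retryable are hypothetical and used for illustration only. The 200 for both JSON cases reflects axum's default status for a Json body. Retrying only on 503 is a client-side assumption; the endpoint does not specify it.

```rust
/// Hypothetical enumeration of the outcomes listed in the Outputs table.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Success,
    JobNotRunning,
    AuthFailure,
    JobNotFound,
    ControllerUnavailable,
    GrpcError,
}

/// HTTP status returned for each outcome.
fn http_status(outcome: Outcome) -> u16 {
    match outcome {
        // Both return a JSON collection; "not running" just has `data: []`.
        Outcome::Success | Outcome::JobNotRunning => 200,
        Outcome::AuthFailure => 401,
        Outcome::JobNotFound => 404,
        Outcome::ControllerUnavailable => 503,
        Outcome::GrpcError => 500,
    }
}

/// Assumed client policy: only an unreachable controller is worth retrying.
fn is_retryable(outcome: Outcome) -> bool {
    http_status(outcome) == 503
}
```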
Each OperatorMetricGroup contains:
- operator_id: Identifier of the operator in the dataflow graph
- Subtask-level metrics, including messages and bytes sent/received and backpressure measurements
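A client can mirror the JSON shape from the example response on this page in Rust types. The structs below are a hypothetical sketch based only on that example. The real OperatorMetricGroup type in Arroyo may use different field names. The total and max helpers are illustrative additions, not part of the API.

```rust
/// Hypothetical mirror of the example response shape; not Arroyo's own types.
#[derive(Debug, Clone, PartialEq)]
struct Metric {
    name: String,
    value: f64,
}

#[derive(Debug, Clone, PartialEq)]
struct SubtaskMetrics {
    index: u32, // parallel subtask index
    metrics: Vec<Metric>,
}

#[derive(Debug, Clone, PartialEq)]
struct OperatorMetricGroup {
    operator_id: String,
    subtasks: Vec<SubtaskMetrics>,
}

impl OperatorMetricGroup {
    /// Values of the named metric across all subtasks.
    fn values<'a>(&'a self, name: &'a str) -> impl Iterator<Item = f64> + 'a {
        self.subtasks
            .iter()
            .flat_map(|s| s.metrics.iter())
            .filter(move |m| m.name == name)
            .map(|m| m.value)
    }

    /// Operator-level total, suited to counters such as messages_sent.
    fn total(&self, name: &str) -> f64 {
        self.values(name).sum()
    }

    /// Largest per-subtask value, or None if no subtask reports the metric.
    fn max(&self, name: &str) -> Option<f64> {
        self.values(name).fold(None, |acc, v| Some(acc.map_or(v, |a: f64| a.max(v))))
    }
}
```

Summing suits counters. For a ratio-like value such as backpressure, the maximum across subtasks is arguably more informative, because a single slow subtask can stall the operator.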
Usage Examples
Fetching Job Metrics via REST
```bash
# Get operator metrics for a running job
curl -H "Authorization: Bearer $TOKEN" \
  http://localhost:8000/api/v1/pipelines/pl_abc123/jobs/jc_def456/operator_metric_groups
```
Example Response
```json
{
  "data": [
    {
      "operator_id": "node_1",
      "subtasks": [
        {
          "index": 0,
          "metrics": [
            { "name": "messages_sent", "value": 15420.0 },
            { "name": "bytes_sent", "value": 1048576.0 }
          ]
        }
      ]
    }
  ]
}
```