

Implementation:ArroyoSystems Arroyo Metrics Endpoint

From Leeroopedia


Field          Value
Sources        ArroyoSystems/arroyo
Domains        Stream_Processing, Observability
Last Updated   2026-02-08

Overview

Metrics_Endpoint implements the REST API endpoint for retrieving operator-level metric groups for a running job, providing throughput and backpressure data to the Arroyo web UI's pipeline monitoring dashboard.

Description

The crates/arroyo-api/src/metrics.rs module contains a single async handler, get_operator_metric_groups, that serves the GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups endpoint.

The handler follows this sequence:

  1. Authentication: Validates the bearer token and retrieves the AuthData context.
  2. Job validation: Calls query_job_by_pub_id to verify that the pipeline and job exist and belong to the authenticated organization. This also retrieves the internal job record (with the database id field needed for the gRPC call).
  3. gRPC connection: Establishes a connection to the controller service using arroyo_rpc::connect_grpc with the controller endpoint from configuration. The connection uses zstd compression for both sending and receiving via CompressionEncoding::Zstd.
  4. Metrics retrieval: Sends a JobMetricsReq with the internal job ID to the controller's job_metrics gRPC method.
  5. Response handling:
    • On success, deserializes the JSON metrics string from the gRPC response into a Vec<OperatorMetricGroup>.
    • If the controller returns Code::NotFound (meaning the job is not currently running), an empty metric collection is returned rather than an error.
    • Other gRPC errors are mapped to HTTP 500 errors via log_and_map.

If the connection to the controller service fails (step 3), the handler returns a 503 Service Unavailable response via service_unavailable("controller-service").
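The decision logic of steps 1 through 5, including the 503 mapping, can be modeled with the standard library alone. This is a minimal sketch, not the arroyo-api code: `GrpcCode`, `ControllerReply`, `HttpError`, `error`, and `handle` are hypothetical stand-ins for tonic's status codes, the controller's reply, and ErrorResp. The boolean parameters replace the real authentication, database, and connection calls, and the JSON deserialization into Vec<OperatorMetricGroup> is reduced to a list of operator IDs.

```rust
/// Hypothetical stand-in for the subset of gRPC status codes that matter here.
#[derive(Debug, Clone, Copy, PartialEq)]
enum GrpcCode {
    NotFound,
    Internal,
}

/// Hypothetical stand-in for the controller's job_metrics reply:
/// parsed metric groups (reduced to operator IDs) or a failing status code.
enum ControllerReply {
    Groups(Vec<String>),
    Status(GrpcCode),
}

/// Hypothetical HTTP-facing error, standing in for ErrorResp.
#[derive(Debug, PartialEq)]
struct HttpError {
    status: u16,
    message: String,
}

fn error(status: u16, message: &str) -> HttpError {
    HttpError { status, message: message.to_string() }
}

/// Models the handler's branching; each boolean replaces one real call.
fn handle(
    token_valid: bool,
    job_visible: bool,
    controller_reachable: bool,
    reply: ControllerReply,
) -> Result<Vec<String>, HttpError> {
    // Step 1: invalid or missing bearer token -> 401.
    if !token_valid {
        return Err(error(401, "unauthorized"));
    }
    // Step 2: query_job_by_pub_id finds no matching job -> 404.
    if !job_visible {
        return Err(error(404, "job not found"));
    }
    // Step 3: the gRPC connection failed -> 503 ("controller-service").
    if !controller_reachable {
        return Err(error(503, "controller-service unavailable"));
    }
    // Steps 4-5: interpret the controller's reply.
    match reply {
        ControllerReply::Groups(groups) => Ok(groups),
        // Job not running: an empty collection, not an error.
        ControllerReply::Status(GrpcCode::NotFound) => Ok(Vec::new()),
        // Any other status is logged and surfaced as a 500 (the role log_and_map plays).
        ControllerReply::Status(code) => {
            eprintln!("job_metrics failed: {:?}", code);
            Err(error(500, "internal error"))
        }
    }
}
```

The point this sketch captures is that NotFound from the controller is treated as a normal, empty result, so a dashboard polling a stopped job sees no data instead of an error.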

Usage

This endpoint is polled periodically by the Arroyo web UI to display real-time per-operator metrics (messages sent/received, bytes sent/received, backpressure) on the pipeline detail dashboard. The data is only meaningful while a job is actively running; for a job that is not running, the endpoint returns an empty collection.

Code Reference

Source Location

Repository   ArroyoSystems/arroyo (GitHub)
File         crates/arroyo-api/src/metrics.rs
Lines        1-80

Signature

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups
/// Retrieves operator-level metric groups for a running job.
pub async fn get_operator_metric_groups(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
) -> Result<Json<OperatorMetricGroupCollection>, ErrorResp>

Import

// Crate-internal: used within arroyo-api
use crate::metrics::get_operator_metric_groups;

I/O Contract

Inputs

Name           Type                     Required   Description
Bearer token   Header (Authorization)   Yes        Authentication credential
pipeline_id    Path (String)            Yes        Public ID of the pipeline
job_id         Path (String)            Yes        Public ID of the job

Outputs

Condition                Type                                  Description
Success                  Json<OperatorMetricGroupCollection>   Collection of per-operator metric groups containing subtask-level metrics
Job not running          Json<OperatorMetricGroupCollection>   Empty collection (data: []) when the controller returns NOT_FOUND
Auth failure             ErrorResp                             HTTP 401 for invalid or missing credentials
Job not found            ErrorResp                             HTTP 404 if the pipeline or job does not exist
Controller unavailable   ErrorResp                             HTTP 503 if unable to connect to the controller gRPC service
gRPC error               ErrorResp                             HTTP 500 for unexpected controller errors

Each OperatorMetricGroup contains:

  • operator_id -- Identifier of the operator in the dataflow graph
  • Subtask-level metrics including messages and bytes sent/received, and backpressure measurements
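A client consuming this contract can classify each response from its HTTP status and, for successful responses, the length of the `data` array. The sketch below is hypothetical (`MetricsOutcome` and `classify` are not part of Arroyo). It treats an empty `data` array as the job-not-running case from the outputs table, although a running job that reports no metric groups would be indistinguishable from it.

```rust
/// Hypothetical client-side view of the endpoint's documented outcomes.
#[derive(Debug, PartialEq)]
enum MetricsOutcome {
    Metrics { groups: usize },
    JobNotRunning,
    AuthFailure,
    NotFound,
    ControllerUnavailable,
    ControllerError,
    Unexpected(u16),
}

/// Maps an HTTP status (and, for 200, the number of entries in `data`)
/// onto the conditions listed in the outputs table.
fn classify(status: u16, data_len: usize) -> MetricsOutcome {
    match status {
        // Controller answered NOT_FOUND: empty collection, job not running.
        200 if data_len == 0 => MetricsOutcome::JobNotRunning,
        200 => MetricsOutcome::Metrics { groups: data_len },
        401 => MetricsOutcome::AuthFailure,
        404 => MetricsOutcome::NotFound,
        503 => MetricsOutcome::ControllerUnavailable,
        500 => MetricsOutcome::ControllerError,
        // Anything the table does not describe.
        other => MetricsOutcome::Unexpected(other),
    }
}
```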

Usage Examples

Fetching Job Metrics via REST

# Get operator metrics for a running job
curl -H "Authorization: Bearer $TOKEN" \
  http://localhost:8000/api/v1/pipelines/pl_abc123/jobs/jc_def456/operator_metric_groups
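The same request can be issued from Rust using only the standard library. This sketch assumes, as the curl command does, that the API is reachable over plain HTTP at `localhost:8000` under the `/api` prefix. `metrics_path`, `build_request`, and `fetch_raw` are hypothetical helpers, and the response is returned as raw text (status line, headers, and body) without any parsing or decoding.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

/// Builds the endpoint path from the public pipeline and job IDs.
fn metrics_path(pipeline_id: &str, job_id: &str) -> String {
    format!("/api/v1/pipelines/{}/jobs/{}/operator_metric_groups", pipeline_id, job_id)
}

/// Formats a minimal HTTP/1.1 GET request carrying the bearer token.
/// `Connection: close` lets the caller read the response until EOF.
fn build_request(host: &str, path: &str, token: &str) -> String {
    format!(
        "GET {path} HTTP/1.1\r\nHost: {host}\r\nAuthorization: Bearer {token}\r\nAccept: application/json\r\nConnection: close\r\n\r\n"
    )
}

/// Sends the request to `addr` (e.g. "localhost:8000") and returns the raw response text.
fn fetch_raw(addr: &str, pipeline_id: &str, job_id: &str, token: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    let request = build_request(addr, &metrics_path(pipeline_id, job_id), token);
    stream.write_all(request.as_bytes())?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?;
    Ok(response)
}
```

In practice an HTTP client library is the better choice; the raw form is shown only to make the path and the Authorization header explicit.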

Example Response

{
  "data": [
    {
      "operator_id": "node_1",
      "subtasks": [
        {
          "index": 0,
          "metrics": [
            { "name": "messages_sent", "value": 15420.0 },
            { "name": "bytes_sent", "value": 1048576.0 }
          ]
        }
      ]
    }
  ]
}
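To work with a response like the one above, its shape can be mirrored in plain Rust types. These structs are hypothetical: their field names are taken from the example JSON, not from Arroyo's own OperatorMetricGroup definition, which may differ, and deserialization (e.g. with serde) is left out to keep the sketch dependency-free. The `total` helper sums one named metric across an operator's subtasks (its parallel instances). Summing suits counts such as messages_sent; a ratio such as backpressure would more likely be averaged.

```rust
/// Hypothetical mirror of one entry in a subtask's `metrics` array.
#[derive(Debug, Clone)]
struct Metric {
    name: String,
    value: f64,
}

/// Hypothetical mirror of one entry in a group's `subtasks` array.
#[derive(Debug, Clone)]
struct SubtaskMetrics {
    index: u32,
    metrics: Vec<Metric>,
}

/// Hypothetical mirror of one element of the response's `data` array.
#[derive(Debug, Clone)]
struct OperatorMetricGroup {
    operator_id: String,
    subtasks: Vec<SubtaskMetrics>,
}

impl OperatorMetricGroup {
    /// Sums the metric called `name` over all subtasks of this operator.
    /// Returns None if no subtask reports that metric.
    fn total(&self, name: &str) -> Option<f64> {
        let values: Vec<f64> = self
            .subtasks
            .iter()
            .flat_map(|s| s.metrics.iter())
            .filter(|m| m.name == name)
            .map(|m| m.value)
            .collect();
        if values.is_empty() {
            None
        } else {
            Some(values.iter().sum())
        }
    }
}
```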
