

Implementation:ArroyoSystems Arroyo Api Server

From Leeroopedia


Field        | Value
Sources      | ArroyoSystems/arroyo
Domains      | Stream_Processing, API_Infrastructure
Last Updated | 2026-02-08

Overview

Api_Server is the library root of the arroyo-api crate, providing the HTTP/HTTPS server initialization, OpenAPI specification generation, authentication data structures, organization resource limits, and module declarations for the Arroyo REST API.

Description

The crates/arroyo-api/src/lib.rs file serves as the entry point for the arroyo-api crate, which is the primary interface for the Arroyo web UI and programmatic clients to create and manage streaming pipelines, jobs, connections, UDFs, and metrics.

Server Startup

The start_server function is the main entry point for launching the API server. It performs:

  1. Configuration loading: Reads the API bind address and HTTP port from arroyo_rpc::config::config().
  2. Router creation: Calls rest::create_rest_app(database) to build the Axum router with all API routes and static file serving.
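  3. Compression: Applies zstd compression via tower_http::compression::CompressionLayer, explicitly excluding the text/event-stream content type, because compressing a Server-Sent Events stream would buffer events instead of delivering them to clients as they are produced.
  4. Authentication layer: If the configuration specifies ApiAuthMode::StaticApiKey, wraps the application in tower_http's ValidateRequestHeaderLayer::bearer middleware, which rejects any request whose Authorization header is not "Bearer" followed by the configured key.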
  5. TLS support: Optionally creates a TLS configuration for HTTPS serving using arroyo_server_common::tls::create_http_tls_config.
  6. Server binding: Binds to the configured address using either axum_server::bind_rustls (for HTTPS) or axum::serve (for plain HTTP), and spawns the server task via the ShutdownGuard.
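Steps 3 and 4 boil down to two per-request decisions. The std-only sketch below restates them as plain functions; it is not Arroyo's code, which delegates both decisions to tower_http middleware. The names should_compress and is_authorized are hypothetical, and representing the configured key as Option<&str> (None meaning no static API key is set) is an assumption.

```rust
/// Step 3 (sketch): compress every response except Server-Sent Events streams.
/// Parameters such as "; charset=utf-8" are ignored when comparing.
pub fn should_compress(content_type: Option<&str>) -> bool {
    match content_type {
        Some(ct) => {
            let mime = ct.split(';').next().unwrap_or("").trim();
            !mime.eq_ignore_ascii_case("text/event-stream")
        }
        // Responses without a Content-Type are compressed in this sketch.
        None => true,
    }
}

/// Step 4 (sketch): `static_api_key` is None when no static key is configured
/// (hypothetical representation). Otherwise the Authorization header must be
/// exactly "Bearer <key>".
pub fn is_authorized(static_api_key: Option<&str>, authorization: Option<&str>) -> bool {
    match static_api_key {
        None => true,
        Some(key) => authorization == Some(format!("Bearer {key}").as_str()),
    }
}
```

For example, is_authorized(Some("secret"), Some("Bearer secret")) accepts the request, while a missing or mismatched header is rejected.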

Organization Metadata

The OrgMetadata struct defines per-organization resource limits. Each field has the default value shown below:

  • can_create_programs -- Whether the org can create programs (default: false)
  • max_nexmark_qps -- Maximum Nexmark benchmark QPS (default: 1000.0)
  • max_impulse_qps -- Maximum Impulse source QPS (default: 1000.0)
  • max_parallelism -- Maximum operator parallelism (default: 32)
  • max_operators -- Maximum operators per pipeline (default: 30)
  • max_running_jobs -- Maximum concurrent running jobs (default: 10)
  • kafka_qps -- Maximum Kafka messages per second (default: 10,000)
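The defaults above can be pictured as a plain Default implementation. The sketch below is a std-only approximation, not the crate's definition: the numeric types (f64 for the Nexmark and Impulse QPS limits, u32 for the counts) are assumptions, and check_pipeline_limits is a hypothetical helper showing how a handler might enforce two of the limits.

```rust
/// Hypothetical std-only mirror of OrgMetadata. Field names and defaults
/// follow the list above; the numeric types are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct OrgMetadata {
    pub can_create_programs: bool,
    pub max_nexmark_qps: f64,
    pub max_impulse_qps: f64,
    pub max_parallelism: u32,
    pub max_operators: u32,
    pub max_running_jobs: u32,
    pub kafka_qps: u32,
}

impl Default for OrgMetadata {
    fn default() -> Self {
        Self {
            can_create_programs: false,
            max_nexmark_qps: 1000.0,
            max_impulse_qps: 1000.0,
            max_parallelism: 32,
            max_operators: 30,
            max_running_jobs: 10,
            kafka_qps: 10_000,
        }
    }
}

/// Hypothetical helper: reject a pipeline whose parallelism or operator
/// count exceeds the organization's limits.
pub fn check_pipeline_limits(
    meta: &OrgMetadata,
    parallelism: u32,
    operators: u32,
) -> Result<(), String> {
    if parallelism > meta.max_parallelism {
        return Err(format!(
            "parallelism {parallelism} exceeds the limit of {}",
            meta.max_parallelism
        ));
    }
    if operators > meta.max_operators {
        return Err(format!(
            "{operators} operators exceeds the limit of {}",
            meta.max_operators
        ));
    }
    Ok(())
}
```

Limits are inclusive in this sketch: a pipeline with exactly 32 parallelism and 30 operators passes under the defaults.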

Authentication Data

The AuthData struct carries the authentication context through API handlers:

  • user_id -- The authenticated user identifier
  • organization_id -- The organization the user belongs to
  • role -- The user's role within the organization
  • org_metadata -- The organization's resource limits
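To illustrate how AuthData travels with a request, the sketch below mirrors its four fields and adds a hypothetical can_start_job check against max_running_jobs. The String field types and the OrgMetadata trimmed to a single field are assumptions made to keep the example self-contained; the real types may differ.

```rust
/// Trimmed stand-in for OrgMetadata: only the field this sketch uses.
#[derive(Debug, Clone)]
pub struct OrgMetadata {
    pub max_running_jobs: u32,
}

/// Hypothetical mirror of AuthData; the field types are assumptions.
#[derive(Debug, Clone)]
pub struct AuthData {
    pub user_id: String,
    pub organization_id: String,
    pub role: String,
    pub org_metadata: OrgMetadata,
}

/// Hypothetical handler-side check: may this organization start one more
/// job, given how many of its jobs are currently running?
pub fn can_start_job(auth: &AuthData, running_jobs: u32) -> bool {
    running_jobs < auth.org_metadata.max_running_jobs
}
```

Because the limits ride along inside AuthData, a handler can enforce them without a separate lookup of the organization's settings.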

Compiler Service Client

The compiler_service function establishes a gRPC connection to the compiler service, which is used for UDF compilation. It reads the compiler endpoint from configuration and returns a CompilerGrpcClient<Channel>, or an ErrorResp if the connection cannot be established.

Error Types

The HttpError struct and HttpErrorCode enum provide standardized error responses. HttpErrorCode has three variants: Unauthorized, InvalidCredentials, and ServerError. HttpError implements Axum's IntoResponse trait, which maps each error code to an appropriate HTTP status code.
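A std-only sketch of the code-to-status mapping follows. The status codes chosen here (401 for both credential errors, 500 for ServerError) are assumptions rather than values taken from the source, and status_code is a hypothetical method standing in for the status selection an IntoResponse implementation would perform.

```rust
use std::fmt;

/// Hypothetical mirror of HttpErrorCode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpErrorCode {
    Unauthorized,
    InvalidCredentials,
    ServerError,
}

/// Hypothetical mirror of HttpError: a code plus a human-readable message.
#[derive(Debug, Clone)]
pub struct HttpError {
    pub code: HttpErrorCode,
    pub message: String,
}

impl HttpError {
    /// Assumed mapping from error code to numeric HTTP status.
    pub fn status_code(&self) -> u16 {
        match self.code {
            HttpErrorCode::Unauthorized | HttpErrorCode::InvalidCredentials => 401,
            HttpErrorCode::ServerError => 500,
        }
    }
}

impl fmt::Display for HttpError {
    /// Renders as "<Code>: <message>", e.g. "ServerError: boom".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}: {}", self.code, self.message)
    }
}
```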

OpenAPI Specification

The ApiDoc struct uses utoipa::OpenApi to generate the full OpenAPI 3.0 specification. It declares all 27 REST endpoint paths and over 60 schema components covering pipelines, jobs, checkpoints, connections, connectors, UDFs, metrics, and formats.

Module Structure

The crate declares the following submodules:

  • cloud -- Cloud-specific functionality
  • connection_profiles -- Connection profile CRUD endpoints
  • connection_tables -- Connection table CRUD endpoints
  • connectors -- Connector enumeration endpoint
  • jobs -- Job management endpoints
  • metrics -- Job metrics endpoint
  • pipelines -- Pipeline CRUD endpoints
  • rest (public) -- Axum router and application state
  • rest_utils -- Shared REST utilities and error handling
  • sql (public) -- SQL compilation entry point
  • udfs -- UDF management endpoints

Usage

The start_server function is called from the Arroyo main binary to launch the API server. It is also used in the embedded single-process mode where the API, controller, and worker run within the same process.

Code Reference

Source Location

Repository: ArroyoSystems/arroyo (GitHub)
File: crates/arroyo-api/src/lib.rs
Lines: L1-L365

Signature

/// Launch the HTTP/HTTPS API server.
/// Returns the port the server is listening on.
pub async fn start_server(
    database: DatabaseSource,
    guard: ShutdownGuard,
) -> anyhow::Result<u16>

/// Connect to the compiler gRPC service for UDF compilation.
pub async fn compiler_service() -> Result<CompilerGrpcClient<Channel>, ErrorResp>

/// Convert an OffsetDateTime to microseconds since epoch.
pub(crate) fn to_micros(dt: OffsetDateTime) -> u64
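The real to_micros takes a time::OffsetDateTime. As a rough, self-contained analogue, the sketch below swaps in std::time::SystemTime and computes the same quantity, microseconds since the Unix epoch. Clamping pre-epoch times to 0 is an assumption of this sketch, since the source does not say how such values are handled.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Std-only analogue of `to_micros`: microseconds since the Unix epoch.
/// Assumption: times before the epoch are clamped to 0 rather than panicking.
pub fn to_micros(t: SystemTime) -> u64 {
    t.duration_since(UNIX_EPOCH)
        .map(|d| d.as_micros() as u64)
        .unwrap_or(0)
}
```

For instance, a time two seconds after the epoch yields 2_000_000.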

Import

use arroyo_api::{start_server, compiler_service, ApiDoc};
use arroyo_api::{AuthData, OrgMetadata, HttpError};

I/O Contract

Inputs (start_server)

Name     | Type           | Required | Description
database | DatabaseSource | Yes      | Database connection pool for PostgreSQL
guard    | ShutdownGuard  | Yes      | Graceful shutdown coordinator for the spawned server task

Outputs (start_server)

Condition    | Type               | Description
Success      | Ok(u16)            | The TCP port the server bound to
Bind failure | Err(anyhow::Error) | Could not bind to the configured address
TLS error    | Err(anyhow::Error) | TLS configuration creation failed

Key Data Structures

Struct      | Description
OrgMetadata | Per-organization resource limits (parallelism, operators, running jobs, QPS)
AuthData    | Authentication context with user ID, org ID, role, and org metadata
HttpError   | Standardized HTTP error response with code and message
ApiDoc      | OpenAPI specification struct generated by utoipa

Usage Examples

Starting the API Server

use arroyo_api::start_server;
use arroyo_server_common::shutdown::ShutdownGuard;
use cornucopia_async::DatabaseSource;

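// Runs inside an async function returning anyhow::Result<_>;
// `pool` is an existing database connection pool created elsewhere.
let database = DatabaseSource::new(pool);
let guard = ShutdownGuard::new();

// Spawns the server task on `guard` and returns the bound port.
let port = start_server(database, guard).await?;
println!("API server listening on port {port}");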

Connecting to the Compiler Service

use arroyo_api::compiler_service;

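// Runs inside an async context; `request` is a request message for the
// compiler service's gRPC API, built elsewhere.
let mut compiler = compiler_service().await?;
let resp = compiler.compile_query(request).await?;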

Default Organization Metadata

use arroyo_api::OrgMetadata;

let metadata = OrgMetadata::default();
// can_create_programs: false
// max_nexmark_qps: 1000.0
// max_impulse_qps: 1000.0
// max_parallelism: 32
// max_operators: 30
// max_running_jobs: 10
// kafka_qps: 10_000
