Implementation:ArroyoSystems Arroyo Api Server
| Field | Value |
|---|---|
| Sources | ArroyoSystems/arroyo |
| Domains | Stream_Processing, API_Infrastructure |
| Last Updated | 2026-02-08 |
Overview
Api_Server is the library root of the arroyo-api crate, providing the HTTP/HTTPS server initialization, OpenAPI specification generation, authentication data structures, organization resource limits, and module declarations for the Arroyo REST API.
Description
The crates/arroyo-api/src/lib.rs file serves as the entry point for the arroyo-api crate, which is the primary interface for the Arroyo web UI and programmatic clients to create and manage streaming pipelines, jobs, connections, UDFs, and metrics.
Server Startup
The start_server function is the main entry point for launching the API server. It performs:
- Configuration loading: Reads the API bind address and HTTP port from arroyo_rpc::config::config().
- Router creation: Calls rest::create_rest_app(database) to build the Axum router with all API routes and static file serving.
- Compression: Applies zstd compression via tower_http::CompressionLayer, explicitly excluding the text/event-stream content type (SSE streams are incompatible with compression).
- Authentication layer: If the configuration specifies ApiAuthMode::StaticApiKey, wraps the application in a ValidateRequestHeaderLayer::bearer middleware.
- TLS support: Optionally creates a TLS configuration for HTTPS serving using arroyo_server_common::tls::create_http_tls_config.
- Server binding: Binds to the configured address using either axum_server::bind_rustls (for HTTPS) or axum::serve (for plain HTTP), and spawns the server task via the ShutdownGuard.
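The compression and authentication steps above come down to two per-request decisions. The following is a minimal, standard-library-only sketch of those decisions, not the crate's code: the real server delegates them to tower_http layers, and the ApiAuthMode shape, should_compress, and is_authorized shown here are hypothetical stand-ins for illustration.

```rust
/// Stand-in for the configured authentication mode (hypothetical shape;
/// the real type lives in the Arroyo configuration crate).
#[derive(Debug, Clone, PartialEq)]
pub enum ApiAuthMode {
    None,
    StaticApiKey { api_key: String },
}

/// Returns true when a response with this Content-Type may be compressed.
/// Server-sent event streams (text/event-stream) are left uncompressed.
pub fn should_compress(content_type: &str) -> bool {
    // Drop parameters such as "; charset=utf-8" and compare case-insensitively.
    let media_type = content_type.split(';').next().unwrap_or("").trim();
    !media_type.eq_ignore_ascii_case("text/event-stream")
}

/// Returns true when the request may proceed under the given auth mode.
/// With a static API key, the Authorization header must be "Bearer <key>".
pub fn is_authorized(mode: &ApiAuthMode, authorization_header: Option<&str>) -> bool {
    match mode {
        ApiAuthMode::None => true,
        ApiAuthMode::StaticApiKey { api_key } => authorization_header
            .and_then(|value| value.strip_prefix("Bearer "))
            .map_or(false, |token| token == api_key.as_str()),
    }
}
```

In the actual server these checks run as middleware wrapped around the router, so individual handlers never see unauthenticated requests.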
Organization Metadata
The OrgMetadata struct defines per-organization resource limits with the following defaults:
- can_create_programs -- Whether the org can create programs (default: false)
- max_nexmark_qps -- Maximum Nexmark benchmark QPS (default: 1000.0)
- max_impulse_qps -- Maximum Impulse source QPS (default: 1000.0)
- max_parallelism -- Maximum operator parallelism (default: 32)
- max_operators -- Maximum operators per pipeline (default: 30)
- max_running_jobs -- Maximum concurrent running jobs (default: 10)
- kafka_qps -- Maximum Kafka messages per second (default: 10,000)
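To make the limits concrete, here is a sketch of how such a struct and its defaults could be written. The field names and default values follow the list above; the field types and the check_parallelism helper are assumptions made for illustration and may differ from the crate.

```rust
/// Sketch of the per-organization limits. Field types are assumed.
#[derive(Debug, Clone, PartialEq)]
pub struct OrgMetadata {
    pub can_create_programs: bool,
    pub max_nexmark_qps: f64,
    pub max_impulse_qps: f64,
    pub max_parallelism: u32,
    pub max_operators: u32,
    pub max_running_jobs: u32,
    pub kafka_qps: u32,
}

impl Default for OrgMetadata {
    fn default() -> Self {
        // Values taken from the defaults listed in this section.
        OrgMetadata {
            can_create_programs: false,
            max_nexmark_qps: 1000.0,
            max_impulse_qps: 1000.0,
            max_parallelism: 32,
            max_operators: 30,
            max_running_jobs: 10,
            kafka_qps: 10_000,
        }
    }
}

/// Hypothetical helper: reject a pipeline whose requested parallelism
/// exceeds the organization's limit.
pub fn check_parallelism(org: &OrgMetadata, requested: u32) -> Result<(), String> {
    if requested > org.max_parallelism {
        Err(format!(
            "requested parallelism {} exceeds the organization limit of {}",
            requested, org.max_parallelism
        ))
    } else {
        Ok(())
    }
}
```

A handler could run a check like this before creating a pipeline and turn the Err case into a client-facing error response.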
Authentication Data
The AuthData struct carries the authentication context through API handlers:
- user_id -- The authenticated user identifier
- organization_id -- The organization the user belongs to
- role -- The user's role within the organization
- org_metadata -- The organization's resource limits
Compiler Service Client
The compiler_service function establishes a gRPC connection to the compiler service, used for UDF compilation. It reads the compiler endpoint from configuration and returns a CompilerGrpcClient.
Error Types
The HttpError struct and HttpErrorCode enum provide standardized error responses with codes: Unauthorized, InvalidCredentials, and ServerError. The HttpError implements IntoResponse to map error codes to appropriate HTTP status codes.
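The mapping from error code to HTTP status can be sketched as below. This is an illustration only: it returns a bare numeric status instead of implementing IntoResponse, and the specific status chosen for each code (401 for both credential errors, 500 for server errors) is an assumption rather than something this page confirms.

```rust
/// The three error codes named above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpErrorCode {
    Unauthorized,
    InvalidCredentials,
    ServerError,
}

/// Sketch of a standardized error: a code plus a human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpError {
    pub code: HttpErrorCode,
    pub message: String,
}

impl HttpError {
    pub fn new(code: HttpErrorCode, message: impl Into<String>) -> Self {
        HttpError { code, message: message.into() }
    }

    /// Numeric HTTP status for this error (assumed mapping).
    pub fn status(&self) -> u16 {
        match self.code {
            HttpErrorCode::Unauthorized => 401,
            HttpErrorCode::InvalidCredentials => 401,
            HttpErrorCode::ServerError => 500,
        }
    }
}
```

Keeping the mapping in one match expression means a newly added error code fails to compile until it is assigned a status.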
OpenAPI Specification
The ApiDoc struct uses utoipa::OpenApi to generate the full OpenAPI 3.0 specification. It declares all 27 REST endpoint paths and over 60 schema components covering pipelines, jobs, checkpoints, connections, connectors, UDFs, metrics, and formats.
Module Structure
The crate declares the following submodules:
- cloud -- Cloud-specific functionality
- connection_profiles -- Connection profile CRUD endpoints
- connection_tables -- Connection table CRUD endpoints
- connectors -- Connector enumeration endpoint
- jobs -- Job management endpoints
- metrics -- Job metrics endpoint
- pipelines -- Pipeline CRUD endpoints
- rest (public) -- Axum router and application state
- rest_utils -- Shared REST utilities and error handling
- sql (public) -- SQL compilation entry point
- udfs -- UDF management endpoints
Usage
The start_server function is called from the Arroyo main binary to launch the API server. It is also used in the embedded single-process mode where the API, controller, and worker run within the same process.
Code Reference
Source Location
- Repository: ArroyoSystems/arroyo (GitHub)
- File: crates/arroyo-api/src/lib.rs
- Lines: L1--L365
Signature
/// Launch the HTTP/HTTPS API server.
/// Returns the port the server is listening on.
pub async fn start_server(
database: DatabaseSource,
guard: ShutdownGuard,
) -> anyhow::Result<u16>
/// Connect to the compiler gRPC service for UDF compilation.
pub async fn compiler_service() -> Result<CompilerGrpcClient<Channel>, ErrorResp>
/// Convert an OffsetDateTime to microseconds since epoch.
pub(crate) fn to_micros(dt: OffsetDateTime) -> u64
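As an illustration of the to_micros conversion, the sketch below performs the same kind of calculation using only the standard library. It substitutes std::time::SystemTime for OffsetDateTime, and both the to_micros_sketch name and the choice to clamp pre-epoch times to 0 are assumptions of this example, not the crate's behavior.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Microseconds elapsed between the Unix epoch and `t`.
/// Times before the epoch are clamped to 0 in this sketch.
pub fn to_micros_sketch(t: SystemTime) -> u64 {
    t.duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_micros() as u64)
        .unwrap_or(0)
}
```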
Import
use arroyo_api::{start_server, compiler_service, ApiDoc};
use arroyo_api::{AuthData, OrgMetadata, HttpError};
I/O Contract
Inputs (start_server)
| Name | Type | Required | Description |
|---|---|---|---|
| database | DatabaseSource | Yes | Database connection pool for PostgreSQL |
| guard | ShutdownGuard | Yes | Graceful shutdown coordinator for spawned server task |
Outputs (start_server)
| Condition | Type | Description |
|---|---|---|
| Success | Ok(u16) | The TCP port the server bound to |
| Bind failure | Err(anyhow::Error) | Could not bind to the configured address |
| TLS error | Err(anyhow::Error) | TLS configuration creation failed |
Key Data Structures
| Struct | Description |
|---|---|
| OrgMetadata | Per-organization resource limits (parallelism, operators, running jobs, QPS) |
| AuthData | Authentication context with user ID, org ID, role, and org metadata |
| HttpError | Standardized HTTP error response with code and message |
| ApiDoc | OpenAPI specification struct generated by utoipa |
Usage Examples
Starting the API Server
use arroyo_api::start_server;
use arroyo_server_common::shutdown::ShutdownGuard;
use cornucopia_async::DatabaseSource;
let database = DatabaseSource::new(pool);
let guard = ShutdownGuard::new();
let port = start_server(database, guard).await?;
println!("API server listening on port {}", port);
Connecting to the Compiler Service
use arroyo_api::compiler_service;
let mut compiler = compiler_service().await?;
let resp = compiler.compile_query(request).await?;
Default Organization Metadata
use arroyo_api::OrgMetadata;
let metadata = OrgMetadata::default();
// max_parallelism: 32
// max_operators: 30
// max_running_jobs: 10
// kafka_qps: 10_000