Implementation:ArroyoSystems Arroyo Get Connectors
Overview
The Get Connectors implementation provides the REST API endpoint and underlying registry function that enumerates all available connectors in the Arroyo stream processing engine. The get_connectors async handler serves the GET /v1/connectors endpoint, while the connectors() function in the connectors crate constructs the authoritative registry of all connector implementations.
Code Reference
| File | Lines | Purpose |
|---|---|---|
| crates/arroyo-api/src/connectors.rs | L15-L20 | REST endpoint handler get_connectors |
| crates/arroyo-connectors/src/lib.rs | L39-L65 | Registry function connectors() |
| crates/arroyo-connectors/src/lib.rs | L76-L78 | Helper function connector_for_type() |
Signatures
/// REST endpoint: GET /v1/connectors
/// Lists all available connectors sorted by name.
pub async fn get_connectors() -> Result<Json<ConnectorCollection>, ErrorResp>
/// Registry function: returns all 21 connectors keyed by type name.
pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
// Returns all 21 connectors:
// Blackhole, Confluent, DeltaLake, FileSystem, Fluvio, Iceberg,
// Impulse, Kafka, Kinesis, MQTT, NATS, Nexmark, PollingHTTP,
// Preview, RabbitMQ, Redis, SingleFile, SSE, Stdout, Webhook, WebSocket
}
/// Helper: removes and returns a single connector by type name.
pub fn connector_for_type(t: &str) -> Option<Box<dyn ErasedConnector>>
Description
The implementation consists of two tightly coupled components:
The connectors() Registry Function
Located in crates/arroyo-connectors/src/lib.rs, this function constructs a Vec of all 21 boxed ErasedConnector trait objects and converts them into a HashMap keyed by each connector's name() return value. The connectors registered are:
- Blackhole -- A /dev/null-style sink that discards all data (useful for benchmarking)
- Confluent -- Confluent Cloud managed Kafka integration
- DeltaLake -- Delta Lake table format sink (via filesystem module)
- FileSystem -- General filesystem source/sink (S3, GCS, local)
- Fluvio -- Fluvio streaming platform connector
- Iceberg -- Apache Iceberg table format sink (via filesystem module)
- Impulse -- Synthetic data generator source for testing
- Kafka -- Apache Kafka source/sink
- Kinesis -- AWS Kinesis Data Streams source/sink
- MQTT -- MQTT protocol source/sink
- NATS -- NATS messaging source/sink
- Nexmark -- NEXMark benchmark data generator
- PollingHTTP -- HTTP endpoint polling source
- Preview -- Preview sink for the web console
- RabbitMQ -- RabbitMQ message broker connector
- Redis -- Redis sink connector
- SingleFile -- Single file source (for testing and development)
- SSE -- Server-Sent Events source
- Stdout -- Standard output sink
- Webhook -- Webhook HTTP sink
- WebSocket -- WebSocket source
Each connector implements the ErasedConnector trait (defined in crates/arroyo-operator/src/connector.rs), which provides a type-erased interface with methods for metadata retrieval, configuration validation, connection testing, and operator construction.
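The registry pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's actual code: the ErasedConnector trait here is a hypothetical one-method stand-in, and only two placeholder connectors are registered. The helper assumes that connector_for_type() works by building a fresh registry and removing the requested entry, which is consistent with its "removes and returns" description.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for Arroyo's `ErasedConnector` trait.
/// The real trait exposes many more methods (metadata, validation, ...).
pub trait ErasedConnector {
    fn name(&self) -> &'static str;
}

// Placeholder connector types, for illustration only.
struct Blackhole;
struct Stdout;

impl ErasedConnector for Blackhole {
    fn name(&self) -> &'static str {
        "blackhole"
    }
}

impl ErasedConnector for Stdout {
    fn name(&self) -> &'static str {
        "stdout"
    }
}

/// Collect boxed trait objects in a Vec, then key each one by `name()`.
pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
    let all: Vec<Box<dyn ErasedConnector>> = vec![Box::new(Blackhole), Box::new(Stdout)];
    all.into_iter().map(|c| (c.name(), c)).collect()
}

/// Assumed behavior: build a fresh registry and take ownership of one entry.
pub fn connector_for_type(t: &str) -> Option<Box<dyn ErasedConnector>> {
    connectors().remove(t)
}
```

Because each call builds a new map, removing an entry hands the caller an owned Box without affecting any other caller. The trade-off is that every lookup reconstructs the whole registry.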
The get_connectors REST Handler
Located in crates/arroyo-api/src/connectors.rs, this is a concise async handler that:
- Calls connectors() to get the full registry
- Maps each connector to its metadata via c.metadata()
- Sorts the resulting vector alphabetically by connector name using sort_by_cached_key
- Wraps the result in a Json<ConnectorCollection> response
The endpoint is annotated with #[utoipa::path] for OpenAPI documentation generation and is registered at GET /v1/connectors.
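The handler's map, sort, and wrap steps can be approximated without the web framework. In the sketch below, ConnectorMeta and list_connectors are hypothetical names, ConnectorCollection is reduced to a single data field, and the registry stores metadata directly instead of trait objects. The Json wrapper and the #[utoipa::path] annotation are omitted.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified metadata record (the real one has more fields).
#[derive(Debug, Clone, PartialEq)]
pub struct ConnectorMeta {
    pub name: String,
}

/// Simplified stand-in for the response body: a `data` array.
#[derive(Debug)]
pub struct ConnectorCollection {
    pub data: Vec<ConnectorMeta>,
}

/// Collect metadata from the registry and sort it by name.
pub fn list_connectors(registry: &HashMap<&'static str, ConnectorMeta>) -> ConnectorCollection {
    // HashMap iteration order is unspecified, so the values are sorted
    // explicitly to give clients a stable, alphabetical listing.
    let mut data: Vec<ConnectorMeta> = registry.values().cloned().collect();
    // sort_by_cached_key computes each key (here a cloned String) once per
    // element instead of once per comparison.
    data.sort_by_cached_key(|c| c.name.clone());
    ConnectorCollection { data }
}
```

The explicit sort matters because the registry is a HashMap: without it, the order of the returned array could differ between runs.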
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | None | GET endpoint with no parameters or request body |
| Output | Json<ConnectorCollection> | A JSON object with a data field containing a sorted array of connector metadata objects |
Each connector metadata object includes:
- name -- Display name of the connector
- icon -- Icon identifier for UI rendering
- description -- Human-readable description
- connection_config -- Optional JSON schema for connection profile configuration
- table_config -- JSON schema for table-level configuration
- enabled -- Whether the connector is available in the current build
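For a consumer of this endpoint, the fields listed above might be modeled roughly as follows. The struct name ConnectorInfo, the helper enabled_names, and all field types are assumptions inferred from the field descriptions, not the definitions used by Arroyo. The schemas are held as plain strings to keep the sketch dependency-free.

```rust
/// Hypothetical mirror of one connector metadata object. Field types are
/// assumptions based on the field descriptions, not the crate's definition.
#[derive(Debug, Clone)]
pub struct ConnectorInfo {
    pub name: String,
    pub icon: String,
    pub description: String,
    /// Optional JSON schema for connection profiles, kept as text here.
    pub connection_config: Option<String>,
    /// JSON schema for table-level configuration, kept as text here.
    pub table_config: String,
    pub enabled: bool,
}

/// Names of the connectors marked as enabled, in input order.
pub fn enabled_names(data: &[ConnectorInfo]) -> Vec<&str> {
    data.iter()
        .filter(|c| c.enabled)
        .map(|c| c.name.as_str())
        .collect()
}
```

A client such as a UI could use a filter like this to hide connectors that are not available in the current build.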
Usage Examples
Calling the REST Endpoint
# List all available connectors
curl -H "Authorization: Bearer $TOKEN" \
http://localhost:8000/v1/connectors
Using the Registry Programmatically
use arroyo_connectors::{connectors, connector_for_type};
// Get all connectors and their metadata
let registry = connectors();
let mut metadata: Vec<_> = registry.values().map(|c| c.metadata()).collect();
metadata.sort_by_cached_key(|c| c.name.clone());
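// Look up a specific connector by type
let kafka = connector_for_type("kafka").expect("kafka connector not found");
// `config_json` is assumed to hold the connector's JSON configuration;
// `?` requires an enclosing function that returns a compatible Result.
kafka.validate_config(&config_json)?;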