Implementation:ArroyoSystems Arroyo Get Connectors


Overview

The Get Connectors implementation provides the REST API endpoint and the underlying registry function that together enumerate all available connectors in the Arroyo stream processing engine. The async handler get_connectors serves GET /v1/connectors, while the connectors() function in the arroyo-connectors crate constructs the authoritative registry of all connector implementations.

Code Reference

File                                | Lines   | Purpose
crates/arroyo-api/src/connectors.rs | L15-L20 | REST endpoint handler get_connectors
crates/arroyo-connectors/src/lib.rs | L39-L65 | Registry function connectors()
crates/arroyo-connectors/src/lib.rs | L76-L78 | Helper function connector_for_type()

Signatures

/// REST endpoint: GET /v1/connectors
/// Lists all available connectors sorted by name.
pub async fn get_connectors() -> Result<Json<ConnectorCollection>, ErrorResp>

/// Registry function: returns all 21 connectors keyed by type name:
/// Blackhole, Confluent, DeltaLake, FileSystem, Fluvio, Iceberg,
/// Impulse, Kafka, Kinesis, MQTT, NATS, Nexmark, PollingHTTP,
/// Preview, RabbitMQ, Redis, SingleFile, SSE, Stdout, Webhook, WebSocket
pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>>

/// Helper: removes and returns a single connector by type name.
pub fn connector_for_type(t: &str) -> Option<Box<dyn ErasedConnector>>

Description

The implementation consists of two tightly coupled components:

The connectors() Registry Function

Located in crates/arroyo-connectors/src/lib.rs, this function constructs a Vec of all 21 boxed ErasedConnector trait objects and converts them into a HashMap keyed by each connector's name() return value. The connectors registered are:

  • Blackhole -- A /dev/null-style sink that discards all data (useful for benchmarking)
  • Confluent -- Confluent Cloud managed Kafka integration
  • DeltaLake -- Delta Lake table format sink (via filesystem module)
  • FileSystem -- General filesystem source/sink (S3, GCS, local)
  • Fluvio -- Fluvio streaming platform connector
  • Iceberg -- Apache Iceberg table format sink (via filesystem module)
  • Impulse -- Synthetic data generator source for testing
  • Kafka -- Apache Kafka source/sink
  • Kinesis -- AWS Kinesis Data Streams source/sink
  • MQTT -- MQTT protocol source/sink
  • NATS -- NATS messaging source/sink
  • Nexmark -- NEXMark benchmark data generator
  • PollingHTTP -- HTTP endpoint polling source
  • Preview -- Preview sink for the web console
  • RabbitMQ -- RabbitMQ message broker connector
  • Redis -- Redis sink connector
  • SingleFile -- Single file source (for testing and development)
  • SSE -- Server-Sent Events source
  • Stdout -- Standard output sink
  • Webhook -- Webhook HTTP sink
  • WebSocket -- WebSocket source

Each connector implements the ErasedConnector trait (defined in crates/arroyo-operator/src/connector.rs), which provides a type-erased interface with methods for metadata retrieval, configuration validation, connection testing, and operator construction.
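
The registry pattern described above (a Vec of boxed trait objects converted into a HashMap keyed by name()) can be sketched in isolation. Everything named Demo or demo_ below is a hypothetical stand-in, not Arroyo's code: the real ErasedConnector trait has more methods, and the real registry holds 21 connectors. The lookup helper assumes the "removes and returns" behavior stated in the signature comment.

```rust
use std::collections::HashMap;

// Hypothetical, reduced stand-in for the ErasedConnector trait.
pub trait DemoConnector {
    fn name(&self) -> &'static str;
    fn description(&self) -> String;
}

pub struct DemoKafka;
pub struct DemoStdout;

impl DemoConnector for DemoKafka {
    fn name(&self) -> &'static str {
        "kafka"
    }
    fn description(&self) -> String {
        "Kafka source/sink".to_string()
    }
}

impl DemoConnector for DemoStdout {
    fn name(&self) -> &'static str {
        "stdout"
    }
    fn description(&self) -> String {
        "Standard output sink".to_string()
    }
}

// Build a Vec of boxed trait objects, then key each one by its name().
pub fn demo_connectors() -> HashMap<&'static str, Box<dyn DemoConnector>> {
    let all: Vec<Box<dyn DemoConnector>> = vec![Box::new(DemoKafka), Box::new(DemoStdout)];
    all.into_iter().map(|c| (c.name(), c)).collect()
}

// Removing from a freshly built map hands the caller an owned Box,
// or None when no connector is registered under that type name.
pub fn demo_connector_for_type(t: &str) -> Option<Box<dyn DemoConnector>> {
    demo_connectors().remove(t)
}
```

Keying by name() keeps the registry and each connector's self-reported type in agreement, and returning an owned Box lets the caller hold a connector without keeping the whole map alive.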

The get_connectors REST Handler

Located in crates/arroyo-api/src/connectors.rs, this is a concise async handler that:

  1. Calls connectors() to get the full registry
  2. Maps each connector to its metadata via c.metadata()
  3. Sorts the resulting vector alphabetically by connector name using sort_by_cached_key
  4. Wraps the result in a Json<ConnectorCollection> response

The endpoint is annotated with #[utoipa::path] for OpenAPI documentation generation and is registered at GET /v1/connectors.
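
The four handler steps can be sketched without the web framework. This is a minimal illustration under assumptions: DemoMetadata, DemoCollection, demo_registry, and list_connectors are hypothetical names, the registry here stores metadata directly instead of connector objects, and the real handler is async and returns Json<ConnectorCollection>.

```rust
use std::collections::HashMap;

// Hypothetical, reduced metadata record.
#[derive(Debug, Clone, PartialEq)]
pub struct DemoMetadata {
    pub name: String,
    pub description: String,
}

// Hypothetical response wrapper with a single `data` field.
#[derive(Debug, PartialEq)]
pub struct DemoCollection {
    pub data: Vec<DemoMetadata>,
}

// Step 1 stand-in: a small registry keyed by type name.
pub fn demo_registry() -> HashMap<&'static str, DemoMetadata> {
    let mut registry = HashMap::new();
    for (key, name, description) in [
        ("stdout", "Stdout", "Standard output sink"),
        ("kafka", "Kafka", "Apache Kafka source/sink"),
        ("redis", "Redis", "Redis sink connector"),
    ] {
        registry.insert(
            key,
            DemoMetadata {
                name: name.to_string(),
                description: description.to_string(),
            },
        );
    }
    registry
}

pub fn list_connectors(registry: &HashMap<&'static str, DemoMetadata>) -> DemoCollection {
    // Step 2: collect the metadata of every registry entry.
    let mut data: Vec<DemoMetadata> = registry.values().cloned().collect();
    // Step 3: sort by name; sort_by_cached_key evaluates each key only once,
    // which avoids cloning the name on every comparison.
    data.sort_by_cached_key(|m| m.name.clone());
    // Step 4: wrap the sorted vector in the collection type.
    DemoCollection { data }
}
```

Because HashMap iteration order is unspecified, the sort is what makes the response order stable from one request to the next.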

I/O Contract

Direction | Type                      | Description
Input     | None                      | GET endpoint with no parameters or request body
Output    | Json<ConnectorCollection> | A JSON object with a data field containing a sorted array of connector metadata objects

Each connector metadata object includes:

  • name -- Display name of the connector
  • icon -- Icon identifier for UI rendering
  • description -- Human-readable description
  • connection_config -- Optional JSON schema for connection profile configuration
  • table_config -- JSON schema for table-level configuration
  • enabled -- Whether the connector is available in the current build
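
A client that has already decoded the response might model the fields listed above roughly as follows. This is a sketch under assumptions: ConnectorInfo, demo_info, enabled_names, and has_connection_schema are hypothetical names, and the field types (schemas held as plain strings, connection_config as an Option) are guesses based only on the list above, not on Arroyo's actual type definitions.

```rust
// Hypothetical client-side view of one connector metadata object.
#[derive(Debug, Clone)]
pub struct ConnectorInfo {
    pub name: String,
    pub icon: String,
    pub description: String,
    // Described as optional above, hence Option.
    pub connection_config: Option<String>,
    pub table_config: String,
    pub enabled: bool,
}

// Convenience constructor for the example; fills unused fields with placeholders.
pub fn demo_info(name: &str, connection_config: Option<&str>, enabled: bool) -> ConnectorInfo {
    ConnectorInfo {
        name: name.to_string(),
        icon: String::new(),
        description: String::new(),
        connection_config: connection_config.map(|s| s.to_string()),
        table_config: "{}".to_string(),
        enabled,
    }
}

// Names of the connectors flagged as available, in the order received.
pub fn enabled_names(connectors: &[ConnectorInfo]) -> Vec<&str> {
    connectors
        .iter()
        .filter(|c| c.enabled)
        .map(|c| c.name.as_str())
        .collect()
}

// True when the connector carries a connection profile schema.
pub fn has_connection_schema(c: &ConnectorInfo) -> bool {
    c.connection_config.is_some()
}
```

Filtering on enabled before presenting choices means a user is not offered a connector that the current build cannot provide.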

Usage Examples

Calling the REST Endpoint

# List all available connectors
curl -H "Authorization: Bearer $TOKEN" \
  http://localhost:8000/v1/connectors

Using the Registry Programmatically

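use arroyo_connectors::{connector_for_type, connectors};

// Wrapped in a function so that `?` has a Result to return into.
// The serde_json::Value parameter type is an assumption; this page does not
// show the validate_config signature.
fn inspect_connectors(config_json: &serde_json::Value) -> Result<(), Box<dyn std::error::Error>> {
    // Get all connectors and their metadata, sorted by name
    let registry = connectors();
    let mut metadata: Vec<_> = registry.values().map(|c| c.metadata()).collect();
    metadata.sort_by_cached_key(|c| c.name.clone());

    // Look up a specific connector by type and validate a connection config
    let kafka = connector_for_type("kafka").expect("kafka connector not found");
    kafka.validate_config(config_json)?;
    Ok(())
}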
