Implementation:ArroyoSystems Arroyo Test Connection Table

From Leeroopedia


Overview

The Test Connection Table implementation provides the REST API endpoint that validates connection table configurations by attempting live connections to external systems. The test_connection_table handler streams test results back to the client using Server-Sent Events (SSE), enabling real-time feedback on multi-step validation processes including connectivity checks, authentication, and sample data reading.

Code Reference

| File | Lines | Purpose |
|------|-------|---------|
| crates/arroyo-api/src/connection_tables.rs | L173-L194 | test_connection_table REST handler |
| crates/arroyo-api/src/connection_tables.rs | L45-L126 | get_and_validate_connector shared validation helper |
| crates/arroyo-operator/src/connector.rs | L128-L135 | Connector::test trait method signature |

Signature

/// POST /v1/connection_tables/test
/// Tests a connection table by attempting a live connection and streaming results via SSE.
pub(crate) async fn test_connection_table(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    WithRejection(Json(req), _): WithRejection<Json<ConnectionTablePost>, ApiError>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ErrorResp>

Description

The test_connection_table handler implements a streaming validation workflow:

Step 1: Authentication and Validation

The handler first authenticates the request and validates the connection table configuration:

  1. Authentication -- Authenticates via bearer token using authenticate(&state.database, bearer_auth).
  2. Full validation -- Calls get_and_validate_connector(&req, &auth_data, &state.database) which performs:
    • Connector lookup from the registry
    • Connection profile retrieval and type validation (if profile ID specified)
    • Table configuration validation via connector.validate_table(&req.config)
    • Schema expansion and validation (if schema provided)
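  3. Return value -- On success, validation returns a tuple of (connector, _, profile_config, schema). The handler discards the second element; the third is the profile value passed to the connector's test method in Step 3.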
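
The order of the pre-flight checks above can be sketched with standard-library types only. Everything in this block (`Request`, `Registry`, `Profiles`, `validate_request`, `demo`, and the string errors) is hypothetical and only mirrors the sequence of checks; it is not Arroyo's get_and_validate_connector, and the schema step is omitted for brevity.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified request; the real body is ConnectionTablePost.
struct Request {
    connector: String,
    profile_id: Option<String>,
    config: HashMap<String, String>,
}

/// Hypothetical registry: connector name -> config keys it requires.
type Registry = HashMap<String, Vec<String>>;
/// Hypothetical profile store: profile id -> connector it belongs to.
type Profiles = HashMap<String, String>;

/// Runs the pre-flight checks in order and stops at the first failure.
/// Returns the connector name and the profile id on success.
fn validate_request(
    registry: &Registry,
    profiles: &Profiles,
    req: &Request,
) -> Result<(String, Option<String>), String> {
    // 1. Connector lookup from the registry.
    let required_keys = registry
        .get(&req.connector)
        .ok_or_else(|| format!("unknown connector '{}'", req.connector))?;

    // 2. Profile retrieval and type check, only when an id was supplied.
    if let Some(id) = &req.profile_id {
        let owner = profiles
            .get(id)
            .ok_or_else(|| format!("no connection profile '{}'", id))?;
        if owner != &req.connector {
            return Err(format!("profile '{}' belongs to '{}'", id, owner));
        }
    }

    // 3. Table configuration check.
    for key in required_keys {
        if !req.config.contains_key(key) {
            return Err(format!("missing config key '{}'", key));
        }
    }

    Ok((req.connector.clone(), req.profile_id.clone()))
}

/// Builds a tiny in-memory registry and profile store, then validates.
fn demo(
    connector: &str,
    profile_id: Option<&str>,
    keys: &[&str],
) -> Result<(String, Option<String>), String> {
    let mut registry = Registry::new();
    registry.insert("kafka".to_string(), vec!["topic".to_string()]);
    let mut profiles = Profiles::new();
    profiles.insert("cp_kafka_prod".to_string(), "kafka".to_string());
    let req = Request {
        connector: connector.to_string(),
        profile_id: profile_id.map(str::to_string),
        config: keys.iter().map(|k| (k.to_string(), String::new())).collect(),
    };
    validate_request(&registry, &profiles, &req)
}

fn main() {
    println!("{:?}", demo("kafka", Some("cp_kafka_prod"), &["topic"]));
    println!("{:?}", demo("kafka", None, &[]));
}
```

Returning at the first failed check is what lets the handler answer with a plain HTTP error before any stream is opened.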

Step 2: Channel Setup

The handler creates a tokio mpsc channel with buffer capacity of 8:

let (tx, rx) = channel(8);

This channel decouples the connector's asynchronous test execution from the HTTP response stream. The connector writes TestSourceMessage items to the sender (tx), and the SSE response reads from the receiver (rx).
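
To illustrate what a buffer capacity of 8 means for the producer, the sketch below uses `std::sync::mpsc::sync_channel` as a stand-in for the tokio channel so that it runs without external crates. The helper names are hypothetical, and the tokio channel differs in that a full buffer suspends the sending task rather than blocking a thread.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Counts how many messages a bounded channel accepts before it is full.
fn accepted_before_full(capacity: usize) -> usize {
    // Keep the receiver alive but never read from it.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait here.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
}

/// Shows that reading one message frees exactly one slot.
fn slot_freed_after_recv(capacity: usize) -> bool {
    let (tx, rx) = sync_channel::<usize>(capacity);
    for i in 0..capacity {
        tx.try_send(i).expect("buffer has room");
    }
    let full_before = tx.try_send(99).is_err();
    let first = rx.recv().expect("a message is buffered");
    full_before && first == 0 && tx.try_send(99).is_ok()
}

fn main() {
    println!("accepted before full: {}", accepted_before_full(8));
    println!("slot freed after recv: {}", slot_freed_after_recv(8));
}
```

A small bound like this keeps memory use predictable: a connector that produces messages faster than the client reads them is slowed down instead of queueing without limit.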

Step 3: Test Initiation

The handler calls the connector's test method:

connector
    .test(&req.name, &profile, &req.config, schema.as_ref(), tx)
    .map_err(|e| bad_request(format!("Failed to parse config or schema: {e:?}")))?;

The test method is defined on the Connector trait and each connector provides its own implementation. The method:

  • Spawns an asynchronous task to perform the connection test
  • Takes ownership of the channel sender
  • Sends TestSourceMessage items as the test progresses
  • Lets the task finish once all checks are done; the sender is then dropped, which closes the channel
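
The ownership pattern in the list above can be sketched as follows. This is a simplified illustration, not the real Connector trait: `TestMessage`, `DemoConnector`, and `run_test` are hypothetical, the trait is reduced to two parameters, and a standard-library thread and `sync_channel` stand in for a tokio task and channel.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical message type, loosely following the status/message
/// shape described on this page.
#[derive(Debug, Clone, PartialEq)]
struct TestMessage {
    status: &'static str,
    message: String,
}

/// Hypothetical, reduced version of a connector's test entry point.
trait Connector {
    fn test(&self, name: &str, tx: SyncSender<TestMessage>) -> Result<(), String>;
}

struct DemoConnector;

impl Connector for DemoConnector {
    fn test(&self, name: &str, tx: SyncSender<TestMessage>) -> Result<(), String> {
        let name = name.to_string();
        // The spawned task owns `tx`; `test` itself returns immediately.
        thread::spawn(move || {
            let steps = vec![
                ("info", format!("checking table '{}'", name)),
                ("info", "connection established".to_string()),
                ("done", "test complete".to_string()),
            ];
            for (status, message) in steps {
                // Stop early if the receiver has gone away.
                if tx.send(TestMessage { status, message }).is_err() {
                    return;
                }
            }
            // `tx` is dropped here, which closes the channel.
        });
        Ok(())
    }
}

/// Starts a test and collects every message until the channel closes.
fn run_test(name: &str) -> Result<Vec<TestMessage>, String> {
    let (tx, rx) = sync_channel(8);
    DemoConnector.test(name, tx)?;
    // `iter` ends only after the last sender has been dropped.
    Ok(rx.iter().collect())
}

fn main() {
    for msg in run_test("orders").unwrap() {
        println!("{}: {}", msg.status, msg.message);
    }
}
```

Because the receiver's iteration ends only when the sender is dropped, no explicit "close" call is needed; finishing the task is the termination signal.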

Step 4: SSE Stream Construction

The handler wraps the channel receiver in an SSE stream:

let stream = ReceiverStream::new(rx);

Ok(Sse::new(
    stream.map(|msg| Ok(Event::default().json_data(msg).unwrap())),
))

Each TestSourceMessage received from the channel is serialized as a JSON SSE event. The stream terminates naturally when the connector's test task completes and drops its sender.
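
As a rough picture of what "serialized as a JSON SSE event" produces on the wire, the sketch below frames messages by hand using only the standard library. It follows the status/message event shape shown on this page; the helpers `escape_json`, `to_sse_event`, and `sse_body` are hypothetical and do not reproduce how axum or serde_json build the bytes.

```rust
/// Escapes a string for use inside a JSON string literal.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \u00XX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Frames one message as an SSE event: a `data:` line plus a blank line.
fn to_sse_event(status: &str, message: &str) -> String {
    format!(
        "data: {{\"status\":\"{}\",\"message\":\"{}\"}}\n\n",
        escape_json(status),
        escape_json(message)
    )
}

/// Maps a sequence of (status, message) pairs to an event-stream body.
fn sse_body(messages: &[(&str, &str)]) -> String {
    messages.iter().map(|(s, m)| to_sse_event(s, m)).collect()
}

fn main() {
    print!(
        "{}",
        sse_body(&[("info", "Connected to broker"), ("done", "Test complete")])
    );
}
```

Escaping newlines matters here: a raw line break inside the payload would split the `data:` field, whereas an escaped one keeps each event on a single line.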

TestSourceMessage Structure

The TestSourceMessage type carries test results with a status field indicating the type of message:

  • info -- Informational status update (e.g., "Connected to broker", "Found topic with N partitions")
  • data -- Sample data record read from the source
  • error -- Error message indicating a test step failed
  • done -- Final message indicating the test is complete

Error Handling

Errors are handled at two levels:

  • Pre-flight errors -- If authentication fails, the connector is not found, or configuration validation fails, a standard HTTP error response is returned (not SSE).
  • Runtime errors -- If the connection test encounters errors during execution (e.g., connection refused, authentication failed), the errors are sent as TestSourceMessage items on the SSE stream.
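
The two levels can be pictured with a small sketch: pre-flight failures come back as an `Err` before any stream exists, while runtime failures travel inside the stream as ordinary items. `HttpError`, `StreamItem`, `start_test`, the `reachable` flag, and the 400 status code are all hypothetical choices made for illustration, and a standard-library thread and channel stand in for the async task and SSE stream.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical HTTP-level error, returned before any stream exists.
#[derive(Debug, PartialEq)]
struct HttpError {
    status: u16,
    message: String,
}

/// Hypothetical in-stream message.
#[derive(Debug, PartialEq)]
enum StreamItem {
    Info(String),
    Error(String),
    Done,
}

/// Pre-flight failures return `Err`; runtime failures arrive in the stream.
fn start_test(connector: &str, reachable: bool) -> Result<Receiver<StreamItem>, HttpError> {
    // Level 1: pre-flight check, answered with a plain error response.
    if connector != "kafka" {
        return Err(HttpError {
            status: 400, // assumed status code for this sketch
            message: format!("unknown connector '{}'", connector),
        });
    }

    // Level 2: the stream is open; failures are now just messages.
    let (tx, rx) = channel();
    thread::spawn(move || {
        let first = if reachable {
            StreamItem::Info("connected".to_string())
        } else {
            StreamItem::Error("connection refused".to_string())
        };
        // A failed send only means the client went away; ignore it.
        let _ = tx.send(first);
        let _ = tx.send(StreamItem::Done);
    });
    Ok(rx)
}

fn main() {
    println!("{:?}", start_test("pulsar", true).err());
    for item in start_test("kafka", false).unwrap().iter() {
        println!("{:?}", item);
    }
}
```

A client therefore needs two code paths: one that inspects the HTTP status of the initial response, and one that inspects each event once streaming has begun.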

I/O Contract

| Direction | Type | Description |
|-----------|------|-------------|
| Input | ConnectionTablePost | JSON body with fields: name (String), connector (String), connection_profile_id (Option<String>), config (JSON Value), schema (Option<ConnectionSchema>) |
| Output | Sse<impl Stream<Item = Result<Event, Infallible>>> | SSE stream of TestSourceMessage items, each serialized as a JSON SSE event |

Each SSE event has the format:

data: {"status":"info|data|error|done","message":"..."}
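
On the consuming side, a client has to split the response body back into events. The following is a minimal sketch of that step under the general Server-Sent Events framing rules (events end at a blank line, a single space after `data:` is stripped, lines starting with `:` are comments). The function name `parse_sse_data` is hypothetical, only the `data` field is handled, and the JSON payload is returned as a string rather than parsed.

```rust
/// Extracts the `data` payload of each complete event in an SSE body.
fn parse_sse_data(body: &str) -> Vec<String> {
    let mut events = Vec::new();
    let mut current: Vec<&str> = Vec::new();

    for line in body.lines() {
        if line.is_empty() {
            // A blank line ends the event; multiple data lines are
            // joined with newlines, and empty payloads are skipped.
            let joined = current.join("\n");
            if !joined.is_empty() {
                events.push(joined);
            }
            current.clear();
        } else if let Some(rest) = line.strip_prefix("data:") {
            // One optional leading space after the colon is dropped.
            current.push(rest.strip_prefix(' ').unwrap_or(rest));
        }
        // Comments (lines starting with ':') and other fields are ignored.
    }

    // An event without a terminating blank line is incomplete and dropped.
    events
}

fn main() {
    let body = "data: {\"status\":\"info\",\"message\":\"Connected to broker\"}\n\n\
                data: {\"status\":\"done\",\"message\":\"Test complete\"}\n\n";
    for payload in parse_sse_data(body) {
        println!("{}", payload);
    }
}
```

Each returned string can then be handed to a JSON parser; a client would typically stop reading once it sees a payload whose status is done.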

Usage Examples

Testing a Kafka Source

# Test a Kafka source table (use -N for streaming output)
curl -N -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8000/v1/connection_tables/test \
  -d '{
    "name": "orders",
    "connector": "kafka",
    "connection_profile_id": "cp_kafka_prod",
    "config": {
      "topic": "orders",
      "type": "source",
      "source.offset": "latest"
    },
    "schema": {
      "format": {"json": {}},
      "fields": []
    }
  }'

Testing a Kinesis Source

curl -N -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8000/v1/connection_tables/test \
  -d '{
    "name": "click_stream",
    "connector": "kinesis",
    "connection_profile_id": "cp_aws_prod",
    "config": {
      "stream_name": "click-events",
      "type": "source"
    },
    "schema": {
      "format": {"json": {}},
      "fields": []
    }
  }'

Internal Channel Architecture

                            tokio::spawn
                         +-----------------+
  POST /test  ------>    | Connector::test |
                         |   (async task)  |
        mpsc::channel    |                 |
  +---<--- tx <----------+  sends messages |
  |                      +-----------------+
  |
  v
  rx ---> ReceiverStream ---> Sse::new ---> HTTP Response
                                            (text/event-stream)
