Implementation:ArroyoSystems Arroyo Test Connection Table
Overview
The Test Connection Table implementation provides the REST API endpoint that validates connection table configurations by attempting live connections to external systems. The test_connection_table handler streams test results back to the client using Server-Sent Events (SSE), enabling real-time feedback on multi-step validation processes including connectivity checks, authentication, and sample data reading.
Code Reference
| File | Lines | Purpose |
|---|---|---|
crates/arroyo-api/src/connection_tables.rs |
L173-L194 | test_connection_table REST handler
|
crates/arroyo-api/src/connection_tables.rs |
L45-L126 | get_and_validate_connector shared validation helper
|
crates/arroyo-operator/src/connector.rs |
L128-L135 | Connector::test trait method signature
|
Signature
/// POST /v1/connection_tables/test
/// Tests a connection table by attempting a live connection and streaming results via SSE.
pub(crate) async fn test_connection_table(
State(state): State<AppState>,
bearer_auth: BearerAuth,
WithRejection(Json(req), _): WithRejection<Json<ConnectionTablePost>, ApiError>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ErrorResp>
Description
The test_connection_table handler implements a streaming validation workflow:
Step 1: Authentication and Validation
The handler first authenticates the request and validates the connection table configuration:
- Authentication -- Authenticates via bearer token using
authenticate(&state.database, bearer_auth). - Full validation -- Calls
get_and_validate_connector(&req, &auth_data, &state.database)which performs:- Connector lookup from the registry
- Connection profile retrieval and type validation (if profile ID specified)
- Table configuration validation via
connector.validate_table(&req.config) - Schema expansion and validation (if schema provided)
- The validation returns a tuple of
(connector, _, profile_config, schema).
Step 2: Channel Setup
The handler creates a tokio mpsc channel with buffer capacity of 8:
let (tx, rx) = channel(8);
This channel decouples the connector's asynchronous test execution from the HTTP response stream. The connector writes TestSourceMessage items to the sender (tx), and the SSE response reads from the receiver (rx).
Step 3: Test Initiation
The handler calls the connector's test method:
connector
.test(&req.name, &profile, &req.config, schema.as_ref(), tx)
.map_err(|e| bad_request(format!("Failed to parse config or schema: {e:?}")))?;
The test method is defined on the Connector trait and each connector provides its own implementation. The method:
- Spawns an asynchronous task to perform the connection test
- Takes ownership of the channel sender
- Sends
TestSourceMessageitems as the test progresses - The task completes when all checks are done (sender is dropped, closing the channel)
Step 4: SSE Stream Construction
The handler wraps the channel receiver in an SSE stream:
let stream = ReceiverStream::new(rx);
Ok(Sse::new(
stream.map(|msg| Ok(Event::default().json_data(msg).unwrap())),
))
Each TestSourceMessage received from the channel is serialized as a JSON SSE event. The stream terminates naturally when the connector's test task completes and drops its sender.
TestSourceMessage Structure
The TestSourceMessage type carries test results with a status field indicating the type of message:
- info -- Informational status update (e.g., "Connected to broker", "Found topic with N partitions")
- data -- Sample data record read from the source
- error -- Error message indicating a test step failed
- done -- Final message indicating the test is complete
Error Handling
Errors are handled at two levels:
- Pre-flight errors -- If authentication fails, the connector is not found, or configuration validation fails, a standard HTTP error response is returned (not SSE).
- Runtime errors -- If the connection test encounters errors during execution (e.g., connection refused, authentication failed), the errors are sent as
TestSourceMessageitems on the SSE stream.
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | ConnectionTablePost |
JSON body with fields: name (String), connector (String), connection_profile_id (Option<String>), config (JSON Value), schema (Option<ConnectionSchema>)
|
| Output | Sse<impl Stream<Item = Result<Event, Infallible>>> |
SSE stream of TestSourceMessage items, each serialized as a JSON SSE event
|
Each SSE event has the format:
data: {"status":"info|data|error|done","message":"..."}
Usage Examples
Testing a Kafka Source
# Test a Kafka source table (use -N for streaming output)
curl -N -H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-X POST http://localhost:8000/v1/connection_tables/test \
-d '{
"name": "orders",
"connector": "kafka",
"connection_profile_id": "cp_kafka_prod",
"config": {
"topic": "orders",
"type": "source",
"source.offset": "latest"
},
"schema": {
"format": {"json": {}},
"fields": []
}
}'
Testing a Kinesis Source
curl -N -H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-X POST http://localhost:8000/v1/connection_tables/test \
-d '{
"name": "click_stream",
"connector": "kinesis",
"connection_profile_id": "cp_aws_prod",
"config": {
"stream_name": "click-events",
"type": "source"
},
"schema": {
"format": {"json": {}},
"fields": []
}
}'
Internal Channel Architecture
tokio::spawn
+-----------------+
POST /test ------> | Connector::test |
| (async task) |
mpsc::channel | |
+---<--- tx <----------+ sends messages |
| +-----------------+
|
v
rx ---> ReceiverStream ---> SSE::new ---> HTTP Response
(text/event-stream)