Implementation:ArroyoSystems Arroyo Create Connection Profile
Overview
The Create Connection Profile implementation provides the REST API endpoints for creating and testing reusable connection profiles in Arroyo. The create_connection_profile handler persists a validated connection configuration to the database, while test_connection_profile performs a live connectivity check against the external system without persisting anything.
Code Reference
| File | Lines | Purpose |
|---|---|---|
crates/arroyo-api/src/connection_profiles.rs |
L60-L82 | test_connection_profile handler
|
crates/arroyo-api/src/connection_profiles.rs |
L94-L131 | create_connection_profile handler
|
crates/arroyo-api/src/connection_profiles.rs |
L25-L48 | TryFrom<DbConnectionProfile> for domain conversion
|
Signatures
/// POST /v1/connection_profiles
/// Creates a new connection profile after validating the configuration.
pub async fn create_connection_profile(
State(state): State<AppState>,
bearer_auth: BearerAuth,
WithRejection(Json(req), _): WithRejection<Json<ConnectionProfilePost>, ApiError>,
) -> Result<Json<ConnectionProfile>, ErrorResp>
/// POST /v1/connection_profiles/test
/// Tests a connection profile by attempting a live connection.
pub async fn test_connection_profile(
State(state): State<AppState>,
bearer_auth: BearerAuth,
WithRejection(Json(req), _): WithRejection<Json<ConnectionProfilePost>, ApiError>,
) -> Result<Json<TestSourceMessage>, ErrorResp>
Description
create_connection_profile
This handler implements the full creation workflow for a connection profile:
- Authentication -- Authenticates the request using the bearer token and extracts organization and user identity from the
AppStatedatabase. - Connector lookup -- Uses
connector_for_type(&req.connector)to retrieve the connector implementation from the registry. Returns a 400 error if the connector type is unknown. - Configuration validation -- Calls
connector.validate_config(&req.config)to verify that the provided configuration JSON conforms to the connector's expected schema. Returns a 400 error with details if validation fails. - ID generation -- Generates a unique public ID using
generate_id(IdTypes::ConnectionProfile). - Database persistence -- Executes the
create_connection_profileSQL query via cornucopia-generated functions, storing the public ID, organization ID, user ID, name, connector type, and configuration JSON. - Response construction -- Fetches the newly created record back from the database (to include server-generated fields), converts it to the API response type using the
TryFrom<DbConnectionProfile>implementation, and returns it.
The TryFrom<DbConnectionProfile> conversion includes generating a human-readable description string by calling connector.config_description(&val.config), which summarizes the configuration (e.g., "kafka-broker:9092" for a Kafka profile).
test_connection_profile
This handler performs a non-persisting connectivity test:
- Authentication -- Same authentication flow as creation.
- Connector lookup -- Retrieves the connector by type name.
- Test execution -- Calls
connector.test_profile(&req.config)which returns anOption<oneshot::Receiver<TestSourceMessage>>. If the connector does not support testing (returnsNone), a success message is returned immediately. - Result awaiting -- Awaits the oneshot receiver to get the test result from the connector's async test task.
- Response -- Returns the
TestSourceMessageindicating success or failure with diagnostic details.
I/O Contract
| Endpoint | Input | Output |
|---|---|---|
POST /v1/connection_profiles |
ConnectionProfilePost JSON body with fields: name (String), connector (String -- connector type name), config (JSON Value -- connector-specific configuration) |
ConnectionProfile JSON with fields: id (String -- public ID), name (String), connector (String), config (JSON Value), description (String -- human-readable summary)
|
POST /v1/connection_profiles/test |
ConnectionProfilePost JSON body (same as creation) |
TestSourceMessage JSON with connection test result (success/failure with diagnostic message)
|
Usage Examples
Creating a Connection Profile via REST
curl -X POST -H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
http://localhost:8000/v1/connection_profiles \
-d '{
"name": "my-kafka-cluster",
"connector": "kafka",
"config": {
"bootstrap_servers": "broker1:9092,broker2:9092"
}
}'
Response:
{
"id": "cp_abc123",
"name": "my-kafka-cluster",
"connector": "kafka",
"config": { "bootstrap_servers": "broker1:9092,broker2:9092" },
"description": "broker1:9092,broker2:9092"
}
Testing a Connection Profile
curl -X POST -H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
http://localhost:8000/v1/connection_profiles/test \
-d '{
"name": "my-kafka-cluster",
"connector": "kafka",
"config": {
"bootstrap_servers": "broker1:9092"
}
}'