Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:ArroyoSystems Arroyo Create Connection Profile

From Leeroopedia


Template:Implementation

Overview

The Create Connection Profile implementation provides the REST API endpoints for creating and testing reusable connection profiles in Arroyo. The create_connection_profile handler persists a validated connection configuration to the database, while test_connection_profile performs a live connectivity check against the external system without persisting anything.

Code Reference

File Lines Purpose
crates/arroyo-api/src/connection_profiles.rs L60-L82 test_connection_profile handler
crates/arroyo-api/src/connection_profiles.rs L94-L131 create_connection_profile handler
crates/arroyo-api/src/connection_profiles.rs L25-L48 TryFrom<DbConnectionProfile> for domain conversion

Signatures

/// POST /v1/connection_profiles
/// Creates a new connection profile after validating the configuration.
pub async fn create_connection_profile(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    WithRejection(Json(req), _): WithRejection<Json<ConnectionProfilePost>, ApiError>,
) -> Result<Json<ConnectionProfile>, ErrorResp>

/// POST /v1/connection_profiles/test
/// Tests a connection profile by attempting a live connection.
pub async fn test_connection_profile(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    WithRejection(Json(req), _): WithRejection<Json<ConnectionProfilePost>, ApiError>,
) -> Result<Json<TestSourceMessage>, ErrorResp>

Description

create_connection_profile

This handler implements the full creation workflow for a connection profile:

  1. Authentication -- Authenticates the request using the bearer token and extracts organization and user identity from the AppState database.
  2. Connector lookup -- Uses connector_for_type(&req.connector) to retrieve the connector implementation from the registry. Returns a 400 error if the connector type is unknown.
  3. Configuration validation -- Calls connector.validate_config(&req.config) to verify that the provided configuration JSON conforms to the connector's expected schema. Returns a 400 error with details if validation fails.
  4. ID generation -- Generates a unique public ID using generate_id(IdTypes::ConnectionProfile).
  5. Database persistence -- Executes the create_connection_profile SQL query via cornucopia-generated functions, storing the public ID, organization ID, user ID, name, connector type, and configuration JSON.
  6. Response construction -- Fetches the newly created record back from the database (to include server-generated fields), converts it to the API response type using the TryFrom<DbConnectionProfile> implementation, and returns it.

The TryFrom<DbConnectionProfile> conversion includes generating a human-readable description string by calling connector.config_description(&val.config), which summarizes the configuration (e.g., "kafka-broker:9092" for a Kafka profile).

test_connection_profile

This handler performs a non-persisting connectivity test:

  1. Authentication -- Same authentication flow as creation.
  2. Connector lookup -- Retrieves the connector by type name.
  3. Test execution -- Calls connector.test_profile(&req.config) which returns an Option<oneshot::Receiver<TestSourceMessage>>. If the connector does not support testing (returns None), a success message is returned immediately.
  4. Result awaiting -- Awaits the oneshot receiver to get the test result from the connector's async test task.
  5. Response -- Returns the TestSourceMessage indicating success or failure with diagnostic details.

I/O Contract

Endpoint Input Output
POST /v1/connection_profiles ConnectionProfilePost JSON body with fields: name (String), connector (String -- connector type name), config (JSON Value -- connector-specific configuration) ConnectionProfile JSON with fields: id (String -- public ID), name (String), connector (String), config (JSON Value), description (String -- human-readable summary)
POST /v1/connection_profiles/test ConnectionProfilePost JSON body (same as creation) TestSourceMessage JSON with connection test result (success/failure with diagnostic message)

Usage Examples

Creating a Connection Profile via REST

curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  http://localhost:8000/v1/connection_profiles \
  -d '{
    "name": "my-kafka-cluster",
    "connector": "kafka",
    "config": {
      "bootstrap_servers": "broker1:9092,broker2:9092"
    }
  }'

Response:

{
  "id": "cp_abc123",
  "name": "my-kafka-cluster",
  "connector": "kafka",
  "config": { "bootstrap_servers": "broker1:9092,broker2:9092" },
  "description": "broker1:9092,broker2:9092"
}

Testing a Connection Profile

curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  http://localhost:8000/v1/connection_profiles/test \
  -d '{
    "name": "my-kafka-cluster",
    "connector": "kafka",
    "config": {
      "bootstrap_servers": "broker1:9092"
    }
  }'

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment