
Implementation:ArroyoSystems Arroyo Create Connection Table

From Leeroopedia



Overview

The Create Connection Table implementation provides the REST API endpoint for creating connection tables in Arroyo, along with the Connector::from_options trait method that handles SQL DDL-based table creation. Connection tables are the primary abstraction for binding a connector, connection profile, schema, and table-specific configuration into a usable data source or sink.

Code Reference

| File | Lines | Purpose |
|------|-------|---------|
| crates/arroyo-api/src/connection_tables.rs | L260-L311 | create_connection_table REST handler |
| crates/arroyo-api/src/connection_tables.rs | L45-L126 | get_and_validate_connector validation helper |
| crates/arroyo-operator/src/connector.rs | L137-L143 | Connector::from_options trait method |
| crates/arroyo-api/src/connection_tables.rs | L313-L372 | TryInto&lt;ConnectionTable&gt; for DbConnectionTable |

Signatures

/// POST /v1/connection_tables
/// Creates a new connection table after validating connector, profile, and schema.
pub async fn create_connection_table(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    WithRejection(Json(req), _): WithRejection<Json<ConnectionTablePost>, ApiError>,
) -> Result<Json<ConnectionTable>, ErrorResp>

/// Connector trait method for SQL DDL-based table creation.
/// Parses connector options from CREATE TABLE WITH clause.
fn from_options(
    &self,
    name: &str,
    options: &mut ConnectorOptions,
    schema: Option<&ConnectionSchema>,
    profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection>

Description

REST Handler: create_connection_table

The create_connection_table handler implements the full table creation workflow:

  1. Authentication -- Authenticates the request via bearer token and extracts organization and user identity.
  2. Validation -- Delegates to get_and_validate_connector which performs:
    • Connector lookup -- Retrieves the connector from the registry using connector_for_type(&req.connector).
    • Profile validation -- If connection_profile_id is provided, fetches the profile from the database and verifies its connector type matches the request's connector type. If the connector requires a profile but none is specified, returns a 400 error.
    • Table config validation -- Calls connector.validate_table(&req.config) to verify the table-specific configuration.
    • Schema expansion -- If a schema is provided, calls expand_schema to convert format-specific definitions (Avro, Protobuf, JSON Schema) to Arrow-typed fields, then calls .validate() on the result.
  3. Connection type determination -- Calls connector.table_type(&profile, &req.config) to determine whether the table is a source, sink, or lookup table.
  4. Schema serialization -- Converts the validated schema to a JSON value for database storage.
  5. ID generation -- Generates a unique public ID using generate_id(IdTypes::ConnectionTable).
  6. Database persistence -- Executes the create_connection_table SQL query via cornucopia, storing the public ID, organization, user, name, table type, connector type, profile reference, configuration, and schema.
  7. Response construction -- Fetches the newly created record from the database and converts it to the API response type using TryInto<ConnectionTable>.

Trait Method: Connector::from_options

The from_options method on the Connector trait is the counterpart to the REST endpoint, used when tables are defined via SQL CREATE TABLE statements:

fn from_options(
    &self,
    name: &str,
    options: &mut ConnectorOptions,
    schema: Option<&ConnectionSchema>,
    profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection>

Each connector implementation provides its own from_options that:

  • Extracts connector-specific options from the ConnectorOptions map (e.g., topic, type, source.offset for Kafka)
  • Validates the options and constructs a typed configuration struct
  • Returns a Connection object containing the connector name, connection type, schema, serialized configuration, and a human-readable description

Database Conversion: TryInto<ConnectionTable>

The TryInto<ConnectionTable> for DbConnectionTable implementation (L313-L372) handles converting database records back to API types:

  1. Looks up the connector from the registry
  2. Reconstructs the connection profile (if present) from joined database fields
  3. Deserializes and validates the stored schema
  4. Calls connector.get_schema to allow the connector to augment the stored schema with any connector-specific fields
  5. Constructs the final ConnectionTable with all metadata
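The shape of this conversion can be sketched with a fallible TryFrom in the same spirit. The DbRecord and ApiTable types below are simplified stand-ins (the stored schema is modeled as a comma-separated field list rather than JSON), not Arroyo's DbConnectionTable or ConnectionTable.

```rust
// Simplified stand-in for a database row; in Arroyo the schema is stored
// as serialized JSON, modeled here as a comma-separated field list.
#[derive(Debug)]
pub struct DbRecord {
    pub pub_id: String,
    pub name: String,
    pub connector: String,
    pub schema_fields: String,
}

// Simplified stand-in for the API response type.
#[derive(Debug)]
pub struct ApiTable {
    pub pub_id: String,
    pub name: String,
    pub connector: String,
    pub field_count: usize,
}

impl TryFrom<DbRecord> for ApiTable {
    type Error = String;

    fn try_from(db: DbRecord) -> Result<Self, Self::Error> {
        // In the real conversion, the connector is looked up in the registry
        // and the stored schema is deserialized, validated, and augmented via
        // connector.get_schema. The key property illustrated here: a corrupt
        // record surfaces as a conversion error rather than a panic.
        if db.schema_fields.trim().is_empty() {
            return Err(format!("table {} has no stored schema", db.pub_id));
        }
        let field_count = db.schema_fields.split(',').count();
        Ok(ApiTable {
            pub_id: db.pub_id,
            name: db.name,
            connector: db.connector,
            field_count,
        })
    }
}
```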

I/O Contract

| Direction | Type | Description |
|-----------|------|-------------|
| Input | ConnectionTablePost | JSON body with fields: name (String), connector (String -- connector type name), connection_profile_id (Option&lt;String&gt;), config (JSON Value -- connector-specific table config), schema (Option&lt;ConnectionSchema&gt; -- format and definition) |
| Output | Json&lt;ConnectionTable&gt; | Persisted table with fields: id (internal), pub_id (String), name (String), created_at (timestamp), connection_profile (Option), connector (String), table_type (source/sink/lookup), config (JSON), schema (ConnectionSchema with expanded fields), consumers (u32 -- count of pipelines using this table) |

Usage Examples

REST API Table Creation

# Create a Kafka source table with JSON format
curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  http://localhost:8000/v1/connection_tables \
  -d '{
    "name": "user_events",
    "connector": "kafka",
    "connection_profile_id": "cp_abc123",
    "config": {
      "topic": "user-events",
      "type": "source",
      "source.offset": "latest"
    },
    "schema": {
      "format": {"json": {}},
      "definition": {
        "jsonSchema": {
          "schema": "{\"type\":\"object\",\"properties\":{\"user_id\":{\"type\":\"integer\"},\"event\":{\"type\":\"string\"}}}"
        }
      },
      "fields": []
    }
  }'

SQL DDL Table Creation (via from_options)

CREATE TABLE user_events (
    user_id BIGINT NOT NULL,
    event TEXT NOT NULL,
    event_time TIMESTAMP NOT NULL,
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    connector = 'kafka',
    connection_profile = 'production-kafka',
    topic = 'user-events',
    type = 'source',
    format = 'json'
);

Connector::from_options Implementation (Kafka Example)

// Simplified illustration of how a connector implements from_options
fn from_options(
    &self,
    name: &str,
    options: &mut ConnectorOptions,
    schema: Option<&ConnectionSchema>,
    profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
    let topic = options.pull_str("topic")?;
    let table_type = options.pull_str("type")?;
    // ... parse remaining options and build the typed kafka_table config ...
    Ok(Connection::new(
        None, "kafka", name.to_string(),
        // Simplified: unwrapping assumes a schema was provided.
        ConnectionType::Source, schema.unwrap().clone(),
        &kafka_table, format!("kafka topic '{topic}'"),
    ))
}
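The example above pulls each option it understands out of the options map. A minimal mock of that consumption pattern is sketched below. MockOptions is a hypothetical stand-in, not Arroyo's ConnectorOptions; it assumes, as is common for such option maps, that pulling an option removes it, so any keys left over after parsing can be reported back as unknown WITH-clause options.

```rust
use std::collections::HashMap;

// Hypothetical mock of a consumable option map; not Arroyo's ConnectorOptions.
pub struct MockOptions {
    opts: HashMap<String, String>,
}

impl MockOptions {
    pub fn new(pairs: &[(&str, &str)]) -> Self {
        Self {
            opts: pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        }
    }

    /// Remove and return a required option, erroring if it is absent.
    pub fn pull_str(&mut self, key: &str) -> Result<String, String> {
        self.opts
            .remove(key)
            .ok_or_else(|| format!("missing required option '{key}'"))
    }

    /// Keys the connector never consumed; useful for rejecting typos
    /// in a CREATE TABLE ... WITH clause.
    pub fn remaining(&self) -> Vec<String> {
        let mut keys: Vec<String> = self.opts.keys().cloned().collect();
        keys.sort();
        keys
    }
}
```

Under this model, a misspelled key such as `topc` survives parsing and shows up in `remaining()`, which lets DDL validation fail fast instead of silently ignoring the option.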
