Implementation:ArroyoSystems Arroyo Create Connection Table
Overview
The Create Connection Table implementation provides the REST API endpoint for creating connection tables in Arroyo, along with the Connector::from_options trait method that handles SQL DDL-based table creation. Connection tables are the primary abstraction for binding a connector, connection profile, schema, and table-specific configuration into a usable data source or sink.
Code Reference
| File | Lines | Purpose |
|---|---|---|
| crates/arroyo-api/src/connection_tables.rs | L260-L311 | `create_connection_table` REST handler |
| crates/arroyo-api/src/connection_tables.rs | L45-L126 | `get_and_validate_connector` validation helper |
| crates/arroyo-operator/src/connector.rs | L137-L143 | `Connector::from_options` trait method |
| crates/arroyo-api/src/connection_tables.rs | L313-L372 | `TryInto<ConnectionTable>` for `DbConnectionTable` |
Signatures
```rust
/// POST /v1/connection_tables
/// Creates a new connection table after validating connector, profile, and schema.
pub async fn create_connection_table(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    WithRejection(Json(req), _): WithRejection<Json<ConnectionTablePost>, ApiError>,
) -> Result<Json<ConnectionTable>, ErrorResp>

/// Connector trait method for SQL DDL-based table creation.
/// Parses connector options from a CREATE TABLE WITH clause.
fn from_options(
    &self,
    name: &str,
    options: &mut ConnectorOptions,
    schema: Option<&ConnectionSchema>,
    profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection>
```
Description
REST Handler: create_connection_table
The create_connection_table handler implements the full table creation workflow:
- Authentication -- Authenticates the request via bearer token and extracts organization and user identity.
- Validation -- Delegates to `get_and_validate_connector`, which performs:
  - Connector lookup -- Retrieves the connector from the registry using `connector_for_type(&req.connector)`.
  - Profile validation -- If `connection_profile_id` is provided, fetches the profile from the database and verifies that its connector type matches the request's connector type. If the connector requires a profile but none is specified, returns a 400 error.
  - Table config validation -- Calls `connector.validate_table(&req.config)` to verify the table-specific configuration.
  - Schema expansion -- If a schema is provided, calls `expand_schema` to convert format-specific definitions (Avro, Protobuf, JSON Schema) to Arrow-typed fields, then calls `.validate()` on the result.
- Connection type determination -- Calls `connector.table_type(&profile, &req.config)` to determine whether the table is a source, sink, or lookup table.
- Schema serialization -- Converts the validated schema to a JSON value for database storage.
- ID generation -- Generates a unique public ID using `generate_id(IdTypes::ConnectionTable)`.
- Database persistence -- Executes the `create_connection_table` SQL query via cornucopia, storing the public ID, organization, user, name, table type, connector type, profile reference, configuration, and schema.
- Response construction -- Fetches the newly created record from the database and converts it to the API response type using `TryInto<ConnectionTable>`.
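The validation ordering above can be sketched as a self-contained flow. All of the types and the `create_table` helper below are hypothetical stand-ins, not Arroyo's actual structs; persistence, schema expansion, and authentication are elided:

```rust
// Hypothetical stand-ins for Arroyo's types; all names here are illustrative.
#[derive(Debug)]
struct ConnectionTablePost {
    name: String,
    connector: String,
    connection_profile_id: Option<String>,
}

#[derive(Debug, PartialEq)]
enum TableType {
    Source,
    Sink,
    Lookup,
}

// Mirrors the handler's validation order: connector lookup, profile check,
// then connection type determination.
fn create_table(
    req: &ConnectionTablePost,
    known_connectors: &[&str],
    requires_profile: bool,
) -> Result<TableType, String> {
    // Connector lookup -- unknown connector types are rejected up front.
    if !known_connectors.contains(&req.connector.as_str()) {
        return Err(format!("unknown connector '{}'", req.connector));
    }
    // Profile validation -- some connectors (e.g. Kafka) require a profile.
    if requires_profile && req.connection_profile_id.is_none() {
        return Err(format!(
            "connector '{}' requires a connection profile",
            req.connector
        ));
    }
    // Connection type determination, fixed to Source in this sketch.
    Ok(TableType::Source)
}

fn main() {
    let req = ConnectionTablePost {
        name: "user_events".into(),
        connector: "kafka".into(),
        connection_profile_id: None,
    };
    // Kafka requires a profile, so this takes the handler's 400-error path.
    println!("{:?}", create_table(&req, &["kafka", "sse"], true));
}
```

The key design point preserved here is that validation fails fast, before anything touches the database.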
Trait Method: Connector::from_options
The from_options method on the Connector trait is the counterpart to the REST endpoint, used when tables are defined via SQL CREATE TABLE statements:
```rust
fn from_options(
    &self,
    name: &str,
    options: &mut ConnectorOptions,
    schema: Option<&ConnectionSchema>,
    profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection>
```
Each connector implementation provides its own from_options that:
- Extracts connector-specific options from the `ConnectorOptions` map (e.g., `topic`, `type`, `source.offset` for Kafka)
- Validates the options and constructs a typed configuration struct
- Returns a `Connection` object containing the connector name, connection type, schema, serialized configuration, and a human-readable description
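A minimal sketch of the option-pulling pattern, using a `HashMap`-backed stand-in for `ConnectorOptions` (the real type and its error handling differ; only the consume-on-read behavior is assumed here):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for Arroyo's ConnectorOptions. pull_str consumes the
// entry it reads, so a connector can detect unrecognized leftover options
// after parsing everything it understands.
struct ConnectorOptions(HashMap<String, String>);

impl ConnectorOptions {
    fn pull_str(&mut self, key: &str) -> Result<String, String> {
        self.0
            .remove(key)
            .ok_or_else(|| format!("missing required option '{key}'"))
    }
}

fn main() {
    let mut options = ConnectorOptions(HashMap::from([
        ("topic".to_string(), "user-events".to_string()),
        ("type".to_string(), "source".to_string()),
    ]));

    // Pull the options a Kafka-style connector would need.
    let topic = options.pull_str("topic").unwrap();
    let table_type = options.pull_str("type").unwrap();
    println!("topic = {topic}, type = {table_type}");

    // Entries are consumed: a second pull of the same key fails.
    assert!(options.pull_str("topic").is_err());
}
```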
Database Conversion: TryInto<ConnectionTable>
The `TryInto<ConnectionTable> for DbConnectionTable` implementation (L313-L372) handles converting database records back to API types:
- Looks up the connector from the registry
- Reconstructs the connection profile (if present) from joined database fields
- Deserializes and validates the stored schema
- Calls `connector.get_schema` to allow the connector to augment the stored schema with any connector-specific fields
- Constructs the final `ConnectionTable` with all metadata
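The shape of this conversion can be illustrated with a heavily reduced `TryFrom` impl. The field set and error type below are hypothetical; the real implementation additionally rebuilds the profile from joined columns and calls `connector.get_schema`:

```rust
// Hypothetical, reduced stand-ins for the database-to-API conversion.
struct DbConnectionTable {
    pub_id: String,
    connector: String,
    schema: Option<String>, // JSON text in the real schema column
}

#[derive(Debug)]
struct ConnectionTable {
    pub_id: String,
    connector: String,
    schema: String,
}

impl TryFrom<DbConnectionTable> for ConnectionTable {
    type Error = String;

    fn try_from(db: DbConnectionTable) -> Result<Self, Self::Error> {
        // Deserialize-and-validate, reduced to a presence check in this sketch.
        let schema = db
            .schema
            .ok_or_else(|| format!("table '{}' has no stored schema", db.pub_id))?;
        Ok(ConnectionTable {
            pub_id: db.pub_id,
            connector: db.connector,
            schema,
        })
    }
}

fn main() {
    let db = DbConnectionTable {
        pub_id: "ct_abc123".into(),
        connector: "kafka".into(),
        schema: Some(r#"{"fields":[]}"#.into()),
    };
    let api = ConnectionTable::try_from(db).unwrap();
    println!("{api:?}");
}
```

Using `TryFrom` rather than `From` lets a corrupt or incomplete database row surface as an API error instead of a panic.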
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | `ConnectionTablePost` | JSON body with fields: `name` (String), `connector` (String -- type name), `connection_profile_id` (`Option<String>`), `config` (JSON Value -- connector-specific table config), `schema` (`Option<ConnectionSchema>` -- format and definition) |
| Output | `Json<ConnectionTable>` | Persisted table with fields: `id` (internal), `pub_id` (String), `name` (String), `created_at` (timestamp), `connection_profile` (Option), `connector` (String), `table_type` (source/sink/lookup), `config` (JSON), `schema` (ConnectionSchema with expanded fields), `consumers` (u32 -- count of pipelines using this table) |
Usage Examples
REST API Table Creation
```shell
# Create a Kafka source table with JSON format
curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  http://localhost:8000/v1/connection_tables \
  -d '{
    "name": "user_events",
    "connector": "kafka",
    "connection_profile_id": "cp_abc123",
    "config": {
      "topic": "user-events",
      "type": "source",
      "source.offset": "latest"
    },
    "schema": {
      "format": {"json": {}},
      "definition": {
        "jsonSchema": {
          "schema": "{\"type\":\"object\",\"properties\":{\"user_id\":{\"type\":\"integer\"},\"event\":{\"type\":\"string\"}}}"
        }
      },
      "fields": []
    }
  }'
```
SQL DDL Table Creation (via from_options)
```sql
CREATE TABLE user_events (
    user_id BIGINT NOT NULL,
    event TEXT NOT NULL,
    event_time TIMESTAMP NOT NULL,
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    connector = 'kafka',
    connection_profile = 'production-kafka',
    topic = 'user-events',
    type = 'source',
    format = 'json'
);
```
Connector::from_options Implementation (Kafka Example)
```rust
// Simplified illustration of how a connector implements from_options
fn from_options(
    &self,
    name: &str,
    options: &mut ConnectorOptions,
    schema: Option<&ConnectionSchema>,
    profile: Option<&ConnectionProfile>,
) -> anyhow::Result<Connection> {
    let topic = options.pull_str("topic")?;
    let table_type = options.pull_str("type")?;
    // ... parse remaining options and build the typed `kafka_table` config ...
    Ok(Connection::new(
        None,
        "kafka",
        name.to_string(),
        ConnectionType::Source,
        schema.unwrap().clone(),
        &kafka_table,
        format!("kafka topic '{topic}'"),
    ))
}
```
Related Pages
- Principle:ArroyoSystems_Arroyo_Connection_Table_Configuration
- Environment:ArroyoSystems_Arroyo_PostgreSQL_Database
- Implementation:ArroyoSystems_Arroyo_Get_Connectors
- Implementation:ArroyoSystems_Arroyo_Create_Connection_Profile
- Implementation:ArroyoSystems_Arroyo_Expand_Schema
- Implementation:ArroyoSystems_Arroyo_Test_Connection_Table
- Implementation:ArroyoSystems_Arroyo_Schema_Provider