
Principle:ArroyoSystems Arroyo Connection Table Configuration

From Leeroopedia


Overview

The Connection Table Configuration principle governs how Arroyo configures connection tables -- the primary abstraction that binds together a connector type, an optional connection profile, a data schema, and table-specific options into a usable data source or sink. Connection tables are the units of external data that SQL queries reference, making them the central bridge between the connector system and the query engine.

Description

A connection table represents a specific data source or sink that can be used in SQL queries. It is the result of composing several independent configuration elements:

  • Connector -- The type of external system (e.g., Kafka, Kinesis, Redis), resolved by name through the connector registry.
  • Connection Profile -- Optional reusable credentials and endpoint configuration. Required for connectors that need authentication (e.g., Kafka with SASL, AWS Kinesis).
  • Schema -- The data schema defining the columns and types of the table. May be defined explicitly, derived from an external schema registry, or inferred from the query.
  • Table Configuration -- Connector-specific settings for this particular table instance (e.g., Kafka topic name, consumer group, read offset).
  • Connection Type -- Whether this table serves as a source (read), sink (write), or lookup table.
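Taken together, these elements can be sketched as a plain data type. This is a hypothetical simplification written in Rust (Arroyo's implementation language); the struct and field names are illustrative and do not mirror Arroyo's actual types:

```rust
// Hypothetical sketch of the elements that compose a connection table.
// Names and field types are illustrative, not Arroyo's real structs.

#[derive(Debug, Clone, PartialEq)]
enum ConnectionType {
    Source,
    Sink,
    Lookup,
}

#[derive(Debug, Clone)]
struct SourceField {
    name: String,
    data_type: String, // e.g. "BIGINT", "DOUBLE"
}

#[derive(Debug, Clone)]
struct ConnectionTable {
    name: String,
    connector: String,                     // e.g. "kafka"
    connection_profile_id: Option<String>, // reusable credentials, if any
    connection_type: ConnectionType,
    schema: Vec<SourceField>,
    config: Vec<(String, String)>,         // connector-specific table options
}
```

The point of the sketch is that each field is configured independently: the same connector and profile can back many tables, each with its own schema and options.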

The connection table abstraction implements the Adapter pattern, mapping external system-specific concepts to a uniform table interface. This enables:

  • SQL-based access to diverse external systems -- Users write standard SQL queries that reference connection tables by name, without needing to know the underlying connector implementation details.
  • Schema enforcement at the boundary -- The schema attached to a connection table defines the contract between the external system and the query engine. Data is validated against this schema during ingestion.
  • Decoupled connector configuration from query logic -- Table configuration is specified once at table creation time and persisted. Queries reference tables by name, not by their full configuration.

Table Creation Workflow

The creation of a connection table follows a multi-step validation workflow:

  1. Connector resolution -- The system looks up the connector by type name using the connector registry.
  2. Profile validation -- If a connection profile ID is specified, the system retrieves the profile and verifies that its connector type matches the table's connector type.
  3. Table config validation -- The connector validates the table-specific configuration against its expected schema.
  4. Schema expansion -- If a schema with a format definition is provided, the system expands it by converting format-specific schema definitions (Avro, Protobuf, JSON Schema) into Arrow-typed SourceField objects.
  5. Schema validation -- The expanded schema is validated for correctness (no duplicate field names, valid types, etc.).
  6. Persistence -- The validated table is persisted to the database with its public ID, connector type, connection type, configuration, and schema.
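The six steps above can be sketched as a single validation function. All names here (Profile, Validator, create_table) are hypothetical, errors are simplified to strings, and schema expansion is reduced to a duplicate-name check:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins: real Arroyo resolves connectors through its
// connector registry and loads profiles from the database.
struct Profile {
    connector: String,
}

type Validator = fn(&HashMap<String, String>) -> Result<(), String>;

fn create_table(
    connectors: &HashMap<String, Validator>,
    profiles: &HashMap<String, Profile>,
    connector: &str,
    profile_id: Option<&str>,
    config: &HashMap<String, String>,
    fields: &[&str],
) -> Result<(), String> {
    // 1. Connector resolution: look up the connector by type name.
    let validate = connectors
        .get(connector)
        .ok_or_else(|| format!("unknown connector: {connector}"))?;
    // 2. Profile validation: the profile's connector type must match.
    if let Some(id) = profile_id {
        let profile = profiles.get(id).ok_or("unknown profile")?;
        if profile.connector != connector {
            return Err("profile connector mismatch".to_string());
        }
    }
    // 3. Table config validation, delegated to the connector itself.
    validate(config)?;
    // 4-5. Schema expansion and validation (here only duplicate names).
    let mut seen = HashSet::new();
    for f in fields {
        if !seen.insert(*f) {
            return Err(format!("duplicate field: {f}"));
        }
    }
    // 6. Persistence to the database would happen here.
    Ok(())
}
```

Because every step can fail before persistence, an invalid table is rejected without leaving partial state behind.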

Table Types

Connection tables are classified by their ConnectionType:

  • Source -- Reads data from an external system (example: a Kafka topic consumer).
  • Sink -- Writes data to an external system (example: a Redis key-value writer).
  • Lookup -- Serves random-access reads during query execution (example: a Redis key lookup).

The connection type is determined by the connector based on the profile and table configuration. Some connectors support multiple types (e.g., Kafka supports both source and sink).
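The type-selection rule can be sketched as follows. The dispatch on a "type" option mirrors the Kafka table config shown in the Usage section, but the function itself is illustrative, not Arroyo's actual code:

```rust
#[derive(Debug, PartialEq)]
enum ConnectionType {
    Source,
    Sink,
    Lookup,
}

// Illustrative only: a connector that supports multiple connection types
// picks one from its table config, as Kafka's "type" option does.
// Kafka supports source and sink, so any other value is rejected.
fn kafka_connection_type(config_type: &str) -> Result<ConnectionType, String> {
    match config_type {
        "source" => Ok(ConnectionType::Source),
        "sink" => Ok(ConnectionType::Sink),
        other => Err(format!("kafka tables cannot be '{other}'")),
    }
}
```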

Theoretical Basis

Connection tables implement the Adapter pattern from the Gang of Four design patterns. The adapter converts the interface of an external system (which may use Kafka's producer/consumer API, AWS SDK, HTTP requests, etc.) into the uniform table interface expected by the SQL query engine.

Key theoretical concepts:

  • Interface Uniformity -- All external systems, regardless of their native protocol, are represented as tables with a name, schema, and connection type. This allows the SQL layer to treat all data sources identically.
  • Configuration Composition -- A connection table's full configuration is the composition of its connector's defaults, its profile's shared settings, and its own table-specific settings. This layered composition follows the Composite Configuration pattern.
  • Separation of Declaration and Execution -- Table configuration is declarative (specifying what the table is), while the actual data reading/writing is handled by operator construction during pipeline compilation. This separation enables validation and testing before execution.
  • Persistence as Source of Truth -- Connection tables are persisted to the database, making them durable across system restarts and available for multiple pipelines to reference.
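The layered composition described above can be sketched as a last-writer-wins merge. compose_config is a hypothetical helper; Arroyo's actual merging is connector-specific, and this only illustrates the layering order:

```rust
use std::collections::HashMap;

// Hypothetical last-writer-wins merge: table-specific settings override
// profile settings, which override connector defaults.
fn compose_config(
    defaults: &HashMap<String, String>,
    profile: &HashMap<String, String>,
    table: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = defaults.clone();
    merged.extend(profile.clone()); // profile layer overrides defaults
    merged.extend(table.clone());   // table layer overrides both
    merged
}
```

A setting such as a read offset can thus have a connector default, be overridden once in a shared profile, and be overridden again for a single table.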

Usage

Connection tables are created and used through two primary interfaces:

REST API

# Create a Kafka source table
curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  http://localhost:8000/v1/connection_tables \
  -d '{
    "name": "orders",
    "connector": "kafka",
    "connection_profile_id": "cp_abc123",
    "config": {
      "topic": "orders",
      "type": "source",
      "source.offset": "latest"
    },
    "schema": {
      "format": {"json": {}},
      "definition": {"jsonSchema": {"schema": "{...}"}},
      "fields": []
    }
  }'

SQL DDL

CREATE TABLE orders (
    order_id BIGINT NOT NULL,
    customer_id BIGINT NOT NULL,
    amount DOUBLE NOT NULL,
    event_time TIMESTAMP NOT NULL,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    connection_profile = 'production-kafka',
    topic = 'orders',
    format = 'json'
);

-- Use the table in a query
SELECT customer_id, SUM(amount) as total
FROM orders
GROUP BY customer_id, TUMBLE(event_time, INTERVAL '1' HOUR);
