Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Principle:ArroyoSystems Arroyo SQL Table Integration

From Leeroopedia


Template:Principle

Overview

The SQL Table Integration principle governs how Arroyo integrates connection tables into the SQL query planning layer, enabling users to reference external data sources and sinks directly in SQL queries using standard table names. This integration bridges the connector system with the DataFusion query planner, making external systems accessible through familiar SQL syntax.

Description

In Arroyo, SQL is the primary interface for defining stream processing pipelines. Users write SQL queries that reference tables representing external data sources (Kafka topics, Kinesis streams, HTTP endpoints) and sinks (Redis, file systems, webhooks). For this to work, the SQL query planner must be able to resolve table names to their schemas and understand how to read from and write to them.

SQL table integration serves as the bridge between two subsystems:

  • The Connector System -- Which manages connection profiles, connection tables, and connector implementations (as described in the Connector Registry and Connection Table Configuration principles).
  • The Query Planner -- Which parses SQL statements, resolves table references, performs type checking, and generates logical query plans.

The integration involves three key mechanisms:

Table Registration

Pre-existing connection tables (created via the REST API or previous SQL sessions) are registered with the schema provider before query planning begins. This allows queries to reference tables by name without needing to redefine them:

-- References a previously created connection table
SELECT * FROM orders WHERE amount > 100;

CREATE TABLE Statement Parsing

New tables can be defined inline within a SQL session using CREATE TABLE statements with a WITH clause containing connector options:

CREATE TABLE orders (
    order_id BIGINT NOT NULL,
    amount DOUBLE NOT NULL,
    event_time TIMESTAMP NOT NULL
) WITH (
    connector = 'kafka',
    topic = 'orders',
    format = 'json'
);

The SQL parser extracts the connector type and options from the WITH clause, resolves the connector from the registry, and calls from_options to create a Connection object. The resulting table is registered in the schema provider for subsequent queries in the same session.

Schema Provider Composition

The ArroyoSchemaProvider combines multiple sources of table definitions:

  • Pre-registered tables -- Connection tables loaded from the database before query planning
  • Inline definitions -- Tables defined via CREATE TABLE statements in the current SQL session
  • Views -- Virtual tables defined via CREATE VIEW statements
  • Memory tables -- Temporary tables defined without a connector

Theoretical Basis

SQL table integration implements the ContextProvider pattern from Apache DataFusion. The ContextProvider trait defines the interface that the SQL query planner uses to resolve names during planning:

  • get_table_source(name) -- Resolves a table reference to a TableSource containing the table's Arrow schema.
  • get_function_meta(name) -- Resolves scalar function references (UDFs).
  • get_aggregate_meta(name) -- Resolves aggregate function references (UDAFs).
  • get_window_meta(name) -- Resolves window function references.

Key theoretical concepts:

  • Name Resolution -- The ContextProvider pattern separates name resolution from query semantics. The planner asks "what is the schema of table X?" without knowing whether X is a Kafka topic, a Kinesis stream, or a view.
  • Schema Projection -- When the planner resolves a table, it receives an Arrow schema that defines the available columns and their types. This schema drives type checking, column resolution, and expression validation throughout the rest of query planning.
  • Two-Phase Table Creation -- Tables defined via CREATE TABLE go through two phases: (1) SQL parsing and connector resolution (handled by Table::try_from_statement), and (2) registration in the schema provider for use by subsequent statements. This two-phase approach allows later statements to reference tables defined by earlier statements.
  • Adapter Pattern -- The LogicalBatchInput struct wraps a table's name and Arrow schema into a DataFusion TableProvider, adapting Arroyo's internal table representation to DataFusion's expected interface.

Table Resolution Hierarchy

When resolving a table name, the schema provider follows this hierarchy:

  1. Check the registered tables map (case-insensitive via UniCase)
  2. If not found, return a DataFusionError::Plan error

This simple resolution strategy works because Arroyo does not support multi-schema or multi-catalog table references -- all tables exist in a single flat namespace.

Usage

SQL table integration is used whenever a SQL query references a table:

Referencing Pre-Registered Tables

-- Assumes 'orders' was created via REST API or previous SQL session
SELECT customer_id, COUNT(*) as order_count
FROM orders
GROUP BY customer_id, TUMBLE(event_time, INTERVAL '1' HOUR);

Inline Table Definition and Query

-- Define source
CREATE TABLE clicks (
    user_id BIGINT,
    page TEXT,
    ts TIMESTAMP,
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    connection_profile = 'prod-kafka',
    topic = 'clicks',
    format = 'json'
);

-- Define sink
CREATE TABLE click_counts (
    user_id BIGINT,
    window_start TIMESTAMP,
    click_count BIGINT
) WITH (
    connector = 'kafka',
    connection_profile = 'prod-kafka',
    topic = 'click-counts',
    format = 'json'
);

-- Query referencing both tables
INSERT INTO click_counts
SELECT user_id, window_start, COUNT(*) as click_count
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '10' MINUTE);

Memory Tables and Views

-- Create a memory table (no connector)
CREATE TABLE state (
    key TEXT PRIMARY KEY,
    value BIGINT
);

-- Create a view
CREATE VIEW enriched_orders AS
SELECT o.*, c.name as customer_name
FROM orders o JOIN customers c ON o.customer_id = c.id;

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment