
Implementation:ArroyoSystems Arroyo Schema Provider

From Leeroopedia



Overview

The Schema Provider implementation bridges Arroyo's connector system and the DataFusion SQL query planner. The ArroyoSchemaProvider struct implements DataFusion's ContextProvider trait, enabling the planner to resolve table names to their Arrow schemas during query planning. The Table::try_from_statement method parses SQL CREATE TABLE and CREATE VIEW statements so that new tables can be registered in the schema provider.

Code Reference

File | Lines | Purpose
crates/arroyo-planner/src/lib.rs | L111-L127 | ArroyoSchemaProvider struct definition
crates/arroyo-planner/src/lib.rs | L426-L475 | impl ContextProvider for ArroyoSchemaProvider
crates/arroyo-planner/src/lib.rs | L419-L424 | create_table helper (wraps schema in LogicalBatchInput)
crates/arroyo-planner/src/tables.rs | L758-L905 | Table::try_from_statement

Signatures

#[derive(Clone, Default)]
pub struct ArroyoSchemaProvider {
    pub source_defs: HashMap<String, String>,
    tables: HashMap<UniCase<String>, Table>,
    pub functions: HashMap<String, Arc<ScalarUDF>>,
    pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    pub window_functions: HashMap<String, Arc<WindowUDF>>,
    pub connections: HashMap<String, Connection>,
    profiles: HashMap<String, ConnectionProfile>,
    pub udf_defs: HashMap<String, UdfDef>,
    config_options: datafusion::config::ConfigOptions,
    pub dylib_udfs: HashMap<String, DylibUdfConfig>,
    pub python_udfs: HashMap<String, PythonUdfConfig>,
    pub expr_planners: Vec<Arc<dyn ExprPlanner>>,
    pub planning_options: PlanningOptions,
    pub analyzer: Analyzer,
}

impl ContextProvider for ArroyoSchemaProvider {
    fn get_table_source(
        &self,
        name: TableReference,
    ) -> datafusion::common::Result<Arc<dyn TableSource>>

    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>

    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>

    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>>

    fn options(&self) -> &datafusion::config::ConfigOptions

    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType>

    fn udf_names(&self) -> Vec<String>
    fn udaf_names(&self) -> Vec<String>
    fn udwf_names(&self) -> Vec<String>

    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>]
}

impl Table {
    pub fn try_from_statement(
        statement: &Statement,
        schema_provider: &ArroyoSchemaProvider,
    ) -> Result<Option<Self>>
}

Description

ArroyoSchemaProvider Struct

The ArroyoSchemaProvider is the central metadata repository for SQL query planning. Its fields are:

  • source_defs: HashMap<String, String> -- Raw SQL source definitions for UDFs, used for recompilation.
  • tables: HashMap<UniCase<String>, Table> -- Registered tables, keyed by case-insensitive name. The Table enum includes ConnectorTable, MemoryTable, TableFromQuery (views), and LookupTable variants.
  • functions: HashMap<String, Arc<ScalarUDF>> -- Registered scalar User-Defined Functions.
  • aggregate_functions: HashMap<String, Arc<AggregateUDF>> -- Registered aggregate UDFs.
  • window_functions: HashMap<String, Arc<WindowUDF>> -- Registered window UDFs.
  • connections: HashMap<String, Connection> -- Named connections for connector tables.
  • profiles: HashMap<String, ConnectionProfile> -- Named connection profiles available for SQL CREATE TABLE statements that reference profiles by name.
  • udf_defs: HashMap<String, UdfDef> -- Parsed UDF definitions for code generation.
  • dylib_udfs: HashMap<String, DylibUdfConfig> -- Dynamic library UDF configurations.
  • python_udfs: HashMap<String, PythonUdfConfig> -- Python UDF configurations.
  • planning_options: PlanningOptions -- Configuration options for the planning phase (e.g., TTL).
  • analyzer: Analyzer -- The DataFusion analyzer instance for plan optimization.
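The case-insensitive keying of the tables map can be sketched with a std-only toy registry. This is a simplification, not the real API: TableDef stands in for Arroyo's Table enum, and lowercasing at insert and lookup time approximates what the UniCase wrapper does for the real HashMap keys.

```rust
use std::collections::HashMap;

// Toy table descriptor standing in for Arroyo's Table enum.
#[derive(Clone, Debug, PartialEq)]
pub struct TableDef {
    pub name: String,
    pub fields: Vec<String>,
}

// Minimal registry mirroring the `tables: HashMap<UniCase<String>, Table>`
// field: keys compare case-insensitively, approximated here by lowercasing.
#[derive(Default)]
pub struct TableRegistry {
    tables: HashMap<String, TableDef>,
}

impl TableRegistry {
    pub fn insert_table(&mut self, table: TableDef) {
        self.tables.insert(table.name.to_lowercase(), table);
    }

    // Case-insensitive lookup, as get_table behaves via UniCase keys.
    pub fn get_table(&self, name: &str) -> Option<&TableDef> {
        self.tables.get(&name.to_lowercase())
    }
}
```

This is why a table created as `orders` resolves when a query references `ORDERS` or `Orders`.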

ContextProvider Implementation

The ContextProvider trait implementation is the core interface that DataFusion's SQL planner uses during query planning:

get_table_source

This is the primary method for table resolution:

fn get_table_source(
    &self,
    name: TableReference,
) -> datafusion::common::Result<Arc<dyn TableSource>> {
    let table = self
        .get_table(name.to_string())
        .ok_or_else(|| DataFusionError::Plan(format!("Table {name} not found")))?;

    let fields = table.get_fields();
    let schema = Arc::new(Schema::new_with_metadata(fields, HashMap::new()));
    Ok(create_table(name.to_string(), schema))
}

The method:

  1. Looks up the table by name using get_table (case-insensitive via UniCase)
  2. Extracts the table's Arrow fields via table.get_fields()
  3. Constructs an Arrow Schema from the fields
  4. Wraps it in a LogicalBatchInput (a DataFusion TableProvider) via the create_table helper

The create_table helper function constructs the chain: LogicalBatchInput (with table name and schema) wrapped in DefaultTableSource wrapped in Arc<dyn TableSource>.
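The wrapping chain can be sketched with std-only stand-ins. The Schema, LogicalBatchInput, and DefaultTableSource types below are assumed simplifications of the Arrow/DataFusion originals; only the shape of the chain mirrors the real create_table helper.

```rust
use std::sync::Arc;

// Stand-in for an Arrow schema: just a list of field names.
#[derive(Clone, Debug, PartialEq)]
pub struct Schema(pub Vec<String>);

// Stand-in for DataFusion's TableSource trait.
pub trait TableSource {
    fn schema(&self) -> &Schema;
}

// Mirrors LogicalBatchInput: a named logical table carrying its schema.
pub struct LogicalBatchInput {
    pub table_name: String,
    pub schema: Schema,
}

// Mirrors DefaultTableSource: adapts the provider into a TableSource.
pub struct DefaultTableSource {
    inner: LogicalBatchInput,
}

impl TableSource for DefaultTableSource {
    fn schema(&self) -> &Schema {
        &self.inner.schema
    }
}

// Mirrors the create_table chain:
// LogicalBatchInput -> DefaultTableSource -> Arc<dyn TableSource>.
pub fn create_table(table_name: String, schema: Schema) -> Arc<dyn TableSource> {
    let input = LogicalBatchInput { table_name, schema };
    Arc::new(DefaultTableSource { inner: input })
}
```

The planner only ever sees the outer Arc<dyn TableSource>, so it can ask for the schema without knowing that the backing table is a logical placeholder rather than a scannable data source.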

Other ContextProvider Methods

  • get_function_meta -- Looks up scalar UDFs by name from the functions map.
  • get_aggregate_meta -- Looks up aggregate UDFs from aggregate_functions.
  • get_window_meta -- Looks up window functions from window_functions.
  • options -- Returns the DataFusion config options.
  • get_expr_planners -- Returns custom expression planners.

Table::try_from_statement

This method (L758-L905 in tables.rs) handles parsing SQL DDL statements into Table objects. It processes two categories of statements:

CREATE TABLE Statements

For CREATE TABLE statements (with no AS query):

  1. Extract components -- Parses the table name, columns, WITH options, constraints, and temporary flag from the SQL AST.
  2. Build ConnectorOptions -- Converts WITH options into a ConnectorOptions map.
  3. Extract connector type -- Pulls the connector option from the options map.
  4. Schema construction -- Calls Self::schema_from_columns to build typed field definitions from the column list.
  5. Primary key detection -- Identifies columns marked with PRIMARY KEY constraints.
  6. Branch on connector type:
    • Memory table (connector is "memory" or unspecified) -- Creates a Table::MemoryTable with the column fields. Validates that no virtual fields or WITH options are specified.
    • Connector table -- Resolves the connection profile (if specified via connection_profile option), extracts watermark constraints, and calls ConnectorTable::from_options to create the table via the connector's from_options trait method. The result is wrapped in Table::ConnectorTable or Table::LookupTable depending on the connection type.

CREATE VIEW / CREATE TABLE AS Statements

For statements that produce a query-backed table:

  1. Optimizes the query plan via produce_optimized_plan
  2. Wraps the optimized logical plan in a RemoteTableExtension
  3. Returns a Table::TableFromQuery with the name and logical plan
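The Option<Self> return contract across both categories can be sketched as a classifier: DDL statements that define a table yield Some(...), anything else (e.g. a plain SELECT) yields None. The real method walks the sqlparser AST; matching on the SQL text here is a deliberate simplification.

```rust
// Toy classification of the two table-producing statement categories.
#[derive(Debug, PartialEq)]
pub enum TableKind {
    ConnectorOrMemory, // CREATE TABLE ... [WITH (...)]
    FromQuery,         // CREATE VIEW / CREATE TABLE ... AS SELECT ...
}

pub fn classify_statement(sql: &str) -> Option<TableKind> {
    let upper = sql.trim().to_uppercase();
    if upper.starts_with("CREATE VIEW")
        || (upper.starts_with("CREATE TABLE") && upper.contains(" AS SELECT"))
    {
        Some(TableKind::FromQuery)
    } else if upper.starts_with("CREATE TABLE") {
        Some(TableKind::ConnectorOrMemory)
    } else {
        // Not a table definition: try_from_statement returns None and the
        // caller plans the statement as an ordinary query.
        None
    }
}
```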

I/O Contract

Component | Input | Output
get_table_source | TableReference (SQL table name, possibly qualified) | Arc<dyn TableSource> wrapping a LogicalBatchInput with the table's Arrow schema
try_from_statement | &Statement (SQL AST node) and &ArroyoSchemaProvider (for profile/function resolution) | Option<Table>: Some(Table::ConnectorTable(...)), Some(Table::MemoryTable(...)), Some(Table::TableFromQuery(...)), or None if the statement is not a table definition

Usage Examples

Table Resolution During Query Planning

use arroyo_planner::ArroyoSchemaProvider;
use datafusion::sql::planner::ContextProvider;
use datafusion::sql::TableReference;

let provider = ArroyoSchemaProvider::default();
// ... register tables, functions, profiles ...

// During query planning, DataFusion calls:
let table_source = provider.get_table_source(
    TableReference::bare("orders")
)?;

// table_source.schema() returns the Arrow schema:
// Schema { fields: [order_id: Int64, amount: Float64, event_time: Timestamp] }

Parsing a CREATE TABLE Statement

use arroyo_planner::ArroyoSchemaProvider;
use arroyo_planner::tables::Table;
use sqlparser::parser::Parser;
use sqlparser::dialect::ArroyoDialect;

let sql = "CREATE TABLE orders (
    order_id BIGINT,
    amount DOUBLE
) WITH (
    connector = 'kafka',
    topic = 'orders',
    format = 'json'
)";

let dialect = ArroyoDialect {};
let statements = Parser::parse_sql(&dialect, sql)?;

// A provider is needed for profile and function resolution.
let schema_provider = ArroyoSchemaProvider::default();
let table = Table::try_from_statement(&statements[0], &schema_provider)?;
// table is Some(Table::ConnectorTable(...)) with kafka connector

Full SQL Session Flow

-- Statement 1: parsed by try_from_statement, registered in provider
CREATE TABLE clicks (
    user_id BIGINT,
    url TEXT,
    ts TIMESTAMP,
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    topic = 'clicks',
    format = 'json'
);

-- Statement 2: resolved by get_table_source('clicks')
SELECT user_id, COUNT(*) as cnt
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '1' MINUTE);
