Implementation: Arroyo Schema Provider
Overview
The schema provider is the bridge between Arroyo's connector system and the DataFusion SQL query planner. The `ArroyoSchemaProvider` struct implements DataFusion's `ContextProvider` trait, enabling the planner to resolve table names to their Arrow schemas during query planning. The `Table::try_from_statement` method parses SQL `CREATE TABLE` and `CREATE VIEW` statements to register new tables in the schema provider.
Code Reference
| File | Lines | Purpose |
|---|---|---|
| `crates/arroyo-planner/src/lib.rs` | L111-L127 | `ArroyoSchemaProvider` struct definition |
| `crates/arroyo-planner/src/lib.rs` | L426-L475 | `impl ContextProvider for ArroyoSchemaProvider` |
| `crates/arroyo-planner/src/lib.rs` | L419-L424 | `create_table` helper (wraps schema in `LogicalBatchInput`) |
| `crates/arroyo-planner/src/tables.rs` | L758-L905 | `Table::try_from_statement` |
Signatures
```rust
#[derive(Clone, Default)]
pub struct ArroyoSchemaProvider {
    pub source_defs: HashMap<String, String>,
    tables: HashMap<UniCase<String>, Table>,
    pub functions: HashMap<String, Arc<ScalarUDF>>,
    pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    pub window_functions: HashMap<String, Arc<WindowUDF>>,
    pub connections: HashMap<String, Connection>,
    profiles: HashMap<String, ConnectionProfile>,
    pub udf_defs: HashMap<String, UdfDef>,
    config_options: datafusion::config::ConfigOptions,
    pub dylib_udfs: HashMap<String, DylibUdfConfig>,
    pub python_udfs: HashMap<String, PythonUdfConfig>,
    pub expr_planners: Vec<Arc<dyn ExprPlanner>>,
    pub planning_options: PlanningOptions,
    pub analyzer: Analyzer,
}
```
```rust
impl ContextProvider for ArroyoSchemaProvider {
    fn get_table_source(
        &self,
        name: TableReference,
    ) -> datafusion::common::Result<Arc<dyn TableSource>>
    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>
    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>
    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>>
    fn options(&self) -> &datafusion::config::ConfigOptions
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType>
    fn udf_names(&self) -> Vec<String>
    fn udaf_names(&self) -> Vec<String>
    fn udwf_names(&self) -> Vec<String>
    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>]
}
```
```rust
impl Table {
    pub fn try_from_statement(
        statement: &Statement,
        schema_provider: &ArroyoSchemaProvider,
    ) -> Result<Option<Self>>
}
```
Description
ArroyoSchemaProvider Struct
The ArroyoSchemaProvider is the central metadata repository for SQL query planning. Its fields are:
- `source_defs: HashMap<String, String>` -- Raw SQL source definitions for UDFs, used for recompilation.
- `tables: HashMap<UniCase<String>, Table>` -- Registered tables, keyed by case-insensitive name. The `Table` enum includes `ConnectorTable`, `MemoryTable`, `TableFromQuery` (views), and `LookupTable` variants.
- `functions: HashMap<String, Arc<ScalarUDF>>` -- Registered scalar User-Defined Functions.
- `aggregate_functions: HashMap<String, Arc<AggregateUDF>>` -- Registered aggregate UDFs.
- `window_functions: HashMap<String, Arc<WindowUDF>>` -- Registered window UDFs.
- `connections: HashMap<String, Connection>` -- Named connections for connector tables.
- `profiles: HashMap<String, ConnectionProfile>` -- Named connection profiles available for SQL `CREATE TABLE` statements that reference profiles by name.
- `udf_defs: HashMap<String, UdfDef>` -- Parsed UDF definitions for code generation.
- `dylib_udfs: HashMap<String, DylibUdfConfig>` -- Dynamic library UDF configurations.
- `python_udfs: HashMap<String, PythonUdfConfig>` -- Python UDF configurations.
- `planning_options: PlanningOptions` -- Configuration options for the planning phase (e.g., TTL).
- `analyzer: Analyzer` -- The DataFusion analyzer instance for plan optimization.
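The case-insensitive keying of the `tables` map can be illustrated with a minimal sketch. The real code uses the `unicase` crate's `UniCase<String>` as the key type; this hypothetical `TableRegistry` normalizes keys to lowercase instead, to model the same lookup behavior without the dependency, and uses a `String` description in place of the `Table` enum.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the provider's `tables` map. The real code keys
// the map with UniCase<String>; lowercasing here models the same
// case-insensitive resolution.
#[derive(Default)]
struct TableRegistry {
    tables: HashMap<String, String>, // normalized name -> table description
}

impl TableRegistry {
    fn insert_table(&mut self, name: &str, table: &str) {
        self.tables.insert(name.to_lowercase(), table.to_string());
    }

    // Mirrors the provider's get_table: lookups ignore case.
    fn get_table(&self, name: &str) -> Option<&String> {
        self.tables.get(&name.to_lowercase())
    }
}

fn main() {
    let mut registry = TableRegistry::default();
    registry.insert_table("Orders", "connector table");
    // Either casing resolves to the same entry.
    assert!(registry.get_table("orders").is_some());
    assert!(registry.get_table("ORDERS").is_some());
    assert!(registry.get_table("missing").is_none());
    println!("case-insensitive lookup ok");
}
```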
ContextProvider Implementation
The ContextProvider trait implementation is the core interface that DataFusion's SQL planner uses during query planning:
get_table_source
This is the primary method for table resolution:
```rust
fn get_table_source(
    &self,
    name: TableReference,
) -> datafusion::common::Result<Arc<dyn TableSource>> {
    let table = self
        .get_table(name.to_string())
        .ok_or_else(|| DataFusionError::Plan(format!("Table {name} not found")))?;
    let fields = table.get_fields();
    let schema = Arc::new(Schema::new_with_metadata(fields, HashMap::new()));
    Ok(create_table(name.to_string(), schema))
}
```
The method:
- Looks up the table by name using `get_table` (case-insensitive via `UniCase`)
- Extracts the table's Arrow fields via `table.get_fields()`
- Constructs an Arrow `Schema` from the fields
- Wraps it in a `LogicalBatchInput` (a DataFusion `TableProvider`) via the `create_table` helper

The `create_table` helper function constructs the chain: `LogicalBatchInput` (with table name and schema) wrapped in `DefaultTableSource` wrapped in `Arc<dyn TableSource>`.
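The wrapping chain can be sketched with simplified local stand-ins for the DataFusion types. Here `Schema`, `TableSource`, and `LogicalBatchInput` are hypothetical one-field versions of the real types, and the `DefaultTableSource` layer is collapsed into the trait-object wrapping; the point is only the shape of `create_table` (name plus schema in, `Arc<dyn TableSource>` out).

```rust
use std::sync::Arc;

// Simplified stand-in for the Arrow schema.
#[derive(Clone, Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

// Simplified stand-in for DataFusion's TableSource trait.
trait TableSource {
    fn schema(&self) -> Schema;
}

// Simplified stand-in for Arroyo's LogicalBatchInput table provider.
#[allow(dead_code)]
struct LogicalBatchInput {
    table_name: String,
    schema: Schema,
}

impl TableSource for LogicalBatchInput {
    fn schema(&self) -> Schema {
        self.schema.clone()
    }
}

// Mirrors the shape of the create_table helper: the planner only ever sees
// the schema through the Arc<dyn TableSource> handle.
fn create_table(table_name: String, schema: Schema) -> Arc<dyn TableSource> {
    Arc::new(LogicalBatchInput { table_name, schema })
}

fn main() {
    let schema = Schema { fields: vec!["id".into(), "ts".into()] };
    let source = create_table("orders".into(), schema.clone());
    assert_eq!(source.schema(), schema);
    println!("planner resolved {} fields", source.schema().fields.len());
}
```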
Other ContextProvider Methods
- `get_function_meta` -- Looks up scalar UDFs by name from the `functions` map.
- `get_aggregate_meta` -- Looks up aggregate UDFs from `aggregate_functions`.
- `get_window_meta` -- Looks up window functions from `window_functions`.
- `options` -- Returns the DataFusion config options.
- `get_expr_planners` -- Returns custom expression planners.
Table::try_from_statement
This method (L758-L905 in tables.rs) handles parsing SQL DDL statements into Table objects. It processes two categories of statements:
CREATE TABLE Statements
For CREATE TABLE statements (with no AS query):
- Extract components -- Parses the table name, columns, `WITH` options, constraints, and temporary flag from the SQL AST.
- Build ConnectorOptions -- Converts `WITH` options into a `ConnectorOptions` map.
- Extract connector type -- Pulls the `connector` option from the options map.
- Schema construction -- Calls `Self::schema_from_columns` to build typed field definitions from the column list.
- Primary key detection -- Identifies columns marked with `PRIMARY KEY` constraints.
- Branch on connector type:
  - Memory table (connector is `"memory"` or unspecified) -- Creates a `Table::MemoryTable` with the column fields. Validates that no virtual fields or `WITH` options are specified.
  - Connector table -- Resolves the connection profile (if specified via the `connection_profile` option), extracts watermark constraints, and calls `ConnectorTable::from_options` to create the table via the connector's `from_options` trait method. The result is wrapped in `Table::ConnectorTable` or `Table::LookupTable` depending on the connection type.
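The branch on connector type can be sketched as a small classification function. This is a hypothetical simplification: the real code dispatches through `ConnectorTable::from_options` and the connector registry, while here a plain `TableKind` enum stands in for the `Table` variants and the `classify` function only models the memory-vs-connector decision and the "no extra WITH options for memory tables" validation.

```rust
use std::collections::HashMap;

// Stand-in for the Table variants produced by the branch.
#[derive(Debug, PartialEq)]
enum TableKind {
    MemoryTable,
    ConnectorTable(String),
}

// Models the branch on the `connector` WITH option described above.
fn classify(with_options: &HashMap<String, String>) -> Result<TableKind, String> {
    match with_options.get("connector").map(String::as_str) {
        // Memory table: connector is "memory" or unspecified, and no other
        // WITH options are allowed.
        None | Some("memory") => {
            if with_options.len() > usize::from(with_options.contains_key("connector")) {
                Err("memory tables do not accept WITH options".to_string())
            } else {
                Ok(TableKind::MemoryTable)
            }
        }
        // Any other connector name is handed to the connector machinery.
        Some(connector) => Ok(TableKind::ConnectorTable(connector.to_string())),
    }
}

fn main() {
    let mut opts = HashMap::new();
    assert_eq!(classify(&opts), Ok(TableKind::MemoryTable));

    opts.insert("connector".to_string(), "kafka".to_string());
    assert_eq!(
        classify(&opts),
        Ok(TableKind::ConnectorTable("kafka".to_string()))
    );
}
```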
CREATE VIEW / CREATE TABLE AS Statements
For statements that produce a query-backed table:
- Optimizes the query plan via `produce_optimized_plan`
- Wraps the optimized logical plan in a `RemoteTableExtension`
- Returns a `Table::TableFromQuery` with the name and logical plan
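The query-backed path can be sketched with stand-in types. `LogicalPlan`, `produce_optimized_plan`, and `RemoteTableExtension` below are all hypothetical simplifications of the real DataFusion and Arroyo types; the sketch only shows the shape of the pipeline (optimize, then wrap, then name).

```rust
// Stand-in for DataFusion's logical plan; the string records the plan shape.
#[derive(Debug, Clone)]
struct LogicalPlan(String);

// Stand-in for Arroyo's RemoteTableExtension wrapper node.
struct RemoteTableExtension {
    name: String,
    inner: LogicalPlan,
}

// Stand-in for the real produce_optimized_plan, which runs DataFusion's
// analyzer and optimizer passes over the parsed query.
fn produce_optimized_plan(query: &str) -> LogicalPlan {
    LogicalPlan(format!("optimized({query})"))
}

// Models the CREATE VIEW / CREATE TABLE AS path: optimize the backing query,
// wrap the plan, and attach the view's name.
fn table_from_query(name: &str, query: &str) -> RemoteTableExtension {
    RemoteTableExtension {
        name: name.to_string(),
        inner: produce_optimized_plan(query),
    }
}

fn main() {
    let view = table_from_query("hourly_counts", "SELECT count(*) FROM orders");
    assert_eq!(view.name, "hourly_counts");
    assert!(view.inner.0.starts_with("optimized("));
    println!("registered view {}", view.name);
}
```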
I/O Contract
| Component | Input | Output |
|---|---|---|
| `get_table_source` | `TableReference` (SQL table name, possibly qualified) | `Arc<dyn TableSource>` wrapping a `LogicalBatchInput` with the table's Arrow schema |
| `try_from_statement` | `&Statement` (SQL AST node) and `&ArroyoSchemaProvider` (for profile/function resolution) | `Result<Option<Table>>` (per the signature above; `None` when the statement does not define a table) |