Implementation: ArroyoSystems Arroyo Expand Schema
Overview
The Expand Schema implementation provides the core schema conversion logic in Arroyo. The expand_schema function dispatches to format-specific handlers -- expand_avro_schema, expand_proto_schema, and expand_json_schema -- each of which converts an external schema definition into a list of SourceField objects backed by Apache Arrow types. This function is called during connection table creation to populate the table's column definitions from the declared data format.
Code Reference
| File | Lines | Purpose |
|---|---|---|
| crates/arroyo-api/src/connection_tables.rs | L425-L473 | Main dispatcher `expand_schema` |
| crates/arroyo-api/src/connection_tables.rs | L475-L546 | `expand_avro_schema` -- Avro schema conversion |
| crates/arroyo-api/src/connection_tables.rs | L548-L625 | `expand_proto_schema` -- Protobuf schema conversion |
| crates/arroyo-api/src/connection_tables.rs | L670-L731 | `expand_json_schema` -- JSON Schema conversion |
| crates/arroyo-api/src/connection_tables.rs | L627-L668 | `expand_local_proto_schema` -- Protobuf descriptor compilation |
Signature
```rust
pub(crate) async fn expand_schema(
    name: &str,
    connector: &str,
    connection_type: ConnectionType,
    schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>
```
Format-Specific Handlers
```rust
async fn expand_avro_schema(
    connector: &str,
    connection_type: ConnectionType,
    mut schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>

async fn expand_proto_schema(
    connector: &str,
    connection_type: ConnectionType,
    mut schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>

async fn expand_json_schema(
    name: &str,
    connector: &str,
    connection_type: ConnectionType,
    mut schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>
```
Description
Main Dispatcher: expand_schema
The expand_schema function inspects the format field of the incoming ConnectionSchema and dispatches to the appropriate handler:
- `Format::Json(_)` -- dispatches to `expand_json_schema`
- `Format::Avro(_)` -- dispatches to `expand_avro_schema`
- `Format::Protobuf(_)` -- dispatches to `expand_proto_schema`
- `Format::Parquet(_)`, `Format::RawString(_)`, `Format::RawBytes(_)` -- returned unchanged (these formats either have self-describing schemas or no schema at all)
If the schema has no format specified, it is returned as-is.
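The dispatch logic described above can be sketched with simplified, std-only stand-ins. The `Format` and `ConnectionSchema` types here are illustrative assumptions (the real ones live in arroyo-rpc and carry per-format configuration, and the real function is async and returns `ErrorResp`):

```rust
// Simplified sketch of the expand_schema dispatch; types are hypothetical
// stand-ins for the real arroyo-rpc definitions.
#[derive(Debug, Clone, PartialEq)]
enum Format {
    Json,
    Avro,
    Protobuf,
    Parquet,
    RawString,
    RawBytes,
}

#[derive(Debug, Clone)]
struct ConnectionSchema {
    format: Option<Format>,
    fields: Vec<String>, // stands in for Vec<SourceField>
}

fn expand_schema(schema: ConnectionSchema) -> Result<ConnectionSchema, String> {
    match schema.format {
        Some(Format::Json) => expand_json_schema(schema),
        Some(Format::Avro) => expand_avro_schema(schema),
        Some(Format::Protobuf) => expand_proto_schema(schema),
        // Self-describing (Parquet) or schemaless (raw) formats pass through.
        Some(Format::Parquet) | Some(Format::RawString) | Some(Format::RawBytes) => Ok(schema),
        // No format declared: return the schema unchanged.
        None => Ok(schema),
    }
}

// Stub handlers standing in for the format-specific expansion below.
fn expand_json_schema(mut s: ConnectionSchema) -> Result<ConnectionSchema, String> {
    s.fields.push("expanded-from-json".into());
    Ok(s)
}
fn expand_avro_schema(mut s: ConnectionSchema) -> Result<ConnectionSchema, String> {
    s.fields.push("expanded-from-avro".into());
    Ok(s)
}
fn expand_proto_schema(mut s: ConnectionSchema) -> Result<ConnectionSchema, String> {
    s.fields.push("expanded-from-protobuf".into());
    Ok(s)
}
```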
Avro Schema Expansion
The expand_avro_schema handler performs the following steps:
1. Schema Registry fetch -- If `confluent_schema_registry: true` is set on the Avro format, the handler fetches the schema from Confluent Schema Registry via the `get_schema` helper. It validates that the returned schema type is Avro.
2. Schema definition extraction -- Extracts the Avro schema string from the `SchemaDefinition::AvroSchema` variant. For sources, a schema definition is required. For sinks, the schema may be inferred (marked with `inferred: true`).
3. Reader schema configuration -- Parses the Avro schema string and adds it as a reader schema to the `AvroFormat` configuration, enabling compatible schema evolution.
4. Arrow conversion -- Calls `avro::schema::to_arrow(definition)` to convert the Avro schema into an Arrow `Schema`, then maps each Arrow `Field` into a `SourceField`.
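The primitive half of the Arrow conversion in the last step can be illustrated with a std-only sketch. The function name and string type labels here are assumptions; the real `avro::schema::to_arrow` also handles records, unions (which drive nullability), arrays, maps, and logical types such as timestamps:

```rust
// Hypothetical simplification of the Avro-primitive-to-Arrow mapping that
// to_arrow performs for scalar fields.
fn avro_primitive_to_arrow(avro_type: &str) -> Option<&'static str> {
    match avro_type {
        "boolean" => Some("Boolean"),
        "int" => Some("Int32"),
        "long" => Some("Int64"),
        "float" => Some("Float32"),
        "double" => Some("Float64"),
        "bytes" => Some("Binary"),
        "string" => Some("Utf8"),
        // Records, enums, unions, etc. require structural handling.
        _ => None,
    }
}
```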
Protobuf Schema Expansion
The expand_proto_schema handler performs the following steps:
1. Schema Registry fetch -- If `confluent_schema_registry: true` is set, fetches the Protobuf schema (and its dependencies) from Schema Registry.
2. Schema definition extraction -- Extracts the Protobuf schema string and dependency map from `SchemaDefinition::ProtobufSchema`.
3. Descriptor compilation -- Calls `schema_file_to_descriptor` to compile the `.proto` source into a binary file descriptor set, resolving imports from the dependencies map.
4. Message resolution -- Resolves the specified message name within the compiled descriptor pool. If the message is not found, returns an error listing all available message names.
5. Arrow conversion -- Calls `protobuf_to_arrow(&descriptor)` to convert the Protobuf message descriptor to an Arrow schema, then maps fields to `SourceField` objects.
6. Compiled schema storage -- Stores the compiled binary descriptor in the format configuration for efficient runtime deserialization.
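The message-resolution step (including the helpful error listing available messages) can be sketched std-only. The `BTreeMap` here is a hypothetical stand-in for a compiled descriptor pool:

```rust
use std::collections::BTreeMap;

// Sketch of step 4: look up a message by full name in a "pool"; on a miss,
// report every message name the pool does contain. The map value stands in
// for a message descriptor.
fn resolve_message(
    pool: &BTreeMap<String, usize>,
    message_name: &str,
) -> Result<usize, String> {
    pool.get(message_name).copied().ok_or_else(|| {
        let available: Vec<&str> = pool.keys().map(|s| s.as_str()).collect();
        format!(
            "message '{}' not found; available messages: {}",
            message_name,
            available.join(", ")
        )
    })
}
```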
JSON Schema Expansion
The expand_json_schema handler performs the following steps:
1. Schema Registry fetch -- If `confluent_schema_registry: true` is set on the JSON format, fetches the schema from Schema Registry and records the schema version ID.
2. Arrow conversion -- Calls `json::schema::to_arrow(name, schema)` to convert the JSON Schema definition to an Arrow schema.
3. Field mapping -- Maps each Arrow field to a `SourceField`.
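The scalar portion of the JSON Schema conversion can be sketched std-only. The mapping below is an assumption for illustration; the real `json::schema::to_arrow` also handles nested objects, arrays, type unions, and format annotations:

```rust
// Hypothetical simplification of the JSON Schema "type" keyword to Arrow
// type mapping; "object" and "array" become Struct and List types in the
// real implementation.
fn json_scalar_to_arrow(json_type: &str) -> Option<&'static str> {
    match json_type {
        "string" => Some("Utf8"),
        "integer" => Some("Int64"),
        "number" => Some("Float64"),
        "boolean" => Some("Boolean"),
        _ => None,
    }
}
```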
Schema Registry Integration
The get_schema helper function (L733-L802) handles Confluent Schema Registry interaction:
- Parses the profile configuration to extract the Schema Registry endpoint and credentials (supports both `kafka` and `confluent` connector types)
- Constructs a `ConfluentSchemaRegistry` resolver
- Fetches the latest schema version for the topic's value subject
- Resolves any schema references (important for Protobuf imports)
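The value-subject lookup follows Confluent's default TopicNameStrategy, under which a topic's value schema is registered under the subject `<topic>-value`. A minimal sketch (in the real code the topic name comes from `table_config`):

```rust
// Confluent TopicNameStrategy: the value schema for a topic lives under
// the subject "<topic>-value" (and the key schema under "<topic>-key").
fn value_subject(topic: &str) -> String {
    format!("{topic}-value")
}
```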
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input | `ConnectionSchema` | Schema object with a format (Avro/Protobuf/JSON/etc.) and optional definition (the raw schema string or reference) |
| Input | `name: &str` | Table name (used for JSON Schema type naming) |
| Input | `connector: &str` | Connector type name (used to determine Schema Registry configuration) |
| Input | `connection_type: ConnectionType` | Source, Sink, or Lookup (affects whether schema is required vs. inferred) |
| Input | `profile_config: &Value` | Connection profile JSON (contains Schema Registry credentials) |
| Input | `table_config: &Value` | Table configuration JSON (contains topic name for Schema Registry subject) |
| Output | `ConnectionSchema` | Schema with populated `fields: Vec<SourceField>` containing Arrow-typed column definitions |
Each SourceField in the output includes:
- `name` -- Column name
- `field_type` -- Arrow-compatible field type (mapped to a SQL type)
- `required` -- Whether the field is non-nullable
- `sql_name` -- The SQL type name for display
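The per-field mapping can be sketched std-only. The struct and the SQL type names below are illustrative simplifications (the real `SourceField` is defined in arroyo-rpc and the type mapping is richer):

```rust
// Hypothetical sketch of mapping an Arrow field (name, type, nullability)
// to a SourceField; required is the inverse of Arrow nullability.
struct SourceField {
    name: String,
    field_type: String,
    required: bool,
    sql_name: String,
}

fn to_source_field(name: &str, arrow_type: &str, nullable: bool) -> SourceField {
    let sql_name = match arrow_type {
        "Boolean" => "BOOLEAN",
        "Int32" => "INT",
        "Int64" => "BIGINT",
        "Float64" => "DOUBLE",
        "Utf8" => "TEXT",
        _ => "UNKNOWN",
    }
    .to_string();
    SourceField {
        name: name.to_string(),
        field_type: arrow_type.to_string(),
        required: !nullable,
        sql_name,
    }
}
```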
Usage Examples
Avro Schema Expansion
```rust
let schema = ConnectionSchema {
    format: Some(Format::Avro(AvroFormat {
        confluent_schema_registry: false,
        ..Default::default()
    })),
    definition: Some(SchemaDefinition::AvroSchema {
        schema: r#"{"type":"record","name":"Event","fields":[
            {"name":"id","type":"long"},
            {"name":"name","type":"string"}
        ]}"#.to_string(),
    }),
    fields: vec![],
    ..Default::default()
};

let expanded = expand_schema(
    "events", "kafka", ConnectionType::Source,
    schema, &profile_config, &table_config,
).await?;

// expanded.fields now contains:
// [SourceField { name: "id", field_type: Int64 },
//  SourceField { name: "name", field_type: Utf8 }]
```
Protobuf Schema Expansion
```rust
let schema = ConnectionSchema {
    format: Some(Format::Protobuf(ProtobufFormat {
        message_name: Some("com.example.Event".to_string()),
        confluent_schema_registry: false,
        ..Default::default()
    })),
    definition: Some(SchemaDefinition::ProtobufSchema {
        // The package declaration is needed so the message resolves under
        // its full name "com.example.Event".
        schema: "syntax = \"proto3\";\npackage com.example;\nmessage Event { int64 id = 1; string name = 2; }"
            .to_string(),
        dependencies: HashMap::new(),
    }),
    fields: vec![],
    ..Default::default()
};

let expanded = expand_schema(
    "events", "kafka", ConnectionType::Source,
    schema, &profile_config, &table_config,
).await?;
```