
Implementation:ArroyoSystems Arroyo Expand Schema

From Leeroopedia



Overview

The Expand Schema implementation provides the core schema conversion logic in Arroyo. The expand_schema function dispatches to format-specific handlers -- expand_avro_schema, expand_proto_schema, and expand_json_schema -- each of which converts an external schema definition into a list of SourceField objects backed by Apache Arrow types. This function is called during connection table creation to populate the table's column definitions from the declared data format.

Code Reference

File: crates/arroyo-api/src/connection_tables.rs

Lines       Purpose
L425-L473   expand_schema -- main dispatcher
L475-L546   expand_avro_schema -- Avro schema conversion
L548-L625   expand_proto_schema -- Protobuf schema conversion
L627-L668   expand_local_proto_schema -- Protobuf descriptor compilation
L670-L731   expand_json_schema -- JSON Schema conversion

Signature

pub(crate) async fn expand_schema(
    name: &str,
    connector: &str,
    connection_type: ConnectionType,
    schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>

Format-Specific Handlers

async fn expand_avro_schema(
    connector: &str,
    connection_type: ConnectionType,
    mut schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>

async fn expand_proto_schema(
    connector: &str,
    connection_type: ConnectionType,
    mut schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>

async fn expand_json_schema(
    name: &str,
    connector: &str,
    connection_type: ConnectionType,
    mut schema: ConnectionSchema,
    profile_config: &Value,
    table_config: &Value,
) -> Result<ConnectionSchema, ErrorResp>

Description

Main Dispatcher: expand_schema

The expand_schema function inspects the format field of the incoming ConnectionSchema and dispatches to the appropriate handler:

  • Format::Json(_) -- dispatches to expand_json_schema
  • Format::Avro(_) -- dispatches to expand_avro_schema
  • Format::Protobuf(_) -- dispatches to expand_proto_schema
  • Format::Parquet(_), Format::RawString(_), Format::RawBytes(_) -- returned unchanged (these formats either have self-describing schemas or no schema at all)

If the schema has no format specified, it is returned as-is.
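The dispatch above can be sketched with simplified stand-in types. The enums and function below are illustrative only; the real Format variants carry per-format configuration structs, and the real dispatcher is async and returns a ConnectionSchema.

```rust
// Simplified stand-ins for the real Arroyo types (illustrative only).
#[derive(Debug, PartialEq)]
enum Format {
    Json,
    Avro,
    Protobuf,
    Parquet,
    RawString,
    RawBytes,
}

#[derive(Debug, PartialEq)]
enum Handler {
    ExpandJson,
    ExpandAvro,
    ExpandProto,
    PassThrough,
}

// Mirrors the match in expand_schema: three formats get a dedicated
// handler; the rest -- and schemas with no format at all -- pass through.
fn dispatch(format: Option<Format>) -> Handler {
    match format {
        Some(Format::Json) => Handler::ExpandJson,
        Some(Format::Avro) => Handler::ExpandAvro,
        Some(Format::Protobuf) => Handler::ExpandProto,
        Some(Format::Parquet) | Some(Format::RawString) | Some(Format::RawBytes) | None => {
            Handler::PassThrough
        }
    }
}

fn main() {
    assert_eq!(dispatch(Some(Format::Avro)), Handler::ExpandAvro);
    assert_eq!(dispatch(None), Handler::PassThrough);
}
```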

Avro Schema Expansion

The expand_avro_schema handler performs the following steps:

  1. Schema Registry fetch -- If confluent_schema_registry: true is set on the Avro format, the handler fetches the schema from Confluent Schema Registry via the get_schema helper. It validates that the returned schema type is Avro.
  2. Schema definition extraction -- Extracts the Avro schema string from the SchemaDefinition::AvroSchema variant. For sources, a schema definition is required. For sinks, the schema may be inferred (marked with inferred: true).
  3. Reader schema configuration -- Parses the Avro schema string and adds it as a reader schema to the AvroFormat configuration, enabling compatible schema evolution.
  4. Arrow conversion -- Calls avro::schema::to_arrow(definition) to convert the Avro schema into an Arrow Schema, then maps each Arrow Field into a SourceField.
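Step 4 rests on a type mapping between Avro and Arrow. The sketch below shows the kind of primitive-type correspondence avro::schema::to_arrow performs; the function name and string-based type names are illustrative, and the real converter handles records, arrays, unions, and logical types recursively.

```rust
// Hypothetical sketch of the Avro-primitive-to-Arrow mapping
// (illustrative names; not the real Arroyo code).
fn avro_primitive_to_arrow(avro_type: &str) -> Option<&'static str> {
    match avro_type {
        "boolean" => Some("Boolean"),
        "int" => Some("Int32"),
        "long" => Some("Int64"),
        "float" => Some("Float32"),
        "double" => Some("Float64"),
        "bytes" => Some("Binary"),
        "string" => Some("Utf8"),
        // Records, arrays, maps, unions, etc. require recursive handling.
        _ => None,
    }
}

fn main() {
    // Consistent with the usage example below: "long" -> Int64, "string" -> Utf8.
    assert_eq!(avro_primitive_to_arrow("long"), Some("Int64"));
    assert_eq!(avro_primitive_to_arrow("string"), Some("Utf8"));
}
```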

Protobuf Schema Expansion

The expand_proto_schema handler performs the following steps:

  1. Schema Registry fetch -- If confluent_schema_registry: true is set, fetches the Protobuf schema (and its dependencies) from Schema Registry.
  2. Schema definition extraction -- Extracts the Protobuf schema string and dependency map from SchemaDefinition::ProtobufSchema.
  3. Descriptor compilation -- Calls schema_file_to_descriptor to compile the .proto source into a binary file descriptor set, resolving imports from the dependencies map.
  4. Message resolution -- Resolves the specified message name within the compiled descriptor pool. If the message is not found, returns an error listing all available message names.
  5. Arrow conversion -- Calls protobuf_to_arrow(&descriptor) to convert the Protobuf message descriptor to an Arrow schema, then maps fields to SourceField objects.
  6. Compiled schema storage -- Stores the compiled binary descriptor in the format configuration for efficient runtime deserialization.
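Step 4's error-reporting behavior can be sketched as follows. This is a minimal stand-in, assuming only what the step describes: a lookup over the message names in the compiled descriptor pool that, on a miss, returns an error listing every available message.

```rust
// Hypothetical sketch of message resolution with an error that lists
// all candidates (the real code works on a prost-reflect descriptor pool).
fn resolve_message(pool: &[&str], wanted: &str) -> Result<usize, String> {
    pool.iter().position(|m| *m == wanted).ok_or_else(|| {
        format!(
            "message '{}' not found; available messages: {}",
            wanted,
            pool.join(", ")
        )
    })
}

fn main() {
    let pool = ["com.example.Event", "com.example.User"];
    assert!(resolve_message(&pool, "com.example.Event").is_ok());
    let err = resolve_message(&pool, "com.example.Missing").unwrap_err();
    // The error names every message in the pool to aid correction.
    assert!(err.contains("com.example.User"));
}
```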

JSON Schema Expansion

The expand_json_schema handler performs the following steps:

  1. Schema Registry fetch -- If confluent_schema_registry: true is set on the JSON format, fetches the schema from Schema Registry and records the schema version ID.
  2. Arrow conversion -- Calls json::schema::to_arrow(name, schema) to convert the JSON Schema definition to an Arrow schema.
  3. Field mapping -- Maps each Arrow field to a SourceField.
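As with Avro, step 2 hinges on a type mapping, here from JSON Schema type keywords to Arrow types. The sketch below is illustrative only; the real json::schema::to_arrow also handles objects, arrays, string formats such as "date-time", and nullability.

```rust
// Hypothetical sketch of the JSON-Schema-to-Arrow primitive mapping
// (illustrative names; not the real Arroyo code).
fn json_schema_type_to_arrow(json_type: &str) -> Option<&'static str> {
    match json_type {
        "boolean" => Some("Boolean"),
        // JSON Schema "integer" carries no width, so a wide default is assumed here.
        "integer" => Some("Int64"),
        "number" => Some("Float64"),
        "string" => Some("Utf8"),
        // Objects and arrays map to nested Arrow types and need recursion.
        _ => None,
    }
}

fn main() {
    assert_eq!(json_schema_type_to_arrow("integer"), Some("Int64"));
    assert_eq!(json_schema_type_to_arrow("object"), None);
}
```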

Schema Registry Integration

The get_schema helper function (L733-L802) handles Confluent Schema Registry interaction:

  1. Parses the profile configuration to extract Schema Registry endpoint and credentials (supports both kafka and confluent connector types)
  2. Constructs a ConfluentSchemaRegistry resolver
  3. Fetches the latest schema version for the topic's value subject
  4. Resolves any schema references (important for Protobuf imports)
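Step 3 uses the standard Confluent subject-naming convention: the value schema of a topic is registered under the subject "<topic>-value". A minimal sketch, with an illustrative helper name:

```rust
// The Confluent TopicNameStrategy convention for value schemas:
// subject = "<topic>-value" (helper name here is illustrative).
fn value_subject(topic: &str) -> String {
    format!("{topic}-value")
}

fn main() {
    assert_eq!(value_subject("events"), "events-value");
}
```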

I/O Contract

  • Input: ConnectionSchema -- schema object with a format (Avro/Protobuf/JSON/etc.) and an optional definition (the raw schema string or reference)
  • Input: name: &str -- table name (used for JSON Schema type naming)
  • Input: connector: &str -- connector type name (used to determine Schema Registry configuration)
  • Input: connection_type: ConnectionType -- Source, Sink, or Lookup (affects whether a schema is required or may be inferred)
  • Input: profile_config: &Value -- connection profile JSON (contains Schema Registry credentials)
  • Input: table_config: &Value -- table configuration JSON (contains the topic name used to build the Schema Registry subject)
  • Output: ConnectionSchema -- schema with populated fields: Vec<SourceField> containing Arrow-typed column definitions

Each SourceField in the output includes:

  • name -- Column name
  • field_type -- Arrow-compatible field type (mapped to SQL type)
  • required -- Whether the field is non-nullable
  • sql_name -- The SQL type name for display
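The fields above can be pictured with a simplified stand-in struct. The real SourceField lives in the Arroyo crates and carries richer type information; this sketch only illustrates how an Arrow field's nullability inverts into the required flag.

```rust
// Simplified stand-in for SourceField (illustrative only).
#[derive(Debug, PartialEq)]
struct SourceField {
    name: String,
    field_type: String, // Arrow-compatible type name
    required: bool,     // true when the Arrow field is non-nullable
    sql_name: String,   // SQL type name for display
}

// An Arrow field's `nullable` flag inverts into `required`.
fn from_arrow(name: &str, arrow_type: &str, nullable: bool, sql_name: &str) -> SourceField {
    SourceField {
        name: name.to_string(),
        field_type: arrow_type.to_string(),
        required: !nullable,
        sql_name: sql_name.to_string(),
    }
}

fn main() {
    let f = from_arrow("id", "Int64", false, "BIGINT");
    assert!(f.required);
    assert_eq!(f.sql_name, "BIGINT");
}
```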

Usage Examples

Avro Schema Expansion

let schema = ConnectionSchema {
    format: Some(Format::Avro(AvroFormat {
        confluent_schema_registry: false,
        ..Default::default()
    })),
    definition: Some(SchemaDefinition::AvroSchema {
        schema: r#"{"type":"record","name":"Event","fields":[
            {"name":"id","type":"long"},
            {"name":"name","type":"string"}
        ]}"#.to_string(),
    }),
    fields: vec![],
    ..Default::default()
};

let expanded = expand_schema(
    "events", "kafka", ConnectionType::Source,
    schema, &profile_config, &table_config
).await?;

// expanded.fields now contains:
// [SourceField { name: "id", field_type: Int64 },
//  SourceField { name: "name", field_type: Utf8 }]

Protobuf Schema Expansion

let schema = ConnectionSchema {
    format: Some(Format::Protobuf(ProtobufFormat {
        message_name: Some("com.example.Event".to_string()),
        confluent_schema_registry: false,
        ..Default::default()
    })),
    definition: Some(SchemaDefinition::ProtobufSchema {
        schema: "syntax = \"proto3\";\nmessage Event { int64 id = 1; string name = 2; }".to_string(),
        dependencies: HashMap::new(),
    }),
    fields: vec![],
    ..Default::default()
};

let expanded = expand_schema(
    "events", "kafka", ConnectionType::Source,
    schema, &profile_config, &table_config
).await?;
