Principle: Arroyo Schema Definition
Overview
The Schema Definition principle governs how Arroyo converts external schema formats -- such as Apache Avro, Protocol Buffers (Protobuf), and JSON Schema -- into a unified internal schema representation based on Apache Arrow. Schema definition bridges external data formats with the internal columnar processing model, ensuring that data entering or leaving the system conforms to a well-defined structure.
Description
Stream processing engines must handle data from diverse external systems, each with its own schema definition language. Kafka topics may use Avro schemas registered in Confluent Schema Registry, gRPC services use Protobuf definitions, and REST APIs often describe their payloads with JSON Schema. To process data from all these sources uniformly, Arroyo converts format-specific schemas into a single canonical representation.
Apache Arrow's type system serves as the internal schema standard for several reasons:
- Columnar representation -- Arrow schemas map directly to the columnar record batches used for in-memory data processing, avoiding conversion overhead at runtime.
- Rich type system -- Arrow supports primitive types, nested structs, lists, maps, timestamps, and other complex types needed to represent real-world data.
- Ecosystem compatibility -- Arrow is the standard for DataFusion (the SQL engine Arroyo uses for query planning), enabling seamless integration between schema definitions and query execution.
The schema conversion process involves three key concepts:
Type Mapping
Each format's type system must be mapped to Arrow types:
- Avro to Arrow -- Avro unions map to Arrow nullable types or union types. Avro records map to Arrow structs. Avro logical types (e.g., `timestamp-millis`) map to corresponding Arrow temporal types.
- Protobuf to Arrow -- Protobuf messages map to Arrow structs. Protobuf repeated fields map to Arrow lists. Protobuf `oneof` fields require special handling.
- JSON Schema to Arrow -- JSON Schema object types map to Arrow structs. JSON Schema arrays map to Arrow lists. JSON Schema primitive types (`string`, `number`, `integer`, `boolean`) map to corresponding Arrow primitives.
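To make the mapping concrete, here is an illustrative sketch (not Arroyo's actual converter) that maps a small subset of Avro types to Arrow-style type descriptors, represented as plain strings and dicts. It shows the three rules above: nullable unions, records as structs, and logical types as temporal types.

```python
# Toy Avro-to-Arrow type mapper. Arrow types are modeled as strings
# (primitives) or dicts (nested types) purely for illustration.

AVRO_PRIMITIVES = {
    "boolean": "Boolean", "int": "Int32", "long": "Int64",
    "float": "Float32", "double": "Float64", "string": "Utf8",
    "bytes": "Binary",
}

AVRO_LOGICAL = {
    "timestamp-millis": "Timestamp(Millisecond)",
    "timestamp-micros": "Timestamp(Microsecond)",
    "date": "Date32",
}

def avro_to_arrow(avro):
    """Return (arrow_type, nullable) for a simplified Avro schema node."""
    if isinstance(avro, str):
        return AVRO_PRIMITIVES[avro], False
    if isinstance(avro, list):
        # A two-branch union with "null" becomes a nullable Arrow type.
        branches = [b for b in avro if b != "null"]
        if len(branches) == 1:
            arrow_type, _ = avro_to_arrow(branches[0])
            return arrow_type, "null" in avro
        raise NotImplementedError("general unions need Arrow union types")
    if avro.get("logicalType") in AVRO_LOGICAL:
        return AVRO_LOGICAL[avro["logicalType"]], False
    if avro["type"] == "record":
        # Records become structs; each field carries its own nullability.
        fields = [(f["name"], *avro_to_arrow(f["type"])) for f in avro["fields"]]
        return {"Struct": fields}, False
    if avro["type"] == "array":
        return {"List": avro_to_arrow(avro["items"])}, False
    return avro_to_arrow(avro["type"])  # wrapped primitive, e.g. {"type": "string"}
```

For example, the union `["null", "long"]` resolves to a nullable `Int64` rather than a two-branch union type, which is the common case in practice.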
Schema Registry Integration
For Kafka-based connectors (including Confluent Cloud), schemas can be fetched automatically from Confluent Schema Registry rather than requiring users to provide them manually. This integration:
- Retrieves the latest schema version for a given subject (typically derived from the topic name)
- Resolves schema references (Protobuf imports, Avro references)
- Validates that the registered schema type matches the configured format
Schema Evolution Handling
The schema definition layer handles cases where the schema may evolve over time:
- Source schemas -- For sources, the schema is fetched and expanded at table creation time. The Arrow schema derived from the external definition becomes the table's fixed schema.
- Sink schemas -- For sinks, the schema may be inferred from the query output rather than defined externally.
- Reader schemas -- For Avro, the system supports reader schemas that allow reading data written with a compatible but different writer schema.
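Reader-schema resolution can be illustrated with a toy example (schemas simplified to field lists; this is not the Avro library's resolution algorithm): a record written under an old writer schema is read under a newer reader schema that added a field with a default.

```python
def resolve_record(datum, writer_fields, reader_fields):
    """Project a decoded record onto the reader schema's fields."""
    writer_names = {f["name"] for f in writer_fields}
    out = {}
    for field in reader_fields:
        name = field["name"]
        if name in writer_names:
            out[name] = datum[name]       # value present in the written data
        elif "default" in field:
            out[name] = field["default"]  # filled from the reader schema
        else:
            raise ValueError(f"no value or default for field {name}")
    return out

# An old producer never wrote "currency"; the reader schema supplies it.
writer = [{"name": "order_id"}, {"name": "amount"}]
reader = writer + [{"name": "currency", "default": "USD"}]
resolved = resolve_record({"order_id": 1, "amount": 9.5}, writer, reader)
```

This is why adding a field with a default is a compatible change: old data remains readable without reprocessing.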
Theoretical Basis
Schema unification is a fundamental technique in data integration systems. The core insight is that heterogeneous format-specific schemas can be converted into a canonical representation without loss of essential type information.
Key theoretical concepts:
- Type System Homomorphism -- The conversion from each format's type system to Arrow must preserve essential structural properties (nullability, nesting depth, cardinality). This is a type-theoretic mapping that must be both sound (no invalid types produced) and reasonably complete (most source types can be represented).
- Schema Registry as Shared State -- Confluent Schema Registry implements the Shared Kernel pattern from Domain-Driven Design -- producers and consumers agree on a shared schema definition managed by an external service, decoupling them from direct schema negotiation.
- Schema Evolution -- The principle of schema compatibility (as defined by Avro's compatibility rules: backward, forward, full) determines whether a new schema version can safely replace an older one. Arroyo leverages this by supporting Avro reader schemas that handle compatible schema changes transparently.
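The backward-compatibility rule for records can be sketched as a simple check (simplified: it ignores Avro's type promotions and nested records, so it is stricter than the real rules): a new schema stays backward compatible if every field it adds carries a default and no shared field changed type.

```python
def is_backward_compatible(old_fields, new_fields):
    """Can the new (reader) schema decode data written with the old one?"""
    old_types = {f["name"]: f["type"] for f in old_fields}
    for field in new_fields:
        name = field["name"]
        if name not in old_types:
            if "default" not in field:
                return False              # new field without a default
        elif old_types[name] != field["type"]:
            return False                  # type change (no promotion here)
    return True
```

Removed fields are fine under backward compatibility: the reader simply ignores data it has no field for.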
Usage
Schema definition is applied whenever a user creates a connection table with a structured data format:
- Avro sources -- The system fetches the Avro schema from Schema Registry (if configured) or accepts a user-provided schema, then converts it to Arrow fields.
- Protobuf sources -- The system accepts a `.proto` file definition, compiles it to a file descriptor, resolves the specified message type, and converts it to Arrow fields.
- JSON Schema sources -- The system accepts a JSON Schema definition and converts it to Arrow fields.
- Format-agnostic sinks -- For sinks, the schema may be inferred from the query output, with the Arrow schema serving as the authoritative definition.
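The JSON Schema case follows the same pattern as the other formats; the sketch below assumes a simplified JSON Schema dialect (not Arroyo's converter): objects become structs, arrays become lists, primitives map directly, and properties absent from `required` are treated as nullable.

```python
JSON_PRIMITIVES = {"string": "Utf8", "number": "Float64",
                   "integer": "Int64", "boolean": "Boolean"}

def json_schema_to_arrow(schema):
    """Convert a simplified JSON Schema node to an Arrow-style descriptor."""
    t = schema["type"]
    if t in JSON_PRIMITIVES:
        return JSON_PRIMITIVES[t]
    if t == "array":
        return {"List": json_schema_to_arrow(schema["items"])}
    if t == "object":
        required = set(schema.get("required", []))
        # Each struct field records (name, type, nullable).
        return {"Struct": [
            (name, json_schema_to_arrow(prop), name not in required)
            for name, prop in schema["properties"].items()
        ]}
    raise ValueError(f"unsupported JSON Schema type: {t}")
```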
Example: Avro Schema Conversion
```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "long"},
    {"name": "customer_id", "type": "long"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```
This Avro schema converts to the following Arrow schema:
| Field Name | Arrow Type |
|---|---|
| order_id | Int64 |
| customer_id | Int64 |
| amount | Float64 |
| timestamp | Timestamp(Millisecond, None) |
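The conversion in this example can be checked mechanically. The snippet below (a sketch, not Arroyo's code) tabulates each field's Avro type and optional logical type, then looks up the Arrow type shown in the table.

```python
# (avro_type, logical_type) -> Arrow type, covering just this example.
FIELD_TYPES = {
    ("long", None): "Int64",
    ("double", None): "Float64",
    ("long", "timestamp-millis"): "Timestamp(Millisecond, None)",
}

order_fields = [
    ("order_id", "long", None),
    ("customer_id", "long", None),
    ("amount", "double", None),
    ("timestamp", "long", "timestamp-millis"),
]

arrow_schema = [(name, FIELD_TYPES[(t, lt)]) for name, t, lt in order_fields]
```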