Implementation:ArroyoSystems Arroyo Schema Resolver
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Schema, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Provides the SchemaResolver trait and its implementations for dynamically resolving serialization schemas at runtime, including a full client for the Confluent Schema Registry.
Description
This module defines the strategy pattern for schema resolution used by Arroyo's deserialization layer:
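- SchemaResolver trait -- an async trait with a single method, resolve_schema(id: u32), that looks up a numeric schema ID and returns Result<Option<String>, String>: Ok(Some(schema)) when the schema is found, Ok(None) when no schema exists for that ID, or Err with a message on failure.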
- FailingSchemaResolver -- returns an error for any schema request; used when schemas are embedded in messages and no registry is configured.
- FixedSchemaResolver -- returns a pre-loaded Avro schema for a single known ID; used when the schema is statically known.
- ConfluentSchemaRegistryClient -- a full HTTP client for the Confluent Schema Registry REST API with:
  - Basic authentication support (API key/secret with Base64 encoding).
  - Schema retrieval by subject/version and by global ID.
  - Schema writing/registration with conflict and validation error handling.
  - Reference resolution for schemas with dependencies (recursively fetches referenced schemas).
  - Connection testing via the /subjects endpoint.
- ConfluentSchemaRegistry -- wraps ConfluentSchemaRegistryClient with a specific subject name, implementing SchemaResolver for runtime schema-by-ID resolution.
- Supporting types -- ConfluentSchemaType (Avro, Json, Protobuf), ConfluentSchemaReference, ConfluentSchemaSubjectResponse, ConfluentSchemaIdResponse.
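The strategy pattern above can be modeled in std-only Rust. This is a simplified sketch, not Arroyo's code: it uses native `async fn` in traits (Rust 1.75+) rather than the `#[async_trait]` attribute shown in the signature (which, among other things, lets a trait be used behind `dyn`), and it drives futures with a hypothetical busy-polling `block_on` helper instead of an async runtime. Treating a mismatched ID in `FixedSchemaResolver` as an error is an assumption.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for SchemaResolver (native async fn instead of #[async_trait]).
trait SchemaResolver {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String>;
}

/// Fails every lookup because there is no registry to ask.
struct FailingSchemaResolver;

impl SchemaResolver for FailingSchemaResolver {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String> {
        Err(format!("schema {id} not available: no schema registry configured"))
    }
}

/// Serves one pre-loaded schema for one known ID.
struct FixedSchemaResolver {
    id: u32,
    schema: String,
}

impl SchemaResolver for FixedSchemaResolver {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String> {
        if id == self.id {
            Ok(Some(self.schema.clone()))
        } else {
            // Assumption: any other ID is reported as an error.
            Err(format!("unexpected schema id {id}, expected {}", self.id))
        }
    }
}

/// Waker that does nothing; enough for futures that never wait on I/O.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: busy-polls a future to completion without a runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Deserialization code depends only on the trait, not on a concrete resolver.
fn resolve_blocking<R: SchemaResolver>(resolver: &R, id: u32) -> Result<Option<String>, String> {
    block_on(resolver.resolve_schema(id))
}
```

Swapping `FixedSchemaResolver` for `FailingSchemaResolver` (or a registry-backed resolver) changes behavior without touching the caller, which is the point of the strategy pattern here.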
Usage
Use ConfluentSchemaRegistry when connecting to Kafka topics that use Confluent Schema Registry for Avro/JSON/Protobuf schema evolution. Use FixedSchemaResolver for static schemas and FailingSchemaResolver as the default when no registry is configured.
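For Kafka topics written with Confluent serializers, the numeric ID that is resolved comes from the message itself. In Confluent's wire format each record begins with a zero magic byte, followed by the schema ID as a 4-byte big-endian integer, followed by the encoded data. The sketch below splits such a frame; `split_confluent_frame` is a hypothetical name, and for Protobuf the bytes after the ID also carry message indexes, which this sketch does not parse.

```rust
/// Splits a Confluent-framed message into (schema_id, remaining bytes).
/// Layout: 1 magic byte (0x00) + 4-byte big-endian schema ID + payload.
fn split_confluent_frame(msg: &[u8]) -> Result<(u32, &[u8]), String> {
    if msg.len() < 5 {
        return Err(format!("message too short for Confluent framing: {} bytes", msg.len()));
    }
    if msg[0] != 0 {
        return Err(format!("unexpected magic byte {:#04x}", msg[0]));
    }
    let id = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    Ok((id, &msg[5..]))
}
```

The returned ID is what a caller would pass to `resolve_schema`, and the remaining bytes are decoded with the schema it returns.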
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-rpc/src/schema_resolver.rs
Signature
```rust
#[async_trait]
pub trait SchemaResolver: Send {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String>;
}

pub struct FailingSchemaResolver {}
pub struct FixedSchemaResolver { id: u32, schema: String }
pub struct ConfluentSchemaRegistryClient { endpoint: Url, client: Client }
pub struct ConfluentSchemaRegistry { client: ConfluentSchemaRegistryClient, subject: String }

impl ConfluentSchemaRegistryClient {
    pub fn new(endpoint: &str, api_key: Option<VarStr>, api_secret: Option<VarStr>) -> anyhow::Result<Self>;
    pub async fn test(&self) -> anyhow::Result<()>;
}

impl ConfluentSchemaRegistry {
    pub fn new(endpoint: &str, subject: &str, api_key: Option<VarStr>, api_secret: Option<VarStr>) -> anyhow::Result<Self>;
    pub async fn get_schema_for_id(&self, id: u32) -> anyhow::Result<Option<ConfluentSchemaIdResponse>>;
    pub async fn get_schema_for_version(&self, version: Option<u32>) -> anyhow::Result<Option<ConfluentSchemaSubjectResponse>>;
    pub async fn write_schema(&self, schema: impl Into<String>, schema_type: ConfluentSchemaType) -> anyhow::Result<i32>;
    pub async fn resolve_references(&self, references: &[ConfluentSchemaReference]) -> anyhow::Result<Vec<(String, ConfluentSchemaSubjectResponse)>>;
}
```
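The methods above correspond to Confluent Schema Registry REST endpoints: `GET /schemas/ids/{id}`, `GET /subjects/{subject}/versions/{version}` (where `latest` selects the newest version), `POST /subjects/{subject}/versions` for registration, and `GET /subjects` for listing subjects. The sketch below maps each method to a request path. The helper names and `RegistryCall` type are hypothetical, the method-to-endpoint pairing is an assumption based on the descriptions in this page, and subject names are assumed not to need URL escaping.

```rust
/// Mirrors ConfluentSchemaType; the strings are the registry's schemaType values.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SchemaType {
    Avro,
    Json,
    Protobuf,
}

impl SchemaType {
    fn as_registry_str(self) -> &'static str {
        match self {
            SchemaType::Avro => "AVRO",
            SchemaType::Json => "JSON",
            SchemaType::Protobuf => "PROTOBUF",
        }
    }
}

/// One HTTP call: method plus a path relative to the registry base URL.
#[derive(Debug, PartialEq)]
struct RegistryCall {
    method: &'static str,
    path: String,
}

/// get_schema_for_id: look a schema up by its global ID.
fn schema_by_id(id: u32) -> RegistryCall {
    RegistryCall { method: "GET", path: format!("schemas/ids/{id}") }
}

/// get_schema_for_version: None selects the newest version ("latest").
fn schema_by_version(subject: &str, version: Option<u32>) -> RegistryCall {
    let v = match version {
        Some(v) => v.to_string(),
        None => "latest".to_string(),
    };
    RegistryCall { method: "GET", path: format!("subjects/{subject}/versions/{v}") }
}

/// write_schema: register a schema under the subject; the response carries its ID.
fn register_schema(subject: &str) -> RegistryCall {
    RegistryCall { method: "POST", path: format!("subjects/{subject}/versions") }
}

/// test(): assumed to treat a successful subject listing as "reachable".
fn connection_test() -> RegistryCall {
    RegistryCall { method: "GET", path: "subjects".to_string() }
}
```

When registering, the schema type is sent as `schemaType` in the request body; the registry treats a missing `schemaType` as Avro.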
Import
```rust
use arroyo_rpc::schema_resolver::{
    SchemaResolver, FailingSchemaResolver, FixedSchemaResolver,
    ConfluentSchemaRegistry, ConfluentSchemaRegistryClient,
};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| endpoint | &str | Yes | Confluent Schema Registry base URL (must start with http:// or https://) |
| subject | &str | Yes | Schema Registry subject name (e.g., "my-topic-value") |
| api_key | Option<VarStr> | No | Authentication API key |
| api_secret | Option<VarStr> | No | Authentication API secret |
| id | u32 | Yes (for resolve) | Numeric schema ID embedded in Confluent wire format messages |
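The api_key/api_secret pair is sent as HTTP Basic credentials: per RFC 7617, the string `key:secret` is Base64-encoded and sent as `Authorization: Basic <encoded>`. Below is a dependency-free sketch of that encoding. `base64_encode` and `basic_auth_header` are hypothetical helpers that take plain strings (in Arroyo the values arrive as `VarStr`), and the actual client may delegate this step to its HTTP library or a Base64 crate.

```rust
/// Standard Base64 alphabet (RFC 4648), used with '=' padding.
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling missing bytes.
        let b0 = chunk[0] as u32;
        let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
        let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;
        // Emit four 6-bit characters, padding where input bytes were missing.
        out.push(ALPHABET[(n >> 18) as usize & 63] as char);
        out.push(ALPHABET[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { ALPHABET[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { ALPHABET[n as usize & 63] as char } else { '=' });
    }
    out
}

/// Builds the Authorization header value for an API key/secret pair.
fn basic_auth_header(api_key: &str, api_secret: &str) -> String {
    format!("Basic {}", base64_encode(format!("{api_key}:{api_secret}").as_bytes()))
}
```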
Outputs
| Name | Type | Description |
|---|---|---|
| resolve_schema result | Result<Option<String>, String> | Resolved schema as a JSON/canonical string; Ok(None) if not found, Err(message) on failure |
| ConfluentSchemaSubjectResponse | struct | Full schema response with id, schema, type, references, subject, version |
| write_schema result | i32 | Schema ID returned after registering a new schema |
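write_schema returns the new schema's ID and reports conflicts and validation failures. On the registry side, `POST /subjects/{subject}/versions` answers with a body like `{"id": 42}` on success, HTTP 409 when the schema is incompatible with the subject's compatibility rules, and HTTP 422 when the schema itself is invalid. The sketch below classifies those responses; `WriteSchemaError`, `interpret_register_response`, and the naive `extract_id` parser are hypothetical, and a real client would deserialize the JSON body rather than scan it.

```rust
#[derive(Debug, PartialEq)]
enum WriteSchemaError {
    /// 409: the schema violates the subject's compatibility rules.
    Incompatible(String),
    /// 422: the registry rejected the schema as invalid.
    Invalid(String),
    /// Any other status, or a success response without an ID.
    Other(u16, String),
}

/// Naive extraction of `"id": <int>` from a body such as `{"id":42}`.
fn extract_id(body: &str) -> Option<i32> {
    let after_key = &body[body.find("\"id\"")? + 4..];
    let after_colon = after_key.trim_start().strip_prefix(':')?.trim_start();
    let digits: String = after_colon.chars().take_while(|c| c.is_ascii_digit()).collect();
    digits.parse().ok()
}

/// Maps a registration response to the registered ID or a typed error.
fn interpret_register_response(status: u16, body: &str) -> Result<i32, WriteSchemaError> {
    match status {
        200..=299 => extract_id(body)
            .ok_or_else(|| WriteSchemaError::Other(status, format!("no id in response: {body}"))),
        409 => Err(WriteSchemaError::Incompatible(body.to_string())),
        422 => Err(WriteSchemaError::Invalid(body.to_string())),
        _ => Err(WriteSchemaError::Other(status, body.to_string())),
    }
}
```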
Usage Examples
```rust
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, SchemaResolver};
use arroyo_rpc::var_str::VarStr;

async fn example() -> anyhow::Result<()> {
    // Create a schema registry client
    let registry = ConfluentSchemaRegistry::new(
        "https://schema-registry.example.com",
        "my-topic-value",
        Some(VarStr::new("api-key".to_string())),
        Some(VarStr::new("api-secret".to_string())),
    )?;

    // Resolve a schema by ID (called during deserialization); the trait's
    // error type is String, so convert it before using `?` with anyhow
    let schema_json = registry
        .resolve_schema(42)
        .await
        .map_err(anyhow::Error::msg)?;

    // Fetch the latest schema version for the subject
    let latest = registry.get_schema_for_version(None).await?;
    Ok(())
}
```
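resolve_references recursively fetches the schemas that a schema depends on. In the registry's API a reference names a dependency by `name`, `subject`, and `version`. The sketch below performs the same depth-first walk against a hypothetical in-memory map instead of HTTP calls. Pairing each result with the reference name is an assumption about what the `String` in the return type holds, and the dependencies-first ordering and duplicate skipping are design choices of this sketch, not confirmed behavior of the Arroyo client.

```rust
use std::collections::{HashMap, HashSet};

/// Mirrors the fields of a registry schema reference.
#[derive(Debug)]
struct SchemaReference {
    name: String,
    subject: String,
    version: u32,
}

/// What the registry would return for one subject/version.
struct StoredSchema {
    schema: String,
    references: Vec<SchemaReference>,
}

/// Hypothetical stand-in for the registry: (subject, version) -> stored schema.
type Registry = HashMap<(String, u32), StoredSchema>;

/// Returns (reference name, schema text) pairs, dependencies before dependents.
fn resolve_references(registry: &Registry, refs: &[SchemaReference]) -> Result<Vec<(String, String)>, String> {
    let mut out = Vec::new();
    let mut seen = HashSet::new();
    for r in refs {
        visit(registry, r, &mut seen, &mut out)?;
    }
    Ok(out)
}

fn visit(
    registry: &Registry,
    r: &SchemaReference,
    seen: &mut HashSet<(String, u32)>,
    out: &mut Vec<(String, String)>,
) -> Result<(), String> {
    let key = (r.subject.clone(), r.version);
    // Marking before recursing skips shared dependencies and breaks cycles.
    if !seen.insert(key.clone()) {
        return Ok(());
    }
    let stored = registry
        .get(&key)
        .ok_or_else(|| format!("reference {} not found: {}/{}", r.name, r.subject, r.version))?;
    for nested in &stored.references {
        visit(registry, nested, seen, out)?;
    }
    out.push((r.name.clone(), stored.schema.clone()));
    Ok(())
}
```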