

Implementation:ArroyoSystems Arroyo Schema Resolver

From Leeroopedia


Knowledge Sources
Domains: Streaming, Schema, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

Provides the SchemaResolver trait and its implementations for dynamically resolving serialization schemas at runtime, including a full client for the Confluent Schema Registry.

Description

This module defines the strategy pattern for schema resolution used by Arroyo's deserialization layer:

  • SchemaResolver trait -- an async trait with a single method, resolve_schema(id: u32), which returns Result<Option<String>, String>: the schema string for a numeric schema ID, None if it is not found, or an error message.
  • FailingSchemaResolver -- returns an error for any schema request; used when schemas are embedded in messages and no registry is configured.
  • FixedSchemaResolver -- returns a pre-loaded Avro schema for a single known ID; used when the schema is statically known.
  • ConfluentSchemaRegistryClient -- a full HTTP client for the Confluent Schema Registry REST API with:
    • Basic authentication support (API key/secret with Base64 encoding).
    • Schema retrieval by subject/version and by global ID.
    • Schema writing/registration with conflict and validation error handling.
    • Reference resolution for schemas with dependencies (recursively fetches referenced schemas).
    • Connection testing via the /subjects endpoint.
  • ConfluentSchemaRegistry -- wraps ConfluentSchemaRegistryClient with a specific subject name, implementing SchemaResolver for runtime schema-by-ID resolution.
  • Supporting types -- ConfluentSchemaType (Avro, Json, Protobuf), ConfluentSchemaReference, ConfluentSchemaSubjectResponse, ConfluentSchemaIdResponse.
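To make the strategy pattern concrete, here is a minimal, standard-library-only sketch of the trait and the two simple resolvers. It assumes native `async fn` in traits (Rust 1.75+) in place of the `#[async_trait]` attribute the real trait uses, stores the fixed schema as a plain `String` rather than deriving it from a parsed Avro schema, and returns `Ok(None)` for an unknown ID. The error text and the `block_on` helper are hypothetical.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Sketch of the resolver trait. Assumption: native `async fn` in traits
// stands in for the `#[async_trait]` macro used by Arroyo.
pub trait SchemaResolver: Send {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String>;
}

/// Always fails: the default when no schema registry is configured.
pub struct FailingSchemaResolver {}

impl SchemaResolver for FailingSchemaResolver {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String> {
        // Hypothetical error text; the real message may differ.
        Err(format!("schema {id} requested but no schema registry is configured"))
    }
}

/// Serves one statically known schema under a single ID.
pub struct FixedSchemaResolver {
    id: u32,
    schema: String,
}

impl FixedSchemaResolver {
    // Assumption: takes the schema text directly; Arroyo builds it from an Avro schema.
    pub fn new(id: u32, schema: impl Into<String>) -> Self {
        Self { id, schema: schema.into() }
    }
}

impl SchemaResolver for FixedSchemaResolver {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String> {
        // Sketch choice: any other ID is reported as "not found" (Ok(None)).
        Ok((id == self.id).then(|| self.schema.clone()))
    }
}

/// Minimal executor that polls a future until it is ready. Adequate for
/// futures that never suspend, as here; real code would run on tokio.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Waker that does nothing when woken (the loop above re-polls anyway).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
```

One design consequence: native async trait methods cannot be called through `dyn SchemaResolver`, so this sketch dispatches statically. `#[async_trait]` boxes the returned futures, which is what makes a trait object possible.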

Usage

Use ConfluentSchemaRegistry when connecting to Kafka topics that use Confluent Schema Registry for Avro/JSON/Protobuf schema evolution. Use FixedSchemaResolver for static schemas and FailingSchemaResolver as the default when no registry is configured.

Code Reference

Source Location

Signature

#[async_trait]
pub trait SchemaResolver: Send {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String>;
}

pub struct FailingSchemaResolver {}
pub struct FixedSchemaResolver { id: u32, schema: String }
pub struct ConfluentSchemaRegistryClient { endpoint: Url, client: Client }
pub struct ConfluentSchemaRegistry { client: ConfluentSchemaRegistryClient, subject: String }

impl ConfluentSchemaRegistryClient {
    pub fn new(endpoint: &str, api_key: Option<VarStr>, api_secret: Option<VarStr>) -> anyhow::Result<Self>;
    pub async fn test(&self) -> anyhow::Result<()>;
}

impl ConfluentSchemaRegistry {
    pub fn new(endpoint: &str, subject: &str, api_key: Option<VarStr>, api_secret: Option<VarStr>) -> anyhow::Result<Self>;
    pub async fn get_schema_for_id(&self, id: u32) -> anyhow::Result<Option<ConfluentSchemaIdResponse>>;
    pub async fn get_schema_for_version(&self, version: Option<u32>) -> anyhow::Result<Option<ConfluentSchemaSubjectResponse>>;
    pub async fn write_schema(&self, schema: impl Into<String>, schema_type: ConfluentSchemaType) -> anyhow::Result<i32>;
    pub async fn resolve_references(&self, references: &[ConfluentSchemaReference]) -> anyhow::Result<Vec<(String, ConfluentSchemaSubjectResponse)>>;
}
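ConfluentSchemaRegistryClient::new takes the endpoint plus an optional API key and secret. The sketch below shows, using only the standard library, the two input-handling steps the page describes: the http/https scheme requirement on the endpoint, and the HTTP Basic `Authorization` value, which is `Basic ` followed by the Base64 encoding of `key:secret` (RFC 7617). The function names are hypothetical, and the real client presumably relies on a Base64 crate and its HTTP client's headers rather than a hand-rolled encoder; how it treats a key supplied without a secret is not covered here.

```rust
/// Standard Base64 alphabet (RFC 4648), as used by HTTP Basic auth.
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Padded standard Base64 encoding.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group (missing bytes are zero).
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = (u32::from(chunk[0]) << 16) | (u32::from(b1) << 8) | u32::from(b2);
        // Emit one symbol per 6 bits; pad with '=' for each missing input byte.
        for i in 0..4usize {
            if i <= chunk.len() {
                out.push(ALPHABET[((n >> (18 - 6 * i)) & 0x3f) as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

/// Builds the `Authorization` header value: "Basic " + Base64("key:secret").
fn basic_auth_header(api_key: &str, api_secret: &str) -> String {
    let credentials = format!("{api_key}:{api_secret}");
    format!("Basic {}", base64_encode(credentials.as_bytes()))
}

/// Enforces the documented requirement that the endpoint use http or https.
fn validate_endpoint(endpoint: &str) -> Result<(), String> {
    if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
        Ok(())
    } else {
        Err(format!("endpoint must start with http:// or https://, got {endpoint:?}"))
    }
}
```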

Import

use arroyo_rpc::schema_resolver::{
    SchemaResolver, FailingSchemaResolver, FixedSchemaResolver,
    ConfluentSchemaRegistry, ConfluentSchemaRegistryClient,
};

I/O Contract

Inputs

Name | Type | Required | Description
endpoint | &str | Yes | Confluent Schema Registry base URL (must start with http:// or https://)
subject | &str | Yes | Schema Registry subject name (e.g., "my-topic-value")
api_key | Option<VarStr> | No | Authentication API key
api_secret | Option<VarStr> | No | Authentication API secret
id | u32 | Yes (for resolve_schema) | Numeric schema ID embedded in Confluent wire-format messages
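In the Confluent wire format, each message starts with a zero magic byte, followed by the schema ID as a 4-byte big-endian integer, followed by the serialized payload. The hypothetical helper below extracts that ID, which is the value that would then be passed to resolve_schema. It handles only this 5-byte header; Protobuf payloads additionally carry message indexes after the ID, which this sketch does not parse.

```rust
/// Splits a Confluent-framed message into (schema_id, payload).
/// Layout: [0x00 magic byte][4-byte big-endian schema ID][payload ...]
fn split_confluent_frame(msg: &[u8]) -> Result<(u32, &[u8]), String> {
    match msg {
        // Well-formed header: decode the ID and return the rest as payload.
        [0x00, a, b, c, d, payload @ ..] => Ok((u32::from_be_bytes([*a, *b, *c, *d]), payload)),
        // Right magic byte, but the ID is truncated.
        [0x00, ..] => Err("message shorter than the 5-byte Confluent header".to_string()),
        // Anything else was not produced by a Confluent serializer.
        [magic, ..] => Err(format!("unexpected magic byte {magic:#04x}, expected 0x00")),
        [] => Err("empty message".to_string()),
    }
}
```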

Outputs

Returned by | Type | Description
resolve_schema | Option<String> | Resolved schema as a JSON/canonical string, or None if not found
get_schema_for_version | ConfluentSchemaSubjectResponse (struct) | Full schema response with id, schema, type, references, subject, and version
write_schema | i32 | Schema ID returned after registering a new schema
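The references field of a subject response lists other schemas that must be loaded first, and the client resolves them recursively. The sketch below illustrates one way to do that depth-first against an in-memory map standing in for the registry's HTTP lookups. The simplified struct definitions, the `Registry` alias, and the choices to return dependencies before their dependents and to fetch each reference name only once are assumptions, not Arroyo's confirmed behavior.

```rust
use std::collections::{HashMap, HashSet};

// Simplified mirrors of ConfluentSchemaReference / ConfluentSchemaSubjectResponse.
// Field sets follow the Confluent REST API; these exact definitions are assumptions.
#[derive(Clone, Debug, PartialEq)]
struct SchemaReference {
    name: String,
    subject: String,
    version: u32,
}

#[derive(Clone, Debug, PartialEq)]
struct SubjectResponse {
    id: u32,
    schema: String,
    subject: String,
    version: u32,
    references: Vec<SchemaReference>,
}

/// Hypothetical stand-in for GET /subjects/{subject}/versions/{version}.
type Registry = HashMap<(String, u32), SubjectResponse>;

/// Depth-first resolution: dependencies are pushed before the schemas that
/// import them, and each reference name is fetched at most once.
fn resolve_references(
    registry: &Registry,
    refs: &[SchemaReference],
    seen: &mut HashSet<String>,
    out: &mut Vec<(String, SubjectResponse)>,
) -> Result<(), String> {
    for r in refs {
        if !seen.insert(r.name.clone()) {
            continue; // already resolved or in progress (this also stops cycles)
        }
        let resp = registry
            .get(&(r.subject.clone(), r.version))
            .ok_or_else(|| format!("reference {} ({} v{}) not found", r.name, r.subject, r.version))?;
        // Resolve this schema's own references before recording it.
        resolve_references(registry, &resp.references, seen, out)?;
        out.push((r.name.clone(), resp.clone()));
    }
    Ok(())
}
```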

Usage Examples

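use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, SchemaResolver};
use arroyo_rpc::var_str::VarStr;

async fn example() -> anyhow::Result<()> {
    // Create a registry client bound to the "my-topic-value" subject
    let registry = ConfluentSchemaRegistry::new(
        "https://schema-registry.example.com",
        "my-topic-value",
        Some(VarStr::new("api-key".to_string())),
        Some(VarStr::new("api-secret".to_string())),
    )?;

    // Resolve a schema by ID (as the deserializer does at runtime).
    // resolve_schema reports errors as String, which does not implement
    // std::error::Error, so convert it before using `?` with anyhow.
    let schema_json: Option<String> = registry
        .resolve_schema(42)
        .await
        .map_err(anyhow::Error::msg)?;

    // Fetch the latest schema version registered under the subject
    let latest = registry.get_schema_for_version(None).await?;

    Ok(())
}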
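Under the hood, these methods issue calls against the Confluent Schema Registry REST API. The sketch below only builds request paths and interprets registration status codes, with no networking. The paths (`/schemas/ids/{id}`, `/subjects/{subject}/versions/{version}` with the `latest` alias, `/subjects`, and `POST /subjects/{subject}/versions`), the `schemaType` values, and the 409 (incompatible schema) and 422 (invalid schema) codes come from the registry's public API; the function names, the `Request` enum, the mapping of `version: None` to `latest`, and the error texts are assumptions. It also assumes an endpoint without a trailing slash and does not percent-encode subject names.

```rust
/// A request the client would send (method + full URL).
#[derive(Debug, PartialEq)]
enum Request {
    Get(String),
    Post(String),
}

/// Lookup by global ID (what resolve_schema / get_schema_for_id need).
fn schema_by_id(endpoint: &str, id: u32) -> Request {
    Request::Get(format!("{endpoint}/schemas/ids/{id}"))
}

/// Lookup by subject and version. Assumption: `None` means "latest".
fn schema_by_version(endpoint: &str, subject: &str, version: Option<u32>) -> Request {
    let v = version.map_or_else(|| "latest".to_string(), |v| v.to_string());
    Request::Get(format!("{endpoint}/subjects/{subject}/versions/{v}"))
}

/// Cheap connectivity check, matching the page's use of /subjects.
fn list_subjects(endpoint: &str) -> Request {
    Request::Get(format!("{endpoint}/subjects"))
}

/// Registration; the JSON body carries "schema" and "schemaType".
fn register_schema(endpoint: &str, subject: &str) -> Request {
    Request::Post(format!("{endpoint}/subjects/{subject}/versions"))
}

#[derive(Clone, Copy)]
enum SchemaType {
    Avro,
    Json,
    Protobuf,
}

impl SchemaType {
    /// Value of the `schemaType` field in a registration request.
    fn as_registry_str(self) -> &'static str {
        match self {
            SchemaType::Avro => "AVRO",
            SchemaType::Json => "JSON",
            SchemaType::Protobuf => "PROTOBUF",
        }
    }
}

/// Maps a registration response status to an error, if any.
fn registration_error(status: u16, body: &str) -> Option<String> {
    match status {
        200..=299 => None,
        409 => Some(format!("schema is incompatible with existing versions: {body}")),
        422 => Some(format!("schema is invalid: {body}")),
        _ => Some(format!("schema registry returned {status}: {body}")),
    }
}
```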
