Implementation:ArroyoSystems Arroyo Format Serializer

From Leeroopedia


Overview

ArrowSerializer converts Arrow RecordBatch data into an output wire format (JSON, Avro, raw string, or raw bytes) for sink connectors. It also handles schema conversion and projects the internal _timestamp field out of the records before they are encoded.

Description

The module provides:

  • record_batch_to_vec: A standalone function that serializes a RecordBatch into a vector of JSON-encoded byte vectors. It uses Arrow's JSON encoder with a custom ArroyoEncoderFactory for configurable timestamp and decimal formatting.
  • ArrowSerializer: The main serialization struct that:
    • Maintains lazily initialized Avro and Kafka Connect JSON schemas
    • Projects out the _timestamp field from output records
    • Dispatches serialization to format-specific logic:
      • JSON: Uses record_batch_to_vec with configurable timestamp format, decimal encoding, and debezium schema wrapping
      • Avro: Converts to Avro values using avro::ser::serialize, then encodes as Avro datum bytes (optionally with Confluent Schema Registry wire format header)
      • Raw String/Raw Bytes: Extracts the single value column directly
    • Supports Kafka Connect JSON schema wrapping via kafka_schema()
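
The Confluent Schema Registry wire format mentioned in the Avro bullet is small enough to sketch with the standard library alone: a magic byte of 0, the schema ID as a 4-byte big-endian integer, then the Avro datum. The helper name frame_confluent is hypothetical and is not taken from arroyo-formats; this only illustrates the framing, not how the serializer obtains the schema ID.

```rust
/// Magic byte that starts every Confluent wire-format message.
const CONFLUENT_MAGIC_BYTE: u8 = 0;

/// Prefix an already-encoded Avro datum with the Confluent header:
/// one magic byte (0) followed by the schema ID as 4 big-endian bytes.
/// `frame_confluent` is a hypothetical name used for illustration only.
fn frame_confluent(schema_id: u32, avro_datum: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + 4 + avro_datum.len());
    out.push(CONFLUENT_MAGIC_BYTE);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(avro_datum);
    out
}
```

A consumer reverses the process: it checks that the first byte is 0, reads the next four bytes as the schema ID, looks the schema up in the registry, and decodes the remainder as an Avro datum.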

Key associated functions (they take a schema argument and no self receiver):

  • avro_schema(): Generates an Avro schema from an Arrow schema
  • json_schema(): Generates a JSON schema from an Arrow schema
  • kafka_schema(): Generates a Kafka Connect JSON schema
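
For orientation, Kafka Connect's JSON converter (with schemas enabled) expects each message as an envelope with a "schema" object and a "payload" object. The sketch below builds such an envelope by hand with the standard library. The Field struct and both function names are hypothetical, the type names are passed in as plain strings, and no JSON escaping is performed, so it assumes simple identifiers. How kafka_schema() actually maps Arrow types is not shown here.

```rust
/// One field of a hypothetical, simplified row schema.
struct Field {
    name: &'static str,
    /// Kafka Connect type name, e.g. "int64" or "string".
    connect_type: &'static str,
    nullable: bool,
}

/// Build a Kafka Connect "struct" schema as a JSON string.
/// Assumes names need no JSON escaping (illustration only).
fn connect_schema(record_name: &str, fields: &[Field]) -> String {
    let fields_json: Vec<String> = fields
        .iter()
        .map(|f| {
            format!(
                r#"{{"type":"{}","optional":{},"field":"{}"}}"#,
                f.connect_type, f.nullable, f.name
            )
        })
        .collect();
    format!(
        r#"{{"type":"struct","name":"{}","optional":false,"fields":[{}]}}"#,
        record_name,
        fields_json.join(",")
    )
}

/// Wrap an already-serialized JSON row in the schema/payload envelope.
fn wrap_with_schema(schema_json: &str, payload_json: &str) -> String {
    format!(r#"{{"schema":{},"payload":{}}}"#, schema_json, payload_json)
}
```

Because the schema is identical for every row of a stream, it makes sense to compute it once and reuse the string for each payload rather than rebuilding it per record.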

Usage

ArrowSerializer is instantiated per sink operator with the target format. The serialize method is called on each record batch to produce output bytes.
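
The per-operator lifecycle can be illustrated with a toy model: state that depends on the batch schema is computed on the first call and reused afterwards, and the _timestamp column is dropped by index. ToySerializer and its methods are hypothetical stand-ins that work on column names and string values instead of Arrow arrays. Caching the projection this way is an assumption made for the sketch, not a statement about the real implementation.

```rust
/// Name of the internal event-time column that is excluded from output.
const TIMESTAMP_FIELD: &str = "_timestamp";

/// Hypothetical stand-in for the serializer's cached state.
struct ToySerializer {
    /// Column indices to keep; computed on the first batch, then reused.
    projection: Option<Vec<usize>>,
}

impl ToySerializer {
    fn new() -> Self {
        Self { projection: None }
    }

    /// Return the indices of all columns except `_timestamp`,
    /// computing them only on the first call.
    fn projection(&mut self, column_names: &[&str]) -> &[usize] {
        self.projection.get_or_insert_with(|| {
            column_names
                .iter()
                .enumerate()
                .filter(|(_, name)| **name != TIMESTAMP_FIELD)
                .map(|(i, _)| i)
                .collect()
        })
    }

    /// Apply the cached projection to one row of stringified values.
    fn project_row<'a>(&mut self, column_names: &[&str], row: &[&'a str]) -> Vec<&'a str> {
        self.projection(column_names)
            .iter()
            .map(|&i| row[i])
            .collect()
    }
}
```

Option::get_or_insert_with keeps the lazy initialization in one place, which is also why the methods take &mut self.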

Code Reference

Source Location

crates/arroyo-formats/src/ser.rs

Signature

pub fn record_batch_to_vec(
    batch: &RecordBatch,
    explicit_nulls: bool,
    timestamp_format: TimestampFormat,
    decimal_encoding: DecimalEncoding,
) -> Result<Vec<Vec<u8>>, ArrowError>

pub struct ArrowSerializer {
    kafka_schema: Option<Value>,
    avro_schema: Option<Arc<apache_avro::schema::Schema>>,
    format: Format,
    projection: Vec<usize>,
}

impl ArrowSerializer {
    pub fn new(format: Format) -> Self
    pub fn avro_schema(schema: &arrow_schema::Schema) -> apache_avro::Schema
    pub fn json_schema(schema: &arrow_schema::Schema) -> Value
    pub fn kafka_schema(schema: &arrow_schema::Schema) -> Value
}

Import

use arroyo_formats::ser::{ArrowSerializer, record_batch_to_vec};

I/O Contract

Inputs

Name    Type          Description
batch   &RecordBatch  Arrow record batch to serialize
format  Format        Target serialization format (JSON, Avro, RawString, RawBytes)

Outputs

Name          Type          Description
encoded rows  Vec<Vec<u8>>  Serialized byte vectors, one per row
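
The "one byte vector per row" contract is easiest to see for the raw formats, where the single value column is passed through almost unchanged. The sketch below is a simplified model: ToyFormat, ValueColumn, and serialize_raw are hypothetical names, and the column is represented with plain Rust vectors rather than Arrow arrays.

```rust
/// Hypothetical, simplified stand-in for the raw variants of `Format`.
enum ToyFormat {
    RawString,
    RawBytes,
}

/// The single "value" column, already extracted from the batch.
enum ValueColumn {
    Utf8(Vec<String>),
    Binary(Vec<Vec<u8>>),
}

/// Produce one output byte vector per input row.
/// Returns an error if the column type does not match the format.
fn serialize_raw(format: &ToyFormat, column: ValueColumn) -> Result<Vec<Vec<u8>>, String> {
    match (format, column) {
        (ToyFormat::RawString, ValueColumn::Utf8(values)) => {
            Ok(values.into_iter().map(String::into_bytes).collect())
        }
        (ToyFormat::RawBytes, ValueColumn::Binary(values)) => Ok(values),
        _ => Err("value column type does not match the raw format".to_string()),
    }
}
```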

Usage Examples

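// `record_batch` is assumed to be an existing Arrow RecordBatch.
let mut serializer = ArrowSerializer::new(Format::Json(JsonFormat::default()));

// Serialize the batch into one encoded payload per row
let encoded_rows = serializer.serialize(&record_batch);
for row_bytes in encoded_rows {
    // write row_bytes to the sink
}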
