Implementation:ArroyoSystems Arroyo Format Serializer
Overview
ArrowSerializer converts Arrow RecordBatch data into various output wire formats (JSON, Avro, raw strings, raw bytes) for sink connectors. It manages schema conversion and projection to exclude internal timestamp fields.
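The projection step can be pictured as computing the list of column indices to keep. The following is a minimal, std-only sketch, not Arroyo's code. It models a schema as a slice of field names, and the helper projection_without_timestamp is hypothetical. Its result is meant to resemble what the projection: Vec<usize> field of ArrowSerializer holds.

```rust
/// Name of the internal event-time column that sinks should not emit
/// (assumed to match the _timestamp field described on this page).
const TIMESTAMP_FIELD: &str = "_timestamp";

/// Hypothetical helper: returns the indices of every column except the
/// internal timestamp column, preserving the original column order.
pub fn projection_without_timestamp(field_names: &[&str]) -> Vec<usize> {
    field_names
        .iter()
        .enumerate()
        // Keep every column whose name is not the internal timestamp field
        .filter(|(_, name)| **name != TIMESTAMP_FIELD)
        .map(|(i, _)| i)
        .collect()
}
```

With real Arrow data, an index list like this could be passed to RecordBatch::project to drop the column before encoding.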
Description
The module provides:
record_batch_to_vec: A standalone function that serializes a RecordBatch into a vector of JSON-encoded byte vectors, one per row. It uses Arrow's JSON encoder with a custom ArroyoEncoderFactory for configurable timestamp and decimal formatting.
ArrowSerializer: The main serialization struct, which:
- Maintains lazily initialized Avro and Kafka JSON schemas
- Projects out the _timestamp field from output records
- Dispatches serialization to format-specific logic:
  - JSON: Uses record_batch_to_vec with configurable timestamp format, decimal encoding, and Debezium schema wrapping
  - Avro: Converts rows to Avro values using avro::ser::serialize, then encodes them as Avro datum bytes (optionally prefixed with the Confluent Schema Registry wire-format header)
  - Raw String/Raw Bytes: Extracts the single value column directly
- Supports Kafka Connect JSON schema wrapping via kafka_schema()
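The Confluent Schema Registry wire format mentioned for Avro is a fixed 5-byte prefix: a zero magic byte followed by the 4-byte big-endian schema ID, then the Avro-encoded datum. The sketch below frames an already-encoded datum this way. with_confluent_header is a hypothetical name, and the datum bytes are assumed to come from an Avro encoder (for example apache_avro::to_avro_datum).

```rust
/// Magic byte that starts every message in the Confluent Schema Registry wire format.
const CONFLUENT_MAGIC_BYTE: u8 = 0x00;

/// Hypothetical helper: frames an already-encoded Avro datum with the
/// Confluent header (magic byte + 4-byte big-endian schema ID).
pub fn with_confluent_header(schema_id: u32, avro_datum: &[u8]) -> Vec<u8> {
    let mut framed = Vec::with_capacity(5 + avro_datum.len());
    framed.push(CONFLUENT_MAGIC_BYTE);
    // The schema ID assigned by the registry, in network byte order
    framed.extend_from_slice(&schema_id.to_be_bytes());
    framed.extend_from_slice(avro_datum);
    framed
}
```

Consumers read the first five bytes to look up the writer schema in the registry before decoding the remainder.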
Key static methods:
- avro_schema(): Generates an Avro schema from an Arrow schema
- json_schema(): Generates a JSON schema from an Arrow schema
- kafka_schema(): Generates a Kafka Connect JSON schema from an Arrow schema
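For reference, Kafka Connect's JSON converter (with schemas enabled) expects each message as an envelope of the form {"schema": ..., "payload": ...}, where the schema is a struct listing typed fields. The std-only sketch below assumes a tiny hypothetical SimpleType enum in place of Arrow's DataType and builds such a schema string by hand. It illustrates the shape of the output, not how kafka_schema() is implemented.

```rust
/// Hypothetical stand-in for the handful of Arrow types this sketch covers.
#[derive(Clone, Copy)]
pub enum SimpleType { Boolean, Int32, Int64, Float64, Utf8, Binary }

/// Maps a simple type to its Kafka Connect schema type name.
pub fn connect_type(t: SimpleType) -> &'static str {
    match t {
        SimpleType::Boolean => "boolean",
        SimpleType::Int32 => "int32",
        SimpleType::Int64 => "int64",
        SimpleType::Float64 => "float64",
        SimpleType::Utf8 => "string",
        SimpleType::Binary => "bytes",
    }
}

/// Builds a Kafka Connect struct schema as a JSON string from
/// (name, type, nullable) triples. Names are assumed not to need escaping.
pub fn connect_struct_schema(fields: &[(&str, SimpleType, bool)]) -> String {
    let rendered: Vec<String> = fields
        .iter()
        .map(|(name, ty, nullable)| {
            format!(
                r#"{{"field":"{}","type":"{}","optional":{}}}"#,
                name,
                connect_type(*ty),
                nullable
            )
        })
        .collect();
    format!(r#"{{"type":"struct","fields":[{}],"optional":false}}"#, rendered.join(","))
}

/// Wraps a schema and a row payload in the schema/payload envelope.
pub fn connect_envelope(schema_json: &str, payload_json: &str) -> String {
    format!(r#"{{"schema":{},"payload":{}}}"#, schema_json, payload_json)
}
```

A production implementation would use a JSON library rather than string formatting; hand-built strings keep the sketch dependency-free.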
Usage
ArrowSerializer is instantiated per sink operator with the target format. The serialize method is called on each record batch to produce output bytes.
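As a rough model of this lifecycle, the sketch below keeps one serializer per sink and calls serialize once per batch, returning one message per row. It covers only the raw formats, where the single value column is emitted as-is. SinkFormat, ValueColumn, and RawSerializer are hypothetical stand-ins, not Arroyo types.

```rust
/// Hypothetical subset of sink formats, mirroring the raw variants on this page.
pub enum SinkFormat { RawString, RawBytes }

/// Hypothetical model of the single "value" column used by raw formats.
pub enum ValueColumn { Strings(Vec<String>), Bytes(Vec<Vec<u8>>) }

/// Hypothetical serializer created once and held by a sink operator.
pub struct RawSerializer { format: SinkFormat }

impl RawSerializer {
    pub fn new(format: SinkFormat) -> Self {
        Self { format }
    }

    /// Called once per batch; returns one encoded message per row.
    pub fn serialize(&self, column: &ValueColumn) -> Result<Vec<Vec<u8>>, String> {
        match (&self.format, column) {
            // Raw strings are written as their UTF-8 bytes
            (SinkFormat::RawString, ValueColumn::Strings(rows)) => {
                Ok(rows.iter().map(|s| s.as_bytes().to_vec()).collect())
            }
            // Raw bytes pass through unchanged
            (SinkFormat::RawBytes, ValueColumn::Bytes(rows)) => Ok(rows.clone()),
            _ => Err("value column type does not match the sink format".to_string()),
        }
    }
}
```

Creating the serializer once per operator, rather than per batch, is what lets the real struct cache its lazily built schemas.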
Code Reference
Source Location
crates/arroyo-formats/src/ser.rs
Signature
```rust
// Signatures only; function bodies are omitted.
pub fn record_batch_to_vec(
    batch: &RecordBatch,
    explicit_nulls: bool,
    timestamp_format: TimestampFormat,
    decimal_encoding: DecimalEncoding,
) -> Result<Vec<Vec<u8>>, ArrowError>

pub struct ArrowSerializer {
    kafka_schema: Option<Value>,                           // lazily built Kafka Connect JSON schema
    avro_schema: Option<Arc<apache_avro::schema::Schema>>, // lazily built Avro schema
    format: Format,                                        // target wire format
    projection: Vec<usize>,                                // output columns (excludes _timestamp)
}

impl ArrowSerializer {
    pub fn new(format: Format) -> Self
    pub fn avro_schema(schema: &arrow_schema::Schema) -> apache_avro::Schema
    pub fn json_schema(schema: &arrow_schema::Schema) -> Value
    pub fn kafka_schema(schema: &arrow_schema::Schema) -> Value
}
```
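The explicit_nulls flag of record_batch_to_vec most likely controls whether null columns appear in the JSON output as null or are omitted. arrow-json exposes an option with these semantics (for example WriterBuilder::with_explicit_nulls); that record_batch_to_vec forwards its flag to such an option is an assumption. The std-only sketch below shows the two behaviors on a hypothetical row type holding integer columns only.

```rust
/// One row as (column name, optional integer value) pairs; a hypothetical
/// stand-in for a row of an Arrow RecordBatch.
pub type Row<'a> = Vec<(&'a str, Option<i64>)>;

/// Encodes each row as one JSON object. When explicit_nulls is true, null
/// values are written as null; otherwise the key is omitted entirely.
pub fn rows_to_json(rows: &[Row<'_>], explicit_nulls: bool) -> Vec<Vec<u8>> {
    rows.iter()
        .map(|row| {
            let fields: Vec<String> = row
                .iter()
                .filter_map(|(name, value)| match value {
                    Some(v) => Some(format!("\"{}\":{}", name, v)),
                    None if explicit_nulls => Some(format!("\"{}\":null", name)),
                    None => None,
                })
                .collect();
            // One JSON object per row, returned as its own byte vector
            format!("{{{}}}", fields.join(",")).into_bytes()
        })
        .collect()
}
```

Omitting nulls produces smaller messages, while explicit nulls give every record the same set of keys, which some downstream consumers expect.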
Import
```rust
use arroyo_formats::ser::{ArrowSerializer, record_batch_to_vec};
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| batch | &RecordBatch | Arrow record batch to serialize |
| format | Format | Target serialization format passed to ArrowSerializer::new (JSON, Avro, RawString, RawBytes) |
Outputs
| Name | Type | Description |
|---|---|---|
| encoded rows | Vec<Vec<u8>> | Serialized byte vectors, one per row |
Usage Examples
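```rust
use arroyo_formats::ser::ArrowSerializer;
use arroyo_rpc::formats::{Format, JsonFormat}; // assumed module path for the format types

// One serializer per sink operator, configured with the target format
let mut serializer = ArrowSerializer::new(Format::Json(JsonFormat::default()));

// Serialize a record batch; each item is the encoded bytes of one row
let encoded_rows = serializer.serialize(&record_batch);
for row_bytes in encoded_rows {
    // write row_bytes to the sink
}
```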
Related Pages
- ArroyoSystems_Arroyo_Format_Deserializer - Complementary deserialization from wire formats to Arrow
- ArroyoSystems_Arroyo_Avro_Serializer - Avro-specific serialization used internally
- ArroyoSystems_Arroyo_Json_Encoders - Custom JSON encoder factory