Implementation:ArroyoSystems Arroyo Json Encoders

From Leeroopedia


Overview

ArroyoEncoderFactory is a custom JSON encoder factory that extends Arrow's default JSON serialization with Arroyo-specific features: configurable timestamp formatting, raw JSON passthrough, and configurable decimal encoding for Avro and Kafka Connect compatibility.

Description

The module implements:

  • ArroyoEncoderFactory: Implements Arrow's EncoderFactory trait, intercepting the default encoder creation to provide custom encoders for:
    • Timestamps: When TimestampFormat::UnixMillis is selected, timestamps are encoded as integer millisecond values instead of RFC3339 strings. Handles all Arrow timestamp units (Second, Millisecond, Microsecond, Nanosecond) and Date types (Date32, Date64).
    • Decimals: When DecimalEncoding::Bytes is selected, Decimal128 values are encoded as base64 byte strings for Kafka Connect compatibility.
    • Raw JSON: Fields with the ArroyoExtensionType::JSON metadata are passed through as raw JSON strings without re-quoting.
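
The timestamp normalization described above can be sketched with the standard library alone. This is an illustrative sketch, not the module's code: the `Unit` enum is a hypothetical stand-in for Arrow's time unit type, the function names are invented here, and flooring sub-millisecond values is an assumption about rounding behavior.

```rust
/// Hypothetical stand-in for Arrow's time unit enum (not the real type).
#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Convert a raw timestamp in `unit` to Unix milliseconds.
/// Assumption: sub-millisecond precision is dropped by flooring.
#[allow(dead_code)]
fn timestamp_to_millis(value: i64, unit: Unit) -> i64 {
    match unit {
        Unit::Second => value * 1_000,
        Unit::Millisecond => value,
        Unit::Microsecond => value.div_euclid(1_000),
        Unit::Nanosecond => value.div_euclid(1_000_000),
    }
}

/// Date32 stores days since the Unix epoch, so scale by milliseconds per day.
/// Date64 already stores milliseconds and needs no conversion.
#[allow(dead_code)]
fn date32_to_millis(days: i32) -> i64 {
    i64::from(days) * 86_400_000
}
```

Under these assumptions, a value of `1_700_000_000` in seconds becomes `1_700_000_000_000` in the JSON output, written as a bare integer rather than a quoted RFC3339 string.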

Custom encoder types defined in the module:

  • UnixMillisTimeEncoder - Encodes timestamps as Unix millisecond integers
  • DecimalBytesEncoder - Encodes decimals as base64 byte strings
  • RawJsonEncoder - Passes JSON string values through without quoting
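
To make the decimal and raw JSON behaviors concrete, here is a minimal standard-library sketch of what such encoders might write into an output buffer. The function names are hypothetical, the base64 routine is a small hand-written stand-in for whatever base64 library the module uses, and emitting all 16 big-endian two's-complement bytes of the unscaled value is an assumption rather than a confirmed detail of DecimalBytesEncoder.

```rust
const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Minimal standard-alphabet base64 encoder with `=` padding.
#[allow(dead_code)]
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling the tail.
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;
        out.push(B64[(n >> 18) as usize & 63] as char);
        out.push(B64[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { B64[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { B64[n as usize & 63] as char } else { '=' });
    }
    out
}

/// Write a Decimal128's unscaled value as a quoted base64 string.
/// Assumption: the full 16-byte big-endian representation is emitted.
#[allow(dead_code)]
fn encode_decimal_bytes(unscaled: i128, out: &mut Vec<u8>) {
    out.push(b'"');
    out.extend_from_slice(base64_encode(&unscaled.to_be_bytes()).as_bytes());
    out.push(b'"');
}

/// Copy an already-serialized JSON value into the output unchanged,
/// with no surrounding quotes and no escaping.
#[allow(dead_code)]
fn encode_raw_json(value: &str, out: &mut Vec<u8>) {
    out.extend_from_slice(value.as_bytes());
}
```

The contrast is the point: the decimal path wraps its payload in quotes because base64 text is a JSON string, while the raw JSON path writes the stored text as-is so that an object such as `{"a":1}` stays an object in the output instead of becoming an escaped string.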

Usage

This factory is used by record_batch_to_vec in the serializer module and is configured through its two fields, timestamp_format (a TimestampFormat) and decimal_encoding (a DecimalEncoding).

Code Reference

Source Location

crates/arroyo-formats/src/json/encoders.rs

Signature

#[derive(Debug)]
pub struct ArroyoEncoderFactory {
    pub timestamp_format: TimestampFormat,
    pub decimal_encoding: DecimalEncoding,
}

impl EncoderFactory for ArroyoEncoderFactory {
    fn make_default_encoder<'a>(
        &self,
        field: &'a FieldRef,
        array: &'a dyn Array,
        _options: &'a EncoderOptions,
    ) -> Result<Option<NullableEncoder<'a>>, ArrowError>
}
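
The `Option` in the return type is what lets the factory override only some columns: returning `None` hands the column back to Arrow's default encoder. The sketch below models that decision with plain enums. Every type in it is a hypothetical stand-in that mirrors the names used on this page; none is the real Arrow or Arroyo definition, and the real method inspects a field and an array rather than a `ColumnKind`.

```rust
// Hypothetical stand-ins that mirror names on this page (not the real types).
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum TimestampFormat { Rfc3339, UnixMillis }

#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum DecimalEncoding { Numeric, Bytes }

/// Simplified view of what the factory learns from the field and array.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum ColumnKind { Timestamp, Decimal128, RawJson, Other }

/// Which custom encoder, if any, should handle the column.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum CustomEncoder { UnixMillisTime, DecimalBytes, RawJson }

#[allow(dead_code)]
struct SketchFactory {
    timestamp_format: TimestampFormat,
    decimal_encoding: DecimalEncoding,
}

#[allow(dead_code)]
impl SketchFactory {
    /// Return `Some` to override the encoder, or `None` to fall back
    /// to the default JSON encoding for this column.
    fn pick_encoder(&self, kind: ColumnKind) -> Option<CustomEncoder> {
        match (kind, self.timestamp_format, self.decimal_encoding) {
            (ColumnKind::Timestamp, TimestampFormat::UnixMillis, _) => {
                Some(CustomEncoder::UnixMillisTime)
            }
            (ColumnKind::Decimal128, _, DecimalEncoding::Bytes) => {
                Some(CustomEncoder::DecimalBytes)
            }
            (ColumnKind::RawJson, _, _) => Some(CustomEncoder::RawJson),
            _ => None,
        }
    }
}
```

In this model, a factory configured with `UnixMillis` and `Numeric` overrides timestamp and raw JSON columns but returns `None` for decimals, leaving them to the default encoder.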

Import

use arroyo_formats::json::encoders::ArroyoEncoderFactory;

I/O Contract

Inputs

Name              Type             Description
timestamp_format  TimestampFormat  Desired timestamp output format (RFC3339 or UnixMillis)
decimal_encoding  DecimalEncoding  Desired decimal output format (Numeric or Bytes)
field             &FieldRef        Arrow field metadata for determining encoder type
array             &dyn Array       Arrow array data to encode

Outputs

Name     Type                                         Description
encoder  Result<Option<NullableEncoder>, ArrowError>  Ok(Some(encoder)) with a custom encoder if applicable, or Ok(None) to use default Arrow JSON encoding

Usage Examples

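use std::sync::Arc;

// EncoderOptions, ArroyoEncoderFactory, TimestampFormat, and DecimalEncoding
// are assumed to be in scope already.
let options = EncoderOptions::default()
    // Write null fields explicitly instead of omitting their keys.
    .with_explicit_nulls(true)
    // Route encoder creation through the Arroyo factory.
    .with_encoder_factory(Arc::new(ArroyoEncoderFactory {
        timestamp_format: TimestampFormat::UnixMillis, // integer milliseconds
        decimal_encoding: DecimalEncoding::Bytes,      // base64 byte strings
    }));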
