Implementation:ArroyoSystems Arroyo Json Encoders
Overview
ArroyoEncoderFactory is a custom JSON encoder factory that extends Arrow's default JSON serialization with Arroyo-specific features including configurable timestamp formatting, raw JSON passthrough, and customizable decimal encoding for Avro and Kafka Connect compatibility.
Description
The module implements:
- ArroyoEncoderFactory: Implements Arrow's EncoderFactory trait, intercepting default encoder creation to provide custom encoders for:
  - Timestamps: When TimestampFormat::UnixMillis is selected, timestamps are encoded as integer millisecond values instead of RFC3339 strings. Handles all Arrow timestamp units (Second, Millisecond, Microsecond, Nanosecond) and Date types (Date32, Date64).
  - Decimals: When DecimalEncoding::Bytes is selected, Decimal128 values are encoded as base64 byte strings for Kafka Connect compatibility.
  - Raw JSON: Fields with the ArroyoExtensionType::JSON metadata are passed through as raw JSON strings without re-quoting.
- Custom encoder types defined in the module:
  - UnixMillisTimeEncoder - Encodes timestamps as Unix millisecond integers
  - DecimalBytesEncoder - Encodes decimals as base64 byte strings
  - RawJsonEncoder - Passes JSON string values through without quoting
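The timestamp handling described above amounts to normalizing each temporal unit to milliseconds since the Unix epoch. The following is a minimal, standard-library-only sketch of that arithmetic, not the module's actual code: the `TemporalUnit` enum and `to_unix_millis` function are hypothetical names introduced for illustration, and rounding sub-millisecond values toward negative infinity is an assumption that the real encoder may not share.

```rust
/// Hypothetical stand-in for the Arrow temporal types listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TemporalUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
    /// Days since the Unix epoch.
    Date32,
    /// Milliseconds since the Unix epoch.
    Date64,
}

const MILLIS_PER_DAY: i64 = 86_400_000;

/// Converts a raw temporal value in the given unit to Unix milliseconds.
/// Sub-millisecond precision is dropped by flooring (an assumption).
pub fn to_unix_millis(value: i64, unit: TemporalUnit) -> i64 {
    match unit {
        TemporalUnit::Second => value * 1_000,
        TemporalUnit::Millisecond | TemporalUnit::Date64 => value,
        TemporalUnit::Microsecond => value.div_euclid(1_000),
        TemporalUnit::Nanosecond => value.div_euclid(1_000_000),
        TemporalUnit::Date32 => value * MILLIS_PER_DAY,
    }
}
```

Under these assumptions, `to_unix_millis(1_700_000_000, TemporalUnit::Second)` yields `1_700_000_000_000`, and a Date32 value of `1` (one day after the epoch) yields `86_400_000`.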
Usage
This factory is used by record_batch_to_vec in the serializer module and configured via the TimestampFormat and DecimalEncoding enum parameters.
Code Reference
Source Location
crates/arroyo-formats/src/json/encoders.rs
Signature
#[derive(Debug)]
pub struct ArroyoEncoderFactory {
    pub timestamp_format: TimestampFormat,
    pub decimal_encoding: DecimalEncoding,
}

impl EncoderFactory for ArroyoEncoderFactory {
    fn make_default_encoder<'a>(
        &self,
        field: &'a FieldRef,
        array: &'a dyn Array,
        _options: &'a EncoderOptions,
    ) -> Result<Option<NullableEncoder<'a>>, ArrowError>
}
Import
use arroyo_formats::json::encoders::ArroyoEncoderFactory;
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| timestamp_format | TimestampFormat | Desired timestamp output format (RFC3339 or UnixMillis) |
| decimal_encoding | DecimalEncoding | Desired decimal output format (Numeric or Bytes) |
| field | &FieldRef | Arrow field metadata for determining encoder type |
| array | &dyn Array | Arrow array data to encode |
Outputs
| Name | Type | Description |
|---|---|---|
| encoder | Option<NullableEncoder> | Custom encoder if applicable, or None to use default Arrow JSON encoding |
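The contract above (return a custom encoder when a configured option applies, otherwise None so Arrow falls back to its default) can be modeled without Arrow at all. The sketch below is a simplified illustration under stated assumptions: `FieldKind`, `ChosenEncoder`, and `choose_encoder` are hypothetical names, the two configuration enums are local stand-ins with only the variants mentioned on this page, and the order in which the cases are checked is assumed rather than taken from the source.

```rust
/// Local stand-ins for the configuration enums (variants as named on this page).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TimestampFormat {
    Rfc3339,
    UnixMillis,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DecimalEncoding {
    Numeric,
    Bytes,
}

/// Hypothetical, simplified view of what the factory learns from a field.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FieldKind {
    /// Any timestamp unit or date type.
    Temporal,
    Decimal128,
    /// A string field, optionally tagged with the JSON extension metadata.
    Utf8 { json_extension: bool },
    Other,
}

/// Which custom encoder would be selected (names mirror the encoder types).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChosenEncoder {
    UnixMillisTime,
    DecimalBytes,
    RawJson,
}

/// Returns Some(custom encoder) when one applies, or None to signal
/// that the default Arrow JSON encoding should be used.
pub fn choose_encoder(
    timestamp_format: TimestampFormat,
    decimal_encoding: DecimalEncoding,
    kind: FieldKind,
) -> Option<ChosenEncoder> {
    match (kind, timestamp_format, decimal_encoding) {
        (FieldKind::Temporal, TimestampFormat::UnixMillis, _) => {
            Some(ChosenEncoder::UnixMillisTime)
        }
        (FieldKind::Decimal128, _, DecimalEncoding::Bytes) => Some(ChosenEncoder::DecimalBytes),
        (FieldKind::Utf8 { json_extension: true }, _, _) => Some(ChosenEncoder::RawJson),
        // Everything else falls through to Arrow's default encoder.
        _ => None,
    }
}
```

In this model, a timestamp field under the RFC3339 setting returns None, which corresponds to leaving the default Arrow string encoding in place.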
Usage Examples
let options = EncoderOptions::default()
    .with_explicit_nulls(true)
    .with_encoder_factory(Arc::new(ArroyoEncoderFactory {
        timestamp_format: TimestampFormat::UnixMillis,
        decimal_encoding: DecimalEncoding::Bytes,
    }));
Related Pages
- ArroyoSystems_Arroyo_Format_Serializer - Uses ArroyoEncoderFactory for JSON serialization
- ArroyoSystems_Arroyo_Json_Schema_Module - JSON schema generation and conversion