

Implementation:ArroyoSystems Arroyo Format Types

From Leeroopedia
Revision as of 14:26, 16 February 2026 by Admin (auto-imported from implementations/ArroyoSystems_Arroyo_Format_Types.md)


Knowledge Sources
Domains: Streaming, Serialization, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

Defines the serialization format, framing, and bad-data handling types used by Arroyo connectors to describe how data is encoded and decoded on the wire.

Description

This module contains the Format, Framing, and BadData enums along with their format-specific configuration structs:

  • Format -- a tagged enum with variants for Json, Avro, Protobuf, Parquet, RawString, and RawBytes.
  • JsonFormat -- configures JSON serialization including Confluent Schema Registry support, Debezium CDC mode, unstructured JSON mode, timestamp format (RFC3339 or UnixMillis), and decimal encoding (Number, String, or Bytes).
  • AvroFormat -- configures Avro with Confluent Schema Registry, raw datum mode, unstructured JSON conversion, and optional reader schema.
  • ProtobufFormat -- configures Protobuf with message name, compiled schema, Confluent Schema Registry, and length-delimited framing.
  • ParquetFormat -- configures Parquet compression (Uncompressed, Snappy, Gzip, Zstd, Lz4, Lz4Raw) and row group size.
  • Framing -- currently supports newline-delimited framing with optional max line length.
  • BadData -- policy for handling malformed records: Fail (stop pipeline) or Drop (skip and log).
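
To make the Framing and BadData bullets concrete, here is a minimal, self-contained sketch of how newline framing and a bad-data policy could interact while decoding a buffer. It does not use the real arroyo_rpc types: `NewlineFraming`, `DecodeError`, `split_frames`, and `decode` are hypothetical names introduced only for illustration, records are parsed as plain integers rather than JSON or Avro, and truncating lines that exceed the maximum length is an assumption about how the limit is applied.

```rust
/// Hypothetical stand-in for the BadData policy (not the arroyo_rpc type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum BadData {
    Fail, // stop processing on the first malformed record
    Drop, // skip the malformed record and log it
}

/// Hypothetical stand-in for newline-delimited framing.
struct NewlineFraming {
    max_line_length: Option<usize>,
}

#[derive(Debug, PartialEq)]
struct DecodeError {
    frame: String,
}

/// Split a buffer into frames on '\n', skipping empty lines.
/// Assumption: a line longer than the limit is truncated to the limit.
fn split_frames<'a>(buf: &'a [u8], framing: &NewlineFraming) -> Vec<&'a [u8]> {
    buf.split(|b| *b == b'\n')
        .filter(|line| !line.is_empty())
        .map(|line| match framing.max_line_length {
            Some(max) if line.len() > max => &line[..max],
            _ => line,
        })
        .collect()
}

/// Decode every frame as an i64, applying the bad-data policy to failures.
fn decode(
    buf: &[u8],
    framing: &NewlineFraming,
    policy: BadData,
) -> Result<Vec<i64>, DecodeError> {
    let mut out = Vec::new();
    for frame in split_frames(buf, framing) {
        let text = String::from_utf8_lossy(frame);
        match text.trim().parse::<i64>() {
            Ok(value) => out.push(value),
            Err(_) => match policy {
                BadData::Fail => {
                    return Err(DecodeError { frame: text.into_owned() });
                }
                BadData::Drop => eprintln!("dropping malformed record: {text:?}"),
            },
        }
    }
    Ok(out)
}

fn main() {
    let buf = b"1\n2\noops\n4\n";
    let framing = NewlineFraming { max_line_length: None };

    // Drop: the malformed frame is skipped and decoding continues.
    let kept = decode(buf, &framing, BadData::Drop).unwrap();
    println!("{kept:?}"); // [1, 2, 4]

    // Fail: the first malformed frame aborts decoding.
    let failed = decode(buf, &framing, BadData::Fail);
    println!("{failed:?}"); // Err(DecodeError { frame: "oops" })
}
```

The point of the sketch is the separation of concerns: framing decides where one record ends and the next begins, while the bad-data policy decides what happens when a framed record cannot be decoded.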

All of these types can be constructed from ConnectorOptions through a from_opts method, which is how the options given in a SQL CREATE TABLE statement's WITH clause are turned into format configuration.

Usage

Use these types when implementing connectors to specify how data should be serialized/deserialized. The Format enum is embedded in ConnectionSchema and flows through to the ArrowDeserializer and ArrowSerializer at runtime.

Code Reference

Source Location

Signature

pub enum Format {
    Json(JsonFormat),
    Avro(AvroFormat),
    Protobuf(ProtobufFormat),
    Parquet(ParquetFormat),
    RawString(RawStringFormat),
    RawBytes(RawBytesFormat),
}

impl Format {
    pub fn from_opts(opts: &mut ConnectorOptions) -> DFResult<Option<Self>>;
    pub fn is_updating(&self) -> bool;
}

pub enum BadData { Fail {}, Drop {} }
pub enum Framing { Newline(NewlineDelimitedFraming) }

pub struct JsonFormat {
    pub confluent_schema_registry: bool,
    pub schema_id: Option<u32>,
    pub include_schema: bool,
    pub debezium: bool,
    pub unstructured: bool,
    pub timestamp_format: TimestampFormat,
    pub decimal_encoding: DecimalEncoding,
}

pub struct AvroFormat {
    pub confluent_schema_registry: bool,
    pub raw_datums: bool,
    pub into_unstructured_json: bool,
    pub reader_schema: Option<SerializableAvroSchema>,
    pub schema_id: Option<u32>,
}

Import

use arroyo_rpc::formats::{Format, JsonFormat, AvroFormat, ParquetFormat, BadData, Framing};

I/O Contract

Inputs

Name | Type | Required | Description
opts | &mut ConnectorOptions | Yes | SQL WITH clause options parsed from CREATE TABLE
format key | String | Yes | The "format" option value: "json", "avro", "protobuf", "parquet", "raw_string", "raw_bytes"
bad_data key | String | No | "fail" or "drop"
framing key | String | No | "newline"

Outputs

Name | Type | Description
Format | Format | Fully configured serialization format
BadData | BadData | Error handling policy
Framing | Framing | Message framing configuration
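
The contract above can be approximated with a small, self-contained sketch. Everything here is a simplified assumption rather than the arroyo_rpc implementation: `Options` is a hypothetical stand-in for ConnectorOptions backed by a HashMap, the enums carry no configuration structs, errors are plain strings instead of DFResult, and returning `None` for a missing key is inferred only from the `Option` in the from_opts signature. Only the option keys and their accepted values come from the tables.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Format { Json, Avro, Protobuf, Parquet, RawString, RawBytes }

#[derive(Debug, PartialEq)]
enum BadData { Fail, Drop }

#[derive(Debug, PartialEq)]
enum Framing { Newline }

/// Hypothetical stand-in for ConnectorOptions: a bag of WITH-clause options.
struct Options(HashMap<String, String>);

impl Options {
    fn from_pairs(pairs: &[(&str, &str)]) -> Self {
        Options(pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect())
    }

    /// Remove and return an option, so each key is consumed at most once.
    fn pull(&mut self, key: &str) -> Option<String> {
        self.0.remove(key)
    }
}

fn parse_format(opts: &mut Options) -> Result<Option<Format>, String> {
    let Some(name) = opts.pull("format") else {
        return Ok(None); // assumption: a missing key yields None, not an error
    };
    let format = match name.as_str() {
        "json" => Format::Json,
        "avro" => Format::Avro,
        "protobuf" => Format::Protobuf,
        "parquet" => Format::Parquet,
        "raw_string" => Format::RawString,
        "raw_bytes" => Format::RawBytes,
        other => return Err(format!("unknown format '{other}'")),
    };
    Ok(Some(format))
}

fn parse_bad_data(opts: &mut Options) -> Result<Option<BadData>, String> {
    match opts.pull("bad_data").as_deref() {
        None => Ok(None),
        Some("fail") => Ok(Some(BadData::Fail)),
        Some("drop") => Ok(Some(BadData::Drop)),
        Some(other) => Err(format!("unknown bad_data policy '{other}'")),
    }
}

fn parse_framing(opts: &mut Options) -> Result<Option<Framing>, String> {
    match opts.pull("framing").as_deref() {
        None => Ok(None),
        Some("newline") => Ok(Some(Framing::Newline)),
        Some(other) => Err(format!("unknown framing '{other}'")),
    }
}

fn main() {
    // Roughly: CREATE TABLE ... WITH (format = 'json', bad_data = 'drop')
    let mut opts = Options::from_pairs(&[("format", "json"), ("bad_data", "drop")]);
    println!("{:?}", parse_format(&mut opts));   // Ok(Some(Json))
    println!("{:?}", parse_bad_data(&mut opts)); // Ok(Some(Drop))
    println!("{:?}", parse_framing(&mut opts));  // Ok(None)
}
```

Consuming each key as it is parsed mirrors the `&mut ConnectorOptions` parameter in the signature: whatever remains afterwards can be checked for unrecognized options by the caller.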

Usage Examples

use arroyo_rpc::formats::{Format, JsonFormat, TimestampFormat, BadData};

// Construct a JSON format with Debezium CDC mode
let format = Format::Json(JsonFormat {
    confluent_schema_registry: false,
    schema_id: None,
    include_schema: false,
    debezium: true,
    unstructured: false,
    timestamp_format: TimestampFormat::UnixMillis,
    decimal_encoding: Default::default(),
});

assert!(format.is_updating()); // Debezium is an updating format

// Parse from SQL options.
// `connector_opts` is assumed to be a ConnectorOptions value already in scope,
// and `?` requires an enclosing function that returns a compatible Result.
// Per the signature above, Format::from_opts yields an Option<Format>.
let format = Format::from_opts(&mut connector_opts)?;
let bad_data = BadData::from_opts(&mut connector_opts)?;
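
As a rough illustration of what the is_updating assertion expresses, the following self-contained sketch models the check with reduced types. It is an assumption about the behavior, not the arroyo_rpc source: the `Format` and `JsonFormat` definitions here are hypothetical cut-down versions, and treating Debezium-mode JSON as the only updating format is inferred solely from the example above.

```rust
/// Hypothetical, reduced version of JsonFormat with only the relevant flag.
#[derive(Debug, Clone)]
struct JsonFormat {
    debezium: bool,
}

/// Hypothetical, reduced version of Format with two variants.
#[derive(Debug, Clone)]
enum Format {
    Json(JsonFormat),
    RawString,
}

impl Format {
    /// Assumption: a format is "updating" when its records can describe
    /// updates and deletes of earlier rows, as Debezium change events do.
    fn is_updating(&self) -> bool {
        matches!(self, Format::Json(JsonFormat { debezium: true }))
    }
}

fn main() {
    let cdc = Format::Json(JsonFormat { debezium: true });
    let plain = Format::Json(JsonFormat { debezium: false });

    println!("{}", cdc.is_updating());               // true
    println!("{}", plain.is_updating());             // false
    println!("{}", Format::RawString.is_updating()); // false
}
```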
