
Implementation:ArroyoSystems Arroyo Connection Api Types



Knowledge Sources
Domains: Streaming, Connectors, API
Last Updated: 2026-02-08 08:00 GMT

Overview

Defines the API-level type system for Arroyo connection schemas, connectors, connection profiles, and connection tables used to bridge between external data systems and the streaming SQL engine.

Description

This module provides the complete set of serializable data types used by the Arroyo REST API and SQL planner to describe connections. It includes:

  • Connector -- metadata describing available connector plugins (Kafka, Kinesis, etc.) including source/sink capabilities.
  • ConnectionProfile and ConnectionProfilePost -- reusable credential/configuration bundles for connectors.
  • ConnectionType -- an enum distinguishing Source, Sink, and Lookup table roles.
  • FieldType and SourceField -- a recursive type system mapping API-level field definitions (Int32, Int64, Float64, Timestamp, Json, Struct, List, etc.) to Arrow DataTypes with bidirectional conversion via From/TryFrom traits.
  • ConnectionSchema -- the full schema description for a connection table, including format, framing, bad-data policy, field definitions, and schema definitions (JSON Schema, Protobuf, Avro).
  • ConnectionTable and ConnectionTablePost -- the persisted and creation representations of a connection table.

All types derive Serialize, Deserialize, and ToSchema (utoipa) for OpenAPI documentation generation.
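The bidirectional conversion described for FieldType can be illustrated with a minimal, standard-library-only sketch. The `DataType` enum here is a hypothetical stand-in for Arrow's data type, and `FieldType` is cut down to a few variants with tuple-based struct fields; neither matches the real Arroyo or Arrow definitions. The sketch only shows the pattern: an infallible `From` in the API-to-Arrow direction and a fallible `TryFrom` in the reverse direction, both recursing through nested list and struct types.

```rust
use std::convert::TryFrom;

// Hypothetical stand-in for an Arrow-style data type (not arrow's real DataType).
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Utf8,
    Null, // a type with no API-level counterpart in this sketch
    List(Box<DataType>),
    Struct(Vec<(String, DataType)>),
}

// Simplified API-level type; only a few variants are modelled here.
#[derive(Debug, Clone, PartialEq)]
pub enum FieldType {
    Int64,
    String,
    List(Box<FieldType>),
    Struct(Vec<(String, FieldType)>),
}

// API type -> Arrow-style type: every API type has a representation.
impl From<FieldType> for DataType {
    fn from(t: FieldType) -> Self {
        match t {
            FieldType::Int64 => DataType::Int64,
            FieldType::String => DataType::Utf8,
            FieldType::List(inner) => DataType::List(Box::new(DataType::from(*inner))),
            FieldType::Struct(fields) => DataType::Struct(
                fields
                    .into_iter()
                    .map(|(name, ty)| (name, DataType::from(ty)))
                    .collect(),
            ),
        }
    }
}

// Arrow-style type -> API type: fails for types the API cannot express.
impl TryFrom<DataType> for FieldType {
    type Error = String;

    fn try_from(t: DataType) -> Result<Self, Self::Error> {
        match t {
            DataType::Int64 => Ok(FieldType::Int64),
            DataType::Utf8 => Ok(FieldType::String),
            DataType::Null => Err("unsupported data type: Null".to_string()),
            DataType::List(inner) => {
                Ok(FieldType::List(Box::new(FieldType::try_from(*inner)?)))
            }
            DataType::Struct(fields) => {
                let converted = fields
                    .into_iter()
                    .map(|(name, ty)| FieldType::try_from(ty).map(|ft| (name, ft)))
                    .collect::<Result<Vec<_>, String>>()?;
                Ok(FieldType::Struct(converted))
            }
        }
    }
}
```

Keeping the reverse direction fallible means an unsupported type nested anywhere inside a list or struct surfaces as an error rather than being silently dropped.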

Usage

Use these types when defining new connectors, when creating connection tables via the REST API, or when the SQL planner needs to convert between API-level schema representations and Arrow schemas. The ConnectionSchema::arroyo_schema() method returns an ArroyoSchemaRef (an Arc<ArroyoSchema>) suitable for the dataflow engine.
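As a rough illustration of what turning API-level fields into an engine schema with a timestamp index might involve, here is a standard-library-only sketch. Everything in it is hypothetical: `Column`, `EngineSchema`, `to_engine_schema`, and the `_timestamp` column name are assumptions made for the example, not the actual ArroyoSchema implementation. It maps `required` to non-nullable columns and records the position of a timestamp column, appending one if none exists.

```rust
// Hypothetical engine-side column: a name plus a nullability flag.
#[derive(Debug, Clone, PartialEq)]
pub struct Column {
    pub name: String,
    pub nullable: bool,
}

// Hypothetical engine schema: columns plus the index of the timestamp column.
#[derive(Debug, Clone, PartialEq)]
pub struct EngineSchema {
    pub columns: Vec<Column>,
    pub timestamp_index: usize,
}

// Assumed name for the timestamp column; chosen for this sketch only.
pub const TIMESTAMP_COLUMN: &str = "_timestamp";

// Build an engine schema from (name, required) pairs.
pub fn to_engine_schema(fields: &[(&str, bool)]) -> EngineSchema {
    // A required field maps to a non-nullable column.
    let mut columns: Vec<Column> = fields
        .iter()
        .map(|&(name, required)| Column {
            name: name.to_string(),
            nullable: !required,
        })
        .collect();

    // Reuse an existing timestamp column, or append one at the end.
    let timestamp_index = match columns.iter().position(|c| c.name == TIMESTAMP_COLUMN) {
        Some(index) => index,
        None => {
            columns.push(Column {
                name: TIMESTAMP_COLUMN.to_string(),
                nullable: false,
            });
            columns.len() - 1
        }
    };

    EngineSchema {
        columns,
        timestamp_index,
    }
}
```

Under these assumptions, calling `to_engine_schema(&[("id", true), ("payload", false)])` yields three columns, with the appended timestamp column at index 2.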

Code Reference

Source Location

Signature

// Core enums and structs
pub enum ConnectionType { Source, Sink, Lookup }
pub enum FieldType {
    Int32, Int64, Uint32, Uint64, Float32, Float64,
    Decimal128(DecimalField), Bool, String, Bytes,
    Timestamp(TimestampField), Json, Struct(StructField), List(ListField),
}

pub struct SourceField {
    pub name: String,
    pub field_type: FieldType,
    pub required: bool,
    pub sql_name: Option<String>,
    pub metadata_key: Option<String>,
}

pub struct ConnectionSchema {
    pub format: Option<Format>,
    pub bad_data: Option<BadData>,
    pub framing: Option<Framing>,
    pub fields: Vec<SourceField>,
    pub definition: Option<SchemaDefinition>,
    pub inferred: Option<bool>,
    pub primary_keys: HashSet<String>,
}

impl ConnectionSchema {
    pub fn try_new(...) -> anyhow::Result<Self>;
    pub fn validate(self) -> anyhow::Result<Self>;
    pub fn arroyo_schema(&self) -> ArroyoSchemaRef;
    pub fn metadata_fields(&self) -> Vec<MetadataField>;
}

Import

use arroyo_rpc::api_types::connections::{
    Connector, ConnectionProfile, ConnectionType, FieldType,
    SourceField, ConnectionSchema, ConnectionTable, SchemaDefinition,
};

I/O Contract

Inputs

Name | Type | Required | Description
fields | Vec<SourceField> | Yes | List of field definitions with name, type, nullability
format | Option<Format> | No | Serialization format (JSON, Avro, Protobuf, Parquet, RawString, RawBytes)
framing | Option<Framing> | No | Message framing method (e.g., newline-delimited)
bad_data | Option<BadData> | No | Policy for handling malformed data (Fail or Drop)
definition | Option<SchemaDefinition> | No | Raw schema definition (JSON Schema, Protobuf, Avro)

Outputs

Name | Type | Description
ArroyoSchemaRef | Arc<ArroyoSchema> | Arrow-based schema with timestamp index for the dataflow engine
Field | arrow_schema::Field | Arrow field produced from SourceField via From trait conversion
MetadataField | Vec<MetadataField> | Extracted metadata field mappings from the schema
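The metadata output can be pictured as a filter over the field list. The following standard-library-only sketch assumes that a field counts as a metadata field when its `metadata_key` is set. The structs are simplified stand-ins (the `SourceField` here omits `field_type` and `sql_name`, and the shape of `MetadataField` is hypothetical), so this is one plausible reading of the behaviour rather than the actual method body.

```rust
// Simplified stand-in for the API-level field; omits field_type and sql_name.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceField {
    pub name: String,
    pub required: bool,
    pub metadata_key: Option<String>,
}

// Hypothetical shape for a metadata mapping: which field receives which key.
#[derive(Debug, Clone, PartialEq)]
pub struct MetadataField {
    pub field_name: String,
    pub key: String,
}

// Collect a mapping for every field that declares a metadata key.
pub fn metadata_fields(fields: &[SourceField]) -> Vec<MetadataField> {
    fields
        .iter()
        .filter_map(|field| {
            field.metadata_key.as_ref().map(|key| MetadataField {
                field_name: field.name.clone(),
                key: key.clone(),
            })
        })
        .collect()
}
```

Fields with `metadata_key: None` are skipped, so an ordinary data field such as `id` never appears in the result.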

Usage Examples

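use std::collections::HashSet;

// Define a connection schema with JSON format.
// The `?` below assumes an enclosing function that returns anyhow::Result.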
let schema = ConnectionSchema::try_new(
    Some(Format::Json(JsonFormat::default())),
    Some(BadData::Drop {}),
    None,
    vec![
        SourceField {
            name: "id".into(),
            field_type: FieldType::Int64,
            required: true,
            sql_name: None,
            metadata_key: None,
        },
        SourceField {
            name: "payload".into(),
            field_type: FieldType::Json,
            required: false,
            sql_name: None,
            metadata_key: None,
        },
    ],
    None,
    None,
    HashSet::new(),
)?;

// Convert to an ArroyoSchema for the engine
let arroyo_schema: ArroyoSchemaRef = schema.arroyo_schema();
