Implementation:ArroyoSystems Arroyo Connection Api Types
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors, API |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Defines the API-level type system for Arroyo connection schemas, connectors, connection profiles, and connection tables used to bridge between external data systems and the streaming SQL engine.
Description
This module defines the serializable data types that the Arroyo REST API and SQL planner use to describe connections. It includes:
- Connector -- metadata describing available connector plugins (Kafka, Kinesis, etc.), including their source and sink capabilities.
- ConnectionProfile and ConnectionProfilePost -- reusable credential/configuration bundles for connectors.
- ConnectionType -- an enum distinguishing Source, Sink, and Lookup table roles.
- FieldType and SourceField -- a recursive type system for API-level field definitions (Int32, Int64, Float64, Timestamp, Json, Struct, List, etc.), convertible to and from Arrow DataTypes through From and TryFrom implementations.
- ConnectionSchema -- the full schema description for a connection table, including format, framing, bad-data policy, field definitions, and schema definitions (JSON Schema, Protobuf, Avro).
- ConnectionTable and ConnectionTablePost -- the persisted and creation representations of a connection table.
All types derive Serialize, Deserialize, and ToSchema (utoipa) for OpenAPI documentation generation.
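The recursive FieldType/Arrow mapping is easiest to see on a reduced model. The sketch below uses hypothetical, std-only stand-ins (`FieldType`, `ArrowType`, `TimeUnit`) instead of the real arroyo_rpc and arrow_schema types. The variant names on the Arrow side follow Arrow's DataType naming. Treating Json as plain Utf8 is an assumption of this sketch. Struct and List recurse into their children in both directions.

```rust
use std::convert::TryFrom;

// Hypothetical, simplified mirrors of the API-level FieldType and of Arrow's
// DataType; not the arroyo_rpc or arrow_schema types.
#[derive(Debug, Clone, PartialEq)]
pub enum TimeUnit { Second, Millisecond, Microsecond, Nanosecond }

#[derive(Debug, Clone, PartialEq)]
pub enum FieldType {
    Int32, Int64, Uint32, Uint64, Float32, Float64,
    Decimal128 { precision: u8, scale: i8 },
    Bool, String, Bytes,
    Timestamp(TimeUnit),
    Json,
    Struct(Vec<(String, FieldType)>),
    List(Box<FieldType>),
}

#[derive(Debug, Clone, PartialEq)]
pub enum ArrowType {
    Int32, Int64, UInt32, UInt64,
    Float16, // no API-level counterpart in this model
    Float32, Float64,
    Decimal128(u8, i8),
    Boolean, Utf8, Binary,
    Timestamp(TimeUnit),
    Struct(Vec<(String, ArrowType)>),
    List(Box<ArrowType>),
}

// API type -> Arrow type: total, so plain From.
impl From<FieldType> for ArrowType {
    fn from(t: FieldType) -> Self {
        match t {
            FieldType::Int32 => ArrowType::Int32,
            FieldType::Int64 => ArrowType::Int64,
            FieldType::Uint32 => ArrowType::UInt32,
            FieldType::Uint64 => ArrowType::UInt64,
            FieldType::Float32 => ArrowType::Float32,
            FieldType::Float64 => ArrowType::Float64,
            FieldType::Decimal128 { precision, scale } => ArrowType::Decimal128(precision, scale),
            FieldType::Bool => ArrowType::Boolean,
            // Assumption: JSON travels as UTF-8 text in this sketch.
            FieldType::String | FieldType::Json => ArrowType::Utf8,
            FieldType::Bytes => ArrowType::Binary,
            FieldType::Timestamp(unit) => ArrowType::Timestamp(unit),
            // Recursive cases: convert each nested field and the list item type.
            FieldType::Struct(fields) => ArrowType::Struct(
                fields.into_iter().map(|(n, f)| (n, ArrowType::from(f))).collect(),
            ),
            FieldType::List(item) => ArrowType::List(Box::new(ArrowType::from(*item))),
        }
    }
}

// Arrow type -> API type: partial, so TryFrom with an error for unsupported types.
impl TryFrom<ArrowType> for FieldType {
    type Error = String;

    fn try_from(t: ArrowType) -> Result<Self, Self::Error> {
        Ok(match t {
            ArrowType::Int32 => FieldType::Int32,
            ArrowType::Int64 => FieldType::Int64,
            ArrowType::UInt32 => FieldType::Uint32,
            ArrowType::UInt64 => FieldType::Uint64,
            ArrowType::Float32 => FieldType::Float32,
            ArrowType::Float64 => FieldType::Float64,
            ArrowType::Decimal128(precision, scale) => FieldType::Decimal128 { precision, scale },
            ArrowType::Boolean => FieldType::Bool,
            ArrowType::Utf8 => FieldType::String,
            ArrowType::Binary => FieldType::Bytes,
            ArrowType::Timestamp(unit) => FieldType::Timestamp(unit),
            ArrowType::Struct(fields) => FieldType::Struct(
                fields
                    .into_iter()
                    .map(|(n, f)| FieldType::try_from(f).map(|t| (n, t)))
                    .collect::<Result<Vec<_>, String>>()?,
            ),
            ArrowType::List(item) => FieldType::List(Box::new(FieldType::try_from(*item)?)),
            other => return Err(format!("unsupported Arrow type: {:?}", other)),
        })
    }
}
```

This model omits field-level metadata, which the real conversion may use to tag JSON columns. As a result, Json does not round-trip here and comes back as String. An Arrow type with no API-level counterpart, such as Float16 in this model, makes TryFrom fail.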
Usage
Use these types when defining new connectors, when creating connection tables through the REST API, or when the SQL planner needs to convert between API-level schema representations and Arrow schemas. The ConnectionSchema::arroyo_schema() method returns an ArroyoSchemaRef (an Arc<ArroyoSchema>) suitable for the dataflow engine.
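The following sketch gives a rough picture of converting API fields into an engine schema. It models two properties under stated assumptions. First, `required` maps to the inverse of Arrow's `nullable`. Second, the engine schema carries an event-time column whose position is recorded as the timestamp index; the column name `_timestamp` is an assumption. `ApiField`, `EngineField`, `EngineSchema`, and `to_engine_schema` are hypothetical names, not Arroyo APIs.

```rust
// Hypothetical stand-ins: the real code builds arrow_schema::Field values and an
// ArroyoSchema; these structs only model nullability and the timestamp index.
#[derive(Debug, Clone)]
pub struct ApiField {
    pub name: String,
    pub required: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub struct EngineField {
    pub name: String,
    pub nullable: bool,
}

#[derive(Debug)]
pub struct EngineSchema {
    pub fields: Vec<EngineField>,
    pub timestamp_index: usize,
}

// Assumed name of the event-time column used by the engine.
pub const TIMESTAMP_FIELD: &str = "_timestamp";

pub fn to_engine_schema(api_fields: &[ApiField]) -> EngineSchema {
    // `required` on the API side is the inverse of Arrow's `nullable`.
    let mut fields: Vec<EngineField> = api_fields
        .iter()
        .map(|f| EngineField { name: f.name.clone(), nullable: !f.required })
        .collect();

    // Reuse a declared event-time column if present; otherwise append one.
    let existing = fields.iter().position(|f| f.name == TIMESTAMP_FIELD);
    let timestamp_index = match existing {
        Some(i) => i,
        None => {
            fields.push(EngineField { name: TIMESTAMP_FIELD.to_string(), nullable: false });
            fields.len() - 1
        }
    };

    EngineSchema { fields, timestamp_index }
}
```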
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-rpc/src/api_types/connections.rs
Signature
```rust
// Core enums and structs (signatures only; derives and serde attributes omitted)
pub enum ConnectionType { Source, Sink, Lookup }

pub enum FieldType {
    Int32, Int64, Uint32, Uint64, Float32, Float64,
    Decimal128(DecimalField), Bool, String, Bytes,
    Timestamp(TimestampField), Json, Struct(StructField), List(ListField),
}

pub struct SourceField {
    pub name: String,
    pub field_type: FieldType,
    pub required: bool,
    pub sql_name: Option<String>,
    pub metadata_key: Option<String>,
}

pub struct ConnectionSchema {
    pub format: Option<Format>,
    pub bad_data: Option<BadData>,
    pub framing: Option<Framing>,
    pub fields: Vec<SourceField>,
    pub definition: Option<SchemaDefinition>,
    pub inferred: Option<bool>,
    pub primary_keys: HashSet<String>,
}

impl ConnectionSchema {
    pub fn try_new(
        format: Option<Format>,
        bad_data: Option<BadData>,
        framing: Option<Framing>,
        fields: Vec<SourceField>,
        definition: Option<SchemaDefinition>,
        inferred: Option<bool>,
        primary_keys: HashSet<String>,
    ) -> anyhow::Result<Self>;
    pub fn validate(self) -> anyhow::Result<Self>;
    pub fn arroyo_schema(&self) -> ArroyoSchemaRef;
    pub fn metadata_fields(&self) -> Vec<MetadataField>;
}
```
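The `validate` step can be pictured as format-specific checks on the declared fields. This sketch assumes rules of the kind the real method may apply to raw formats. Under raw_string, exactly one non-metadata field is allowed; it must be named `value` and have type String. raw_bytes follows the same rule with Bytes. `FieldKind`, `FormatKind`, `FieldDef`, and this `validate` function are hypothetical simplifications, and the real rules and error messages may differ.

```rust
// Hypothetical, reduced types for illustrating schema validation.
#[derive(Debug, Clone, PartialEq)]
pub enum FieldKind { String, Bytes, Int64 }

#[derive(Debug, Clone, PartialEq)]
pub enum FormatKind { Json, RawString, RawBytes }

#[derive(Debug, Clone)]
pub struct FieldDef {
    pub name: String,
    pub kind: FieldKind,
    pub metadata_key: Option<String>,
}

pub fn validate(format: Option<&FormatKind>, fields: &[FieldDef]) -> Result<(), String> {
    // Metadata-backed fields are filled by the connector, not by the payload,
    // so they are excluded from the payload-shape check.
    let data_fields: Vec<&FieldDef> = fields.iter().filter(|f| f.metadata_key.is_none()).collect();

    // Only raw formats constrain the field list in this sketch.
    let (format_name, expected) = match format {
        Some(FormatKind::RawString) => ("raw_string", FieldKind::String),
        Some(FormatKind::RawBytes) => ("raw_bytes", FieldKind::Bytes),
        _ => return Ok(()),
    };

    match data_fields.as_slice() {
        [only] if only.name == "value" && only.kind == expected => Ok(()),
        _ => Err(format!(
            "{} format requires a single non-metadata field named `value` of type {:?}",
            format_name, expected
        )),
    }
}
```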
Import
```rust
use arroyo_rpc::api_types::connections::{
    Connector, ConnectionProfile, ConnectionType, FieldType,
    SourceField, ConnectionSchema, ConnectionTable, SchemaDefinition,
};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| fields | Vec<SourceField> | Yes | List of field definitions with name, type, nullability |
| format | Option<Format> | No | Serialization format (JSON, Avro, Protobuf, Parquet, RawString, RawBytes) |
| framing | Option<Framing> | No | Message framing method (e.g., newline-delimited) |
| bad_data | Option<BadData> | No | Policy for handling malformed data (Fail or Drop) |
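| definition | Option<SchemaDefinition> | No | Raw schema definition (JSON Schema, Protobuf, Avro) |
| inferred | Option<bool> | No | Whether the schema was inferred rather than declared explicitly |
| primary_keys | HashSet<String> | Yes (may be empty) | Names of the fields that make up the primary key |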
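The following minimal sketch illustrates how `framing` and `bad_data` interact. It assumes newline-delimited framing and a decoder that can fail on individual records. Parsing each line as an i64 stands in for real format decoding. `BadDataPolicy` and `decode_newline_framed` are hypothetical names; real connectors operate on byte payloads and the configured Format.

```rust
// Hypothetical mirror of the Fail / Drop bad-data policies.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BadDataPolicy { Fail, Drop }

/// Splits a payload on '\n' (newline framing), decodes each non-empty line,
/// and applies the policy to lines that fail to decode.
pub fn decode_newline_framed(payload: &str, policy: BadDataPolicy) -> Result<Vec<i64>, String> {
    let mut out = Vec::new();
    for line in payload.split('\n').filter(|l| !l.is_empty()) {
        match line.trim().parse::<i64>() {
            Ok(v) => out.push(v),
            Err(e) => match policy {
                // Drop: skip the malformed record and keep going.
                BadDataPolicy::Drop => continue,
                // Fail: abort the whole batch with an error.
                BadDataPolicy::Fail => return Err(format!("bad record {:?}: {}", line, e)),
            },
        }
    }
    Ok(out)
}
```

With Drop, one malformed line costs only that record. With Fail, the same line aborts the whole batch.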
Outputs
| Produced by | Type | Description |
|---|---|---|
| arroyo_schema() | ArroyoSchemaRef (Arc<ArroyoSchema>) | Arrow-based schema with a timestamp index, for the dataflow engine |
| Field::from(SourceField) | arrow_schema::Field | Arrow field produced from a SourceField via the From trait |
| metadata_fields() | Vec<MetadataField> | Metadata field mappings extracted from the schema's fields |
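`metadata_fields()` can be understood as collecting the fields that declare a `metadata_key`, pairing each field name with the key the connector should read. In this minimal sketch, the hypothetical `DeclaredField` and `MetadataMapping` types stand in for SourceField and MetadataField. The real MetadataField may carry additional information.

```rust
// Hypothetical reduced types standing in for SourceField and MetadataField.
#[derive(Debug, Clone)]
pub struct DeclaredField {
    pub name: String,
    pub metadata_key: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct MetadataMapping {
    pub field_name: String,
    pub key: String,
}

/// Keeps only fields backed by connector metadata, in declaration order.
pub fn metadata_fields(fields: &[DeclaredField]) -> Vec<MetadataMapping> {
    fields
        .iter()
        .filter_map(|f| {
            f.metadata_key.as_ref().map(|key| MetadataMapping {
                field_name: f.name.clone(),
                key: key.clone(),
            })
        })
        .collect()
}
```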
Usage Examples
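```rust
use std::collections::HashSet;

use arroyo_rpc::api_types::connections::{ConnectionSchema, FieldType, SourceField};
// Module paths for the format and schema types are assumed; check them
// against your arroyo-rpc version.
use arroyo_rpc::df::ArroyoSchemaRef;
use arroyo_rpc::formats::{BadData, Format, JsonFormat};

fn build_schema() -> anyhow::Result<ArroyoSchemaRef> {
    // Define a connection schema with JSON format that drops malformed records
    let schema = ConnectionSchema::try_new(
        Some(Format::Json(JsonFormat::default())),
        Some(BadData::Drop {}),
        None, // framing
        vec![
            SourceField {
                name: "id".into(),
                field_type: FieldType::Int64,
                required: true,
                sql_name: None,
                metadata_key: None,
            },
            SourceField {
                name: "payload".into(),
                field_type: FieldType::Json,
                required: false,
                sql_name: None,
                metadata_key: None,
            },
        ],
        None,           // definition
        None,           // inferred
        HashSet::new(), // primary_keys
    )?;

    // Convert to an ArroyoSchema for the engine
    Ok(schema.arroyo_schema())
}
```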