Implementation:ArroyoSystems Arroyo Kinesis Connector
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
KinesisConnector implements the Arroyo Connector trait for AWS Kinesis, providing both source and sink capabilities with configurable stream names, regions, offsets, and flush policies.
Description
The Kinesis connector supports two table types, Source and Sink. A source is configured with an offset mode (Earliest or Latest) and an optional AWS region. A sink is configured through FlushConfig, which takes three parameters: batch_flush_interval_millis (time between flushes), batch_max_buffer_size (max bytes per batch), and records_per_batch (max records per batch). These correspond to the sink.flush_interval_millis, sink.max_bytes_per_batch, and sink.max_records_per_batch options listed under Inputs. For sources, the connector constructs a KinesisSourceFunc initialized with the stream name, format, offset, and region; for sinks, it constructs a KinesisSinkFunc that uses ArrowSerializer and FlushConfig. Configuration is parsed from SQL WITH clauses via from_options and serialized as JSON for OperatorConfig.
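The three flush parameters can be read as independent thresholds on a buffered batch. The following is a minimal sketch of that idea, not Arroyo's actual FlushConfig: the `FlushPolicy` type and its methods are hypothetical, the fallback values are taken from the defaults listed under Inputs (with 4MB interpreted as 4 * 1024 * 1024 bytes, which is an assumption), and the rule "flush a non-empty buffer as soon as any one threshold is reached" is also an assumption about how such a policy would typically behave.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the sink's flush settings.
/// This is NOT Arroyo's `FlushConfig`; names and semantics are illustrative.
#[derive(Debug, Clone, Copy)]
struct FlushPolicy {
    flush_interval: Duration,
    max_bytes_per_batch: usize,
    max_records_per_batch: usize,
}

impl FlushPolicy {
    /// Builds a policy from optional settings, falling back to the defaults
    /// documented in the Inputs table (1000 ms, 4MB, 500 records).
    /// Negative values are clamped to zero.
    fn from_options(
        interval_millis: Option<i64>,
        max_bytes: Option<i64>,
        max_records: Option<i64>,
    ) -> Self {
        FlushPolicy {
            flush_interval: Duration::from_millis(interval_millis.unwrap_or(1000).max(0) as u64),
            // Assumption: "4MB" means 4 * 1024 * 1024 bytes.
            max_bytes_per_batch: max_bytes.unwrap_or(4 * 1024 * 1024).max(0) as usize,
            max_records_per_batch: max_records.unwrap_or(500).max(0) as usize,
        }
    }

    /// Assumed rule: a non-empty buffer is flushed as soon as any single
    /// threshold (record count, byte size, or elapsed time) is reached.
    fn should_flush(
        &self,
        buffered_records: usize,
        buffered_bytes: usize,
        since_last_flush: Duration,
    ) -> bool {
        buffered_records > 0
            && (buffered_records >= self.max_records_per_batch
                || buffered_bytes >= self.max_bytes_per_batch
                || since_last_flush >= self.flush_interval)
    }
}
```

Under these assumptions, a sink loop would call `should_flush` after appending each record and on a periodic timer tick, so that low-traffic streams are still flushed by the interval rather than waiting for a size threshold.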
Usage
Use KinesisConnector when building Arroyo pipelines that need to consume from or write to AWS Kinesis Data Streams.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/kinesis/mod.rs
Signature
pub struct KinesisConnector {}

impl Connector for KinesisConnector {
    type ProfileT = EmptyConfig;
    type TableT = KinesisTable;

    fn name(&self) -> &'static str; // returns "kinesis"
    fn table_type(&self, _: EmptyConfig, table: KinesisTable) -> ConnectionType;
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: KinesisTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn from_options(&self, name: &str, options: &mut ConnectorOptions,
        schema: Option<&ConnectionSchema>, _: Option<&ConnectionProfile>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: KinesisTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}
Import
use arroyo_connectors::kinesis::KinesisConnector;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stream_name | String | Yes | AWS Kinesis stream name |
| type | TableType | Yes | Source (with offset) or Sink (with flush config) |
| aws_region | Option<String> | No | AWS region for the Kinesis client |
| source.offset | SourceOffset | No | Earliest or Latest (default: Latest) |
| sink.flush_interval_millis | Option<i64> | No | Flush interval in milliseconds (default: 1000) |
| sink.max_bytes_per_batch | Option<i64> | No | Maximum bytes per batch (default: 4MB) |
| sink.max_records_per_batch | Option<i64> | No | Maximum records per batch (default: 500) |
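
The option handling described by the table above can be sketched as a small parser over key/value pairs. This is an illustration only, not the connector's from_options implementation: it uses a plain `HashMap` instead of Arroyo's ConnectorOptions, and the `ParsedTable`, `TableKind`, `Offset`, `parse_i64`, and `parse_table` names are hypothetical. It shows the required options, the Latest default for source.offset, and the optional sink settings.

```rust
use std::collections::HashMap;

/// Hypothetical mirror of the source offset modes.
#[derive(Debug, PartialEq)]
enum Offset {
    Earliest,
    Latest,
}

/// Hypothetical mirror of the Source/Sink table types.
#[derive(Debug, PartialEq)]
enum TableKind {
    Source {
        offset: Offset,
    },
    Sink {
        flush_interval_millis: Option<i64>,
        max_bytes_per_batch: Option<i64>,
        max_records_per_batch: Option<i64>,
    },
}

/// Hypothetical parsed form of the WITH-clause options.
#[derive(Debug, PartialEq)]
struct ParsedTable {
    stream_name: String,
    aws_region: Option<String>,
    kind: TableKind,
}

/// Reads an optional integer option, reporting the key on parse failure.
fn parse_i64(opts: &HashMap<String, String>, key: &str) -> Result<Option<i64>, String> {
    match opts.get(key) {
        None => Ok(None),
        Some(v) => v
            .parse::<i64>()
            .map(Some)
            .map_err(|e| format!("invalid value for '{}': {}", key, e)),
    }
}

/// Parses the options from the Inputs table into a `ParsedTable`.
fn parse_table(opts: &HashMap<String, String>) -> Result<ParsedTable, String> {
    let stream_name = opts
        .get("stream_name")
        .cloned()
        .ok_or_else(|| "missing required option 'stream_name'".to_string())?;

    let kind = match opts.get("type").map(String::as_str) {
        Some("source") => {
            // source.offset is optional and defaults to Latest.
            let offset = match opts.get("source.offset").map(String::as_str) {
                Some("earliest") => Offset::Earliest,
                None | Some("latest") => Offset::Latest,
                Some(other) => {
                    return Err(format!("invalid value for 'source.offset': {}", other))
                }
            };
            TableKind::Source { offset }
        }
        Some("sink") => TableKind::Sink {
            flush_interval_millis: parse_i64(opts, "sink.flush_interval_millis")?,
            max_bytes_per_batch: parse_i64(opts, "sink.max_bytes_per_batch")?,
            max_records_per_batch: parse_i64(opts, "sink.max_records_per_batch")?,
        },
        Some(other) => return Err(format!("invalid value for 'type': {}", other)),
        None => return Err("missing required option 'type'".to_string()),
    };

    Ok(ParsedTable {
        stream_name,
        aws_region: opts.get("aws_region").cloned(),
        kind,
    })
}
```

Keeping the sink settings as `Option<i64>` in the parsed form defers the choice of defaults to whatever consumes the configuration, which matches the optional types shown in the table.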
Outputs
| Name | Type | Description |
|---|---|---|
| Connection | Connection | Configured Kinesis connection (Source or Sink) |
| ConstructedOperator | ConstructedOperator | KinesisSourceFunc or KinesisSinkFunc operator |
Usage Examples
CREATE TABLE kinesis_source (
    value TEXT
) WITH (
    connector = 'kinesis',
    stream_name = 'my-stream',
    type = 'source',
    'source.offset' = 'earliest',
    aws_region = 'us-east-1',
    format = 'json'
);