

Implementation:ArroyoSystems Arroyo Kinesis Connector

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

KinesisConnector implements the Arroyo Connector trait for AWS Kinesis, providing both source and sink capabilities with configurable stream names, regions, offsets, and flush policies.

Description

The Kinesis connector supports both Source and Sink table types. Sources read from either the Earliest or the Latest offset and accept an optional AWS region. Sinks control flushing through FlushConfig, which has three parameters: batch_flush_interval_millis (flush interval), batch_max_buffer_size (maximum bytes per batch), and records_per_batch (maximum records per batch); these are set with the sink.* options listed under Inputs. For a source, the connector constructs a KinesisSourceFunc initialized with the stream name, format, offset, and region. For a sink, it constructs a KinesisSinkFunc that uses an ArrowSerializer and the FlushConfig. Configuration is parsed from SQL WITH clauses by from_options and serialized as JSON into the OperatorConfig.
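The three sink flush triggers can be illustrated with a small buffer. This is a minimal sketch, not KinesisSinkFunc's actual code: the FlushConfig below only mirrors the field names from the description (its field types are assumed), and BatchBuffer, push, poll_timer, and take are hypothetical names. A batch is emitted when adding a record would exceed the record-count or byte limit, or when the flush interval has elapsed while data is pending.

```rust
use std::time::{Duration, Instant};

/// Hypothetical mirror of FlushConfig; field names follow the description,
/// but the real struct's types and layout may differ.
#[derive(Debug, Clone)]
struct FlushConfig {
    batch_flush_interval_millis: u64,
    batch_max_buffer_size: usize, // maximum bytes per batch
    records_per_batch: usize,     // maximum records per batch
}

/// Hypothetical buffer showing the three flush triggers.
struct BatchBuffer {
    config: FlushConfig,
    records: Vec<Vec<u8>>,
    bytes: usize,
    last_flush: Instant,
}

impl BatchBuffer {
    fn new(config: FlushConfig) -> Self {
        Self {
            config,
            records: Vec::new(),
            bytes: 0,
            last_flush: Instant::now(),
        }
    }

    /// True if adding `record` would break the count or byte limit.
    /// An empty buffer always accepts one record, even an oversized one.
    fn would_overflow(&self, record: &[u8]) -> bool {
        !self.records.is_empty()
            && (self.records.len() + 1 > self.config.records_per_batch
                || self.bytes + record.len() > self.config.batch_max_buffer_size)
    }

    /// Buffers `record`; if it would not fit, first hands back the pending batch to send.
    fn push(&mut self, record: Vec<u8>) -> Option<Vec<Vec<u8>>> {
        let flushed = if self.would_overflow(&record) {
            Some(self.take())
        } else {
            None
        };
        self.bytes += record.len();
        self.records.push(record);
        flushed
    }

    /// Time trigger: returns the pending batch once the interval has elapsed.
    fn poll_timer(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        let interval = Duration::from_millis(self.config.batch_flush_interval_millis);
        if !self.records.is_empty() && now.saturating_duration_since(self.last_flush) >= interval {
            Some(self.take())
        } else {
            None
        }
    }

    /// Empties the buffer and restarts the interval clock.
    fn take(&mut self) -> Vec<Vec<u8>> {
        self.bytes = 0;
        self.last_flush = Instant::now();
        std::mem::take(&mut self.records)
    }
}
```

A sink loop built this way would call push for each serialized record and poll_timer on a periodic tick, sending any returned batch to Kinesis. For context, the Kinesis PutRecords API accepts at most 500 records per request, the same value as the sink.max_records_per_batch default.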

Usage

Use KinesisConnector when building Arroyo pipelines that need to consume from or write to AWS Kinesis Data Streams.

Code Reference

Source Location

Signature

// Signature summary: method bodies are omitted.
pub struct KinesisConnector {}

impl Connector for KinesisConnector {
    type ProfileT = EmptyConfig;
    type TableT = KinesisTable;

    fn name(&self) -> &'static str; // returns "kinesis"
    fn table_type(&self, _: EmptyConfig, table: KinesisTable) -> ConnectionType;
    fn from_config(&self, id: Option<i64>, name: &str, config: EmptyConfig,
        table: KinesisTable, schema: Option<&ConnectionSchema>) -> anyhow::Result<Connection>;
    fn from_options(&self, name: &str, options: &mut ConnectorOptions,
        schema: Option<&ConnectionSchema>, _: Option<&ConnectionProfile>) -> anyhow::Result<Connection>;
    fn make_operator(&self, _: EmptyConfig, table: KinesisTable,
        config: OperatorConfig) -> anyhow::Result<ConstructedOperator>;
}
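To make the dispatch in table_type and make_operator concrete, the sketch below models it with simplified, hypothetical types: MockKinesisConnector, a trimmed-down Connector trait, and an Operator enum standing in for ConstructedOperator. It drops the profile, schema, and OperatorConfig arguments and is not Arroyo's actual trait; it only shows how the table's type selects a source or a sink operator.

```rust
// Hypothetical, heavily simplified stand-ins for the types in the signature above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SourceOffset {
    Earliest,
    Latest,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum ConnectionType {
    Source,
    Sink,
}

#[derive(Debug, Clone)]
enum TableType {
    Source { offset: SourceOffset },
    Sink { records_per_batch: usize },
}

#[derive(Debug, Clone)]
struct KinesisTable {
    stream_name: String,
    aws_region: Option<String>,
    table_type: TableType,
}

/// Stands in for a ConstructedOperator wrapping KinesisSourceFunc or KinesisSinkFunc.
#[derive(Debug, PartialEq)]
enum Operator {
    Source { stream_name: String, offset: SourceOffset, region: Option<String> },
    Sink { stream_name: String, records_per_batch: usize },
}

/// Trimmed-down trait: an associated table type plus the dispatch methods.
trait Connector {
    type TableT;
    fn name(&self) -> &'static str;
    fn table_type(&self, table: &Self::TableT) -> ConnectionType;
    fn make_operator(&self, table: Self::TableT) -> Operator;
}

struct MockKinesisConnector;

impl Connector for MockKinesisConnector {
    type TableT = KinesisTable;

    fn name(&self) -> &'static str {
        "kinesis"
    }

    // The table's type alone decides whether this is a source or a sink.
    fn table_type(&self, table: &KinesisTable) -> ConnectionType {
        match table.table_type {
            TableType::Source { .. } => ConnectionType::Source,
            TableType::Sink { .. } => ConnectionType::Sink,
        }
    }

    // Build the matching operator, moving the stream name (and region) into it.
    fn make_operator(&self, table: KinesisTable) -> Operator {
        match table.table_type {
            TableType::Source { offset } => Operator::Source {
                stream_name: table.stream_name,
                offset,
                region: table.aws_region,
            },
            TableType::Sink { records_per_batch } => Operator::Sink {
                stream_name: table.stream_name,
                records_per_batch,
            },
        }
    }
}
```

Using an associated type (TableT) lets each connector declare its own table configuration while the engine calls the same trait methods for every connector.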

Import

use arroyo_connectors::kinesis::KinesisConnector;

I/O Contract

Inputs

Name                         Type            Required  Description
stream_name                  String          Yes       AWS Kinesis stream name
type                         TableType       Yes       'source' (configured with source.offset) or 'sink' (configured with the sink.* flush options)
aws_region                   Option<String>  No        AWS region for the Kinesis client
source.offset                SourceOffset    No        Earliest or Latest, written 'earliest' / 'latest' in SQL (default: Latest)
sink.flush_interval_millis   Option<i64>     No        Flush interval in milliseconds (default: 1000)
sink.max_bytes_per_batch     Option<i64>     No        Maximum bytes per batch (default: 4 MB)
sink.max_records_per_batch   Option<i64>     No        Maximum records per batch (default: 500)
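The option handling in from_options can be approximated as follows. This sketch assumes a plain HashMap<String, String> in place of ConnectorOptions (whose real accessor methods are not reproduced), and parse_kinesis_options, parse_i64, KinesisOptions, and KinesisTableType are hypothetical names. It applies the defaults from the table above, reading "4 MB" as 4 * 1024 * 1024 bytes, which is an assumption.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum SourceOffset {
    Earliest,
    Latest,
}

/// Hypothetical parsed form of the source- or sink-specific options.
#[derive(Debug, PartialEq)]
enum KinesisTableType {
    Source { offset: SourceOffset },
    Sink {
        flush_interval_millis: i64,
        max_bytes_per_batch: i64,
        max_records_per_batch: i64,
    },
}

/// Hypothetical stand-in for the connector's table configuration.
#[derive(Debug, PartialEq)]
struct KinesisOptions {
    stream_name: String,
    aws_region: Option<String>,
    table_type: KinesisTableType,
}

/// Reads an optional integer option, falling back to `default` when absent.
fn parse_i64(opts: &HashMap<String, String>, key: &str, default: i64) -> Result<i64, String> {
    match opts.get(key) {
        Some(v) => v.parse::<i64>().map_err(|e| format!("invalid {key}: {e}")),
        None => Ok(default),
    }
}

fn parse_kinesis_options(opts: &HashMap<String, String>) -> Result<KinesisOptions, String> {
    let stream_name = opts
        .get("stream_name")
        .cloned()
        .ok_or("missing required option 'stream_name'")?;
    let aws_region = opts.get("aws_region").cloned();

    let table_type = match opts.get("type").map(String::as_str) {
        Some("source") => {
            // source.offset is optional and defaults to Latest.
            let offset = match opts.get("source.offset").map(String::as_str) {
                None | Some("latest") => SourceOffset::Latest,
                Some("earliest") => SourceOffset::Earliest,
                Some(other) => return Err(format!("invalid source.offset: {other}")),
            };
            KinesisTableType::Source { offset }
        }
        Some("sink") => KinesisTableType::Sink {
            flush_interval_millis: parse_i64(opts, "sink.flush_interval_millis", 1000)?,
            // "4 MB" from the table, assumed here to mean 4 MiB.
            max_bytes_per_batch: parse_i64(opts, "sink.max_bytes_per_batch", 4 * 1024 * 1024)?,
            max_records_per_batch: parse_i64(opts, "sink.max_records_per_batch", 500)?,
        },
        Some(other) => return Err(format!("invalid type: {other}")),
        None => return Err("missing required option 'type'".to_string()),
    };

    Ok(KinesisOptions { stream_name, aws_region, table_type })
}
```

Keeping the source and sink settings in separate enum variants means a sink can never carry an offset and a source can never carry flush settings.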

Outputs

Name                  Type                  Description
Connection            Connection            Configured Kinesis connection (Source or Sink)
ConstructedOperator   ConstructedOperator   KinesisSourceFunc or KinesisSinkFunc operator

Usage Examples

CREATE TABLE kinesis_source (
    value TEXT
) WITH (
    connector = 'kinesis',
    stream_name = 'my-stream',
    type = 'source',
    'source.offset' = 'earliest',
    aws_region = 'us-east-1',
    format = 'json'
);
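In Kinesis terms, the two offset modes correspond naturally to two GetShardIterator iterator types: TRIM_HORIZON (start at the oldest record still retained in the shard) and LATEST (read only records written after the iterator is created). The sketch below assumes the connector uses this mapping when a shard has no saved position; SourceOffset, ShardIteratorType, and initial_iterator are hypothetical names, not the connector's API.

```rust
/// Hypothetical mirror of the connector's offset setting.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SourceOffset {
    Earliest,
    Latest,
}

/// The two Kinesis GetShardIterator `ShardIteratorType` values used here.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ShardIteratorType {
    TrimHorizon,
    Latest,
}

impl ShardIteratorType {
    /// Wire value expected by the Kinesis API.
    fn as_str(self) -> &'static str {
        match self {
            ShardIteratorType::TrimHorizon => "TRIM_HORIZON",
            ShardIteratorType::Latest => "LATEST",
        }
    }
}

/// Assumed mapping for a shard with no saved position; resuming from
/// previously read positions is not modeled here.
fn initial_iterator(offset: SourceOffset) -> ShardIteratorType {
    match offset {
        // Start at the oldest record still retained in the shard.
        SourceOffset::Earliest => ShardIteratorType::TrimHorizon,
        // Start after the newest record, so only new writes are read.
        SourceOffset::Latest => ShardIteratorType::Latest,
    }
}
```

With 'source.offset' = 'earliest' as in the example above, a newly started source under this assumption would replay whatever the stream still retains before reading new records.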
