Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ArroyoSystems Arroyo Kinesis Source

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors
Last Updated 2026-02-08 08:00 GMT

Overview

KinesisSourceFunc is the Arroyo source operator that consumes records from AWS Kinesis streams with shard-level state tracking, automatic shard discovery, and retry logic for throttled reads.

Description

The Kinesis source operator initializes an AWS SDK KinesisClient and maintains per-shard state including offset (sequence number) and closed status. Shards are assigned to operator instances via hash-based partitioning on the shard ID. The operator uses a FuturesUnordered collection to concurrently read from multiple shards, each driven by shard iterator IDs. New shards are periodically discovered via sync_shards() every second. The operator handles ExpiredIteratorException by requesting new shard iterators, and retries throttled reads (ProvisionedThroughputExceededException) with exponential backoff up to 5 retries. Shard state (including sequence numbers) is persisted during checkpoints to enable recovery. The operator supports Earliest, Latest, SequenceNumber, and Timestamp offset modes.

Usage

Use KinesisSourceFunc when you need to ingest streaming data from an AWS Kinesis Data Stream into an Arroyo pipeline with fault-tolerant shard tracking.

Code Reference

Source Location

Signature

pub struct KinesisSourceFunc {
    pub stream_name: String,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub kinesis_client: Option<KinesisClient>,
    pub aws_region: Option<String>,
    pub shards: HashMap<String, ShardState>,
    pub offset: SourceOffset,
}

pub struct ShardState {
    stream_name: String,
    shard_id: String,
    offset: KinesisOffset,
    closed: bool,
}

pub enum KinesisOffset {
    Earliest,
    Latest,
    SequenceNumber(String),
    Timestamp(SystemTime),
}

#[async_trait]
impl SourceOperator for KinesisSourceFunc {
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
}

Import

use arroyo_connectors::kinesis::source::KinesisSourceFunc;

I/O Contract

Inputs

Name Type Required Description
stream_name String Yes Name of the Kinesis stream to consume from
format Format Yes Deserialization format for record data
offset SourceOffset Yes Starting offset (Earliest or Latest)
aws_region Option<String> No AWS region for the Kinesis client

Outputs

Name Type Description
records RecordBatch Deserialized Arrow record batches from Kinesis records with approximate arrival timestamps

Usage Examples

let source = KinesisSourceFunc {
    stream_name: "my-kinesis-stream".to_string(),
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    kinesis_client: None,
    aws_region: Some("us-east-1".to_string()),
    shards: HashMap::new(),
    offset: SourceOffset::Latest,
};

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment