Implementation:ArroyoSystems Arroyo Kinesis Source
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
KinesisSourceFunc is the Arroyo source operator that consumes records from AWS Kinesis streams with shard-level state tracking, automatic shard discovery, and retry logic for throttled reads.
Description
The Kinesis source operator initializes an AWS SDK KinesisClient and maintains per-shard state consisting of an offset (a sequence number or starting position) and a closed flag. Shards are assigned to operator instances by hash-based partitioning on the shard ID, so each shard is read by exactly one instance. The operator uses a FuturesUnordered collection to read from multiple shards concurrently, with each shard's reads driven by its current shard iterator. New shards are discovered by calling sync_shards() once per second. When a read fails with ExpiredIteratorException, the operator requests a new shard iterator; reads throttled with ProvisionedThroughputExceededException are retried with exponential backoff, up to 5 retries. Shard state, including sequence numbers, is persisted at each checkpoint so that reading can resume from the stored position after recovery. Per-shard positions are represented by KinesisOffset, which has Earliest, Latest, SequenceNumber, and Timestamp variants.
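Hash-based shard assignment and periodic discovery can be sketched in plain Rust with no AWS dependency. This sketch assumes that an instance with index task_index out of parallelism instances keeps a shard when hash(shard_id) % parallelism == task_index. The use of DefaultHasher, the owns_shard and sync_shards helpers, and the simplified ShardState are hypothetical stand-ins, not Arroyo's actual code.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Simplified per-shard state (hypothetical; mirrors fields from the signature).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum KinesisOffset {
    Earliest,
    Latest,
    SequenceNumber(String),
}

#[derive(Debug, Clone, PartialEq)]
pub struct ShardState {
    pub shard_id: String,
    pub offset: KinesisOffset,
    pub closed: bool,
}

/// Returns true if the instance `task_index` owns `shard_id`.
/// Assumption: DefaultHasher::new() is deterministic within one build, so all
/// instances running the same binary agree on the assignment.
pub fn owns_shard(shard_id: &str, task_index: u64, parallelism: u64) -> bool {
    let mut hasher = DefaultHasher::new();
    shard_id.hash(&mut hasher);
    hasher.finish() % parallelism == task_index
}

/// Hypothetical analogue of sync_shards(): adds newly listed shards owned by
/// this instance, leaving already-tracked (e.g. restored) shards untouched.
/// Returns the IDs of the shards that were added.
pub fn sync_shards(
    shards: &mut HashMap<String, ShardState>,
    listed: &[&str],
    task_index: u64,
    parallelism: u64,
    initial: &KinesisOffset,
) -> Vec<String> {
    let mut added = Vec::new();
    for &id in listed {
        if owns_shard(id, task_index, parallelism) && !shards.contains_key(id) {
            shards.insert(
                id.to_string(),
                ShardState { shard_id: id.to_string(), offset: initial.clone(), closed: false },
            );
            added.push(id.to_string());
        }
    }
    added
}

fn main() {
    let listed = ["shardId-000000000000", "shardId-000000000001", "shardId-000000000002"];
    let parallelism = 2;
    // Every shard is owned by exactly one of the instances.
    for id in listed {
        let owners = (0..parallelism).filter(|&i| owns_shard(id, i, parallelism)).count();
        assert_eq!(owners, 1);
    }
    let mut shards = HashMap::new();
    let added = sync_shards(&mut shards, &listed, 0, parallelism, &KinesisOffset::Latest);
    // Syncing the same listing again adds nothing.
    let again = sync_shards(&mut shards, &listed, 0, parallelism, &KinesisOffset::Latest);
    assert!(again.is_empty());
    println!("instance 0 owns {:?}", added);
}
```

Because existing entries are never overwritten, a shard restored from a checkpoint keeps its stored offset when the periodic sync runs.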
Usage
Use KinesisSourceFunc when you need to ingest streaming data from an AWS Kinesis Data Stream into an Arroyo pipeline with fault-tolerant shard tracking.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/kinesis/source.rs
Signature
```rust
pub struct KinesisSourceFunc {
    pub stream_name: String,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub kinesis_client: Option<KinesisClient>,
    pub aws_region: Option<String>,
    pub shards: HashMap<String, ShardState>,
    pub offset: SourceOffset,
}

pub struct ShardState {
    stream_name: String,
    shard_id: String,
    offset: KinesisOffset,
    closed: bool,
}

pub enum KinesisOffset {
    Earliest,
    Latest,
    SequenceNumber(String),
    Timestamp(SystemTime),
}

#[async_trait]
impl SourceOperator for KinesisSourceFunc {
    // Signatures only; method bodies are omitted.
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
    async fn run(
        &mut self,
        ctx: &mut SourceContext,
        collector: &mut SourceCollector,
    ) -> DataflowResult<SourceFinishType>;
}
```
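The retry behaviour that run() applies to failed reads (exponential backoff on ProvisionedThroughputExceededException up to 5 retries, and a fresh shard iterator on ExpiredIteratorException) can be sketched synchronously. ReadError, read_with_retry, the 100 ms base delay, the injected sleep callback, and the shared retry budget are all assumptions of this sketch; the real operator runs asynchronously against the AWS SDK.

```rust
use std::time::Duration;

/// Hypothetical classification of a GetRecords failure.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum ReadError {
    /// Corresponds to ProvisionedThroughputExceededException.
    Throttled,
    /// Corresponds to ExpiredIteratorException.
    ExpiredIterator,
    /// Any other, non-retryable failure.
    Other(String),
}

pub const MAX_RETRIES: u32 = 5;
/// Assumed base delay; it doubles on each retry.
pub const BASE_DELAY: Duration = Duration::from_millis(100);

/// Delay before retry number `retry` (0-based): BASE_DELAY * 2^retry.
pub fn backoff_delay(retry: u32) -> Duration {
    BASE_DELAY * 2u32.pow(retry)
}

/// Calls `read` with the current iterator. Throttled reads sleep for an
/// exponentially growing delay and retry; an expired iterator is replaced via
/// `refresh_iterator` and retried immediately. Both kinds of retry share the
/// MAX_RETRIES budget (an assumption of this sketch).
pub fn read_with_retry<T>(
    mut iterator: String,
    mut read: impl FnMut(&str) -> Result<T, ReadError>,
    mut refresh_iterator: impl FnMut() -> String,
    mut sleep: impl FnMut(Duration),
) -> Result<T, ReadError> {
    let mut retries = 0;
    loop {
        match read(iterator.as_str()) {
            Ok(value) => return Ok(value),
            Err(ReadError::Throttled) if retries < MAX_RETRIES => {
                sleep(backoff_delay(retries));
                retries += 1;
            }
            Err(ReadError::ExpiredIterator) if retries < MAX_RETRIES => {
                iterator = refresh_iterator();
                retries += 1;
            }
            // Retry budget exhausted, or a non-retryable error.
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    // Simulate two throttled responses followed by a successful read.
    let mut responses =
        vec![Err(ReadError::Throttled), Err(ReadError::Throttled), Ok(42)].into_iter();
    let mut slept = Vec::new();
    let result = read_with_retry(
        "iter-1".to_string(),
        |_it| responses.next().unwrap(),
        || "iter-2".to_string(),
        |d| slept.push(d),
    );
    assert_eq!(result, Ok(42));
    assert_eq!(slept, vec![Duration::from_millis(100), Duration::from_millis(200)]);
}
```

Injecting sleep keeps the backoff schedule testable without real waiting; an async version would await a timer instead.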
Import
```rust
use arroyo_connectors::kinesis::source::KinesisSourceFunc;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stream_name | String | Yes | Name of the Kinesis stream to consume from |
| format | Format | Yes | Deserialization format for record data |
| offset | SourceOffset | Yes | Starting offset (Earliest or Latest) |
| aws_region | Option<String> | No | AWS region for the Kinesis client |
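A shard's offset ultimately decides where a Kinesis GetShardIterator request starts reading. The sketch below maps each KinesisOffset variant to one of the ShardIteratorType values defined by the Kinesis API (TRIM_HORIZON, LATEST, AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP). The IteratorRequest struct and iterator_request function are hypothetical, and choosing AFTER_SEQUENCE_NUMBER (resume just past the last checkpointed record) rather than AT_SEQUENCE_NUMBER is an assumption, not confirmed Arroyo behaviour.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[derive(Debug, Clone, PartialEq)]
pub enum KinesisOffset {
    Earliest,
    Latest,
    SequenceNumber(String),
    Timestamp(SystemTime),
}

/// Hypothetical parameters for a GetShardIterator call.
#[derive(Debug, PartialEq)]
pub struct IteratorRequest {
    /// One of the Kinesis ShardIteratorType names.
    pub iterator_type: &'static str,
    pub starting_sequence_number: Option<String>,
    pub timestamp: Option<SystemTime>,
}

pub fn iterator_request(offset: &KinesisOffset) -> IteratorRequest {
    let (iterator_type, seq, ts) = match offset {
        // Oldest record still retained in the shard.
        KinesisOffset::Earliest => ("TRIM_HORIZON", None, None),
        // Only records written after the iterator is created.
        KinesisOffset::Latest => ("LATEST", None, None),
        // Assumed: resume just after the last processed record.
        KinesisOffset::SequenceNumber(s) => ("AFTER_SEQUENCE_NUMBER", Some(s.clone()), None),
        // Records that arrived at or after the given time.
        KinesisOffset::Timestamp(t) => ("AT_TIMESTAMP", None, Some(*t)),
    };
    IteratorRequest { iterator_type, starting_sequence_number: seq, timestamp: ts }
}

fn main() {
    let t = UNIX_EPOCH + Duration::from_secs(1_700_000_000);
    for offset in [
        KinesisOffset::Earliest,
        KinesisOffset::Latest,
        KinesisOffset::SequenceNumber("12345".into()),
        KinesisOffset::Timestamp(t),
    ] {
        println!("{:?} -> {:?}", offset, iterator_request(&offset));
    }
}
```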
Outputs
| Name | Type | Description |
|---|---|---|
| records | RecordBatch | Arrow record batches deserialized from Kinesis records, each row carrying its record's approximate arrival timestamp |
Usage Examples
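```rust
use std::collections::HashMap;
// Format, JsonFormat and SourceOffset must also be in scope (import paths not shown).
let source = KinesisSourceFunc {
    stream_name: "my-kinesis-stream".to_string(),
    // Decode each record's data as JSON.
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    // Left empty; the operator initializes its own KinesisClient.
    kinesis_client: None,
    aws_region: Some("us-east-1".to_string()),
    // Populated at runtime by shard discovery or checkpoint restore.
    shards: HashMap::new(),
    // Start from the latest position rather than the oldest retained record.
    offset: SourceOffset::Latest,
};
```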
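The shards map starts empty in a freshly constructed source and is filled as shards are discovered or restored. The sketch below shows how per-shard state could advance as batches arrive and survive a checkpoint round trip. It relies on the Kinesis behaviour that GetRecords returns no next shard iterator once a closed shard has been fully read; the apply_batch, snapshot, and restore helpers and the simplified ShardState are hypothetical, and Arroyo's real state tables are not modelled.

```rust
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum KinesisOffset {
    Earliest,
    Latest,
    SequenceNumber(String),
}

#[derive(Debug, Clone, PartialEq)]
pub struct ShardState {
    pub shard_id: String,
    pub offset: KinesisOffset,
    pub closed: bool,
}

/// Updates a shard after one GetRecords response (hypothetical helper).
/// `sequence_numbers` are the batch's sequence numbers in order;
/// `has_next_iterator` is false once a closed shard has been fully consumed.
pub fn apply_batch(state: &mut ShardState, sequence_numbers: &[&str], has_next_iterator: bool) {
    if let Some(last) = sequence_numbers.last() {
        state.offset = KinesisOffset::SequenceNumber(last.to_string());
    }
    if !has_next_iterator {
        state.closed = true;
    }
}

/// Copies every shard's state so it can be written at a checkpoint.
pub fn snapshot(shards: &HashMap<String, ShardState>) -> Vec<ShardState> {
    let mut states: Vec<ShardState> = shards.values().cloned().collect();
    // Sort for a deterministic order.
    states.sort_by(|a, b| a.shard_id.cmp(&b.shard_id));
    states
}

/// Rebuilds the shard map from checkpointed state during recovery.
pub fn restore(states: Vec<ShardState>) -> HashMap<String, ShardState> {
    states.into_iter().map(|s| (s.shard_id.clone(), s)).collect()
}

fn main() {
    let mut shards = HashMap::new();
    shards.insert(
        "shardId-0".to_string(),
        ShardState { shard_id: "shardId-0".into(), offset: KinesisOffset::Latest, closed: false },
    );
    apply_batch(shards.get_mut("shardId-0").unwrap(), &["100", "101"], true);
    let restored = restore(snapshot(&shards));
    assert_eq!(restored, shards);
    println!("{:?}", restored["shardId-0"].offset);
}
```

An empty batch leaves the offset unchanged, so a restored shard resumes from the last sequence number that was actually read.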