

Implementation:ArroyoSystems Arroyo Kinesis Sink

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

KinesisSinkFunc is the Arroyo operator that writes serialized records to AWS Kinesis Data Streams using batched PutRecords API calls with configurable flush thresholds and retry logic.

Description

The Kinesis sink operator serializes incoming Arrow record batches with ArrowSerializer and buffers the resulting records in a BatchRecordPreparer. Each record is assigned a UUID partition key. Records accumulate until any one of the FlushConfig thresholds is reached: a maximum record count (default 500), a maximum data size (default 4,000,000 bytes), or a maximum age (default 1000 ms).

The flush method sends the buffered records through the Kinesis PutRecords API and checks the response for failed records. Only the failed records stay in the buffer for the next attempt, and the flush reports an error. Flushes are retried with exponential backoff through the retry! macro, with up to 20 retries for regular flushes and up to 30 for checkpoint flushes. On every checkpoint the operator forces a flush so that all buffered records are durably written.

On start, the operator initializes the AWS SDK KinesisClient, applying the optional region override if one is configured.
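The accumulate, flush, and retain-failed cycle described above can be sketched in plain Rust without the AWS SDK. This is an illustration under assumptions, not Arroyo's code: RecordBuffer, push, should_flush, and retain_failed are hypothetical names. The per-record PutRecords outcomes are modeled as a slice of booleans, and only payload bytes (not partition keys) count toward the size threshold.

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-in for FlushConfig; field names mirror the signature on this page.
struct FlushConfig {
    max_record_count: usize,
    max_data_size: usize, // bytes
    max_age: Duration,
}

// Hypothetical, simplified buffer modeled on BatchRecordPreparer (no AWS client).
struct RecordBuffer {
    buffered_records: Vec<(String, Vec<u8>)>, // (partition key, serialized payload)
    data_size: usize,                         // payload bytes only (assumption)
    last_flush_time: Instant,
}

impl RecordBuffer {
    fn new() -> Self {
        RecordBuffer {
            buffered_records: Vec::new(),
            data_size: 0,
            last_flush_time: Instant::now(),
        }
    }

    // Append one serialized record and track the buffered byte count.
    fn push(&mut self, partition_key: String, payload: Vec<u8>) {
        self.data_size += payload.len();
        self.buffered_records.push((partition_key, payload));
    }

    // A non-empty buffer should be flushed once any threshold is reached.
    fn should_flush(&self, cfg: &FlushConfig) -> bool {
        !self.buffered_records.is_empty()
            && (self.buffered_records.len() >= cfg.max_record_count
                || self.data_size >= cfg.max_data_size
                || self.last_flush_time.elapsed() >= cfg.max_age)
    }

    // After a PutRecords call, keep only the records the response marked as failed.
    // `failed[i]` is the outcome for the i-th buffered record.
    fn retain_failed(&mut self, failed: &[bool]) {
        assert_eq!(failed.len(), self.buffered_records.len());
        let mut outcomes = failed.iter();
        // Vec::retain visits elements once, in order, so outcomes line up with records.
        self.buffered_records.retain(|_| *outcomes.next().unwrap());
        self.data_size = self.buffered_records.iter().map(|(_, p)| p.len()).sum();
        if self.buffered_records.is_empty() {
            // Everything was delivered: restart the age clock.
            self.last_flush_time = Instant::now();
        }
    }
}

fn main() {
    let cfg = FlushConfig {
        max_record_count: 3,
        max_data_size: 4_000_000,
        max_age: Duration::from_secs(60),
    };
    let mut buf = RecordBuffer::new();
    for i in 0..3 {
        buf.push(format!("key-{i}"), vec![0u8; 10]);
    }
    // The record-count threshold (3) has been reached.
    println!("flush now? {}", buf.should_flush(&cfg));
    // Pretend the PutRecords response reported only the second record as failed.
    buf.retain_failed(&[false, true, false]);
    println!("{} record(s), {} byte(s) left to retry", buf.buffered_records.len(), buf.data_size);
}
```

Resetting the age clock only when the buffer is fully drained is a choice made for this sketch. The real operator may manage its flush timer differently.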

Usage

Use KinesisSinkFunc when writing Arroyo pipeline results to AWS Kinesis Data Streams with batched delivery and checkpoint-aligned flushes.

Code Reference

Source Location

Signature

pub struct KinesisSinkFunc {
    pub client: Option<Arc<KinesisClient>>,
    pub aws_region: Option<String>,
    pub in_progress_batch: Option<BatchRecordPreparer>,
    pub flush_config: FlushConfig,
    pub serializer: ArrowSerializer,
    pub name: String,
}

pub struct FlushConfig {
    max_record_count: usize,  // default: 500
    max_data_size: usize,     // default: 4_000_000 (bytes)
    max_age: Duration,        // default: 1000 ms
}

pub struct BatchRecordPreparer {
    client: Arc<KinesisClient>,
    stream: String,
    buffered_records: Vec<(String, Vec<u8>)>,
    data_size: usize,
    last_flush_time: Instant,
}

#[async_trait]
impl ArrowOperator for KinesisSinkFunc {
    fn name(&self) -> String;
    async fn on_start(&mut self, ctx: &mut OperatorContext) -> DataflowResult<()>;
    async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut OperatorContext,
        _: &mut dyn Collector) -> DataflowResult<()>;
    async fn handle_checkpoint(&mut self, _: CheckpointBarrier,
        _: &mut OperatorContext, _: &mut dyn Collector) -> DataflowResult<()>;
    async fn handle_tick(&mut self, _: u64, ctx: &mut OperatorContext,
        _: &mut dyn Collector) -> DataflowResult<()>;
}
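The retry policy can be illustrated with a small exponential-backoff loop. This page does not show the retry! macro's parameters, so the base delay, the cap, and the doubling factor below are assumptions. The names FLUSH_RETRIES, CHECKPOINT_FLUSH_RETRIES, backoff_delay, and retry_with_backoff are hypothetical. Only the budgets come from the description: 20 retries for a regular flush and 30 for the forced flush in handle_checkpoint. The sleep function is injected so the sketch can run without real waiting.

```rust
use std::time::Duration;

// Retry budgets from the description; the constant names are hypothetical.
const FLUSH_RETRIES: u32 = 20;
const CHECKPOINT_FLUSH_RETRIES: u32 = 30;

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

// Call `op` until it succeeds or `max_retries` retries are used up.
// At most `max_retries + 1` calls are made in total.
fn retry_with_backoff<T, E>(
    max_retries: u32,
    base: Duration,
    max: Duration,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_retries => return Err(e),
            Err(_) => {
                sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}

fn main() {
    let base = Duration::from_millis(100); // assumed base delay
    let cap = Duration::from_secs(5); // assumed maximum delay

    // A regular flush that succeeds on the fourth call.
    let mut calls = 0;
    let mut slept = Vec::new();
    let result: Result<&str, &str> = retry_with_backoff(
        FLUSH_RETRIES,
        base,
        cap,
        |d| slept.push(d),
        || {
            calls += 1;
            if calls < 4 { Err("partial failure") } else { Ok("flushed") }
        },
    );
    println!("{:?} after {} calls, delays {:?}", result, calls, slept);

    // A checkpoint flush that never succeeds exhausts its larger budget.
    let mut checkpoint_calls = 0;
    let exhausted: Result<(), &str> = retry_with_backoff(
        CHECKPOINT_FLUSH_RETRIES,
        base,
        cap,
        |_| {},
        || {
            checkpoint_calls += 1;
            Err("still failing")
        },
    );
    println!("{:?} after {} calls", exhausted, checkpoint_calls);
}
```

Capping the delay keeps the later retries in a 30-retry budget from growing without bound. Whether the retry! macro applies a cap, or jitter, is not stated on this page.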

Import

use arroyo_connectors::kinesis::sink::{KinesisSinkFunc, FlushConfig};

I/O Contract

Inputs

Name | Type | Required | Description
batch | RecordBatch | Yes | Arrow record batch to serialize and send to Kinesis
name | String | Yes | Kinesis stream name
flush_config | FlushConfig | Yes | Flush thresholds for record count, data size, and age
aws_region | Option<String> | No | AWS region for the Kinesis client

Outputs

Name | Type | Description
PutRecords | Kinesis API calls | Batched PutRecords requests to the configured Kinesis stream
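The default thresholds can be compared with the per-request limits AWS documents for PutRecords: at most 500 records and 5 MiB per request (partition keys included), with each record up to 1 MiB. The sketch below is a hypothetical sanity check and is not part of Arroyo. It assumes thresholds are checked after each record is appended, so a flush can overshoot max_data_size by at most one record. It also assumes partition keys are 36-byte hyphenated UUID strings. fits_put_records is an illustrative name, and its bound is deliberately conservative.

```rust
// Documented per-request limits of the Kinesis PutRecords API.
const MAX_RECORDS_PER_REQUEST: usize = 500;
const MAX_RECORD_BYTES: usize = 1024 * 1024; // 1 MiB per record, partition key included
const MAX_REQUEST_BYTES: usize = 5 * 1024 * 1024; // 5 MiB per request, partition keys included
// Length of a hyphenated UUID string used as a partition key (assumption).
const UUID_KEY_BYTES: usize = 36;

// Hypothetical check: can a buffer flushed at these thresholds exceed PutRecords limits?
// Returns a conservative worst-case request size in bytes if it fits.
fn fits_put_records(max_record_count: usize, max_data_size: usize) -> Result<usize, String> {
    if max_record_count > MAX_RECORDS_PER_REQUEST {
        return Err(format!(
            "max_record_count {max_record_count} exceeds {MAX_RECORDS_PER_REQUEST}"
        ));
    }
    // Payload below the threshold, plus one maximum-size record that pushes it over,
    // plus a partition key for every record.
    let worst_case = max_data_size + MAX_RECORD_BYTES + max_record_count * UUID_KEY_BYTES;
    if worst_case > MAX_REQUEST_BYTES {
        return Err(format!(
            "worst-case request of {worst_case} bytes exceeds {MAX_REQUEST_BYTES}"
        ));
    }
    Ok(worst_case)
}

fn main() {
    // Defaults from FlushConfig: 500 records, 4,000,000 bytes.
    match fits_put_records(500, 4_000_000) {
        Ok(bytes) => println!("defaults fit: worst case {bytes} bytes"),
        Err(e) => println!("defaults rejected: {e}"),
    }
    // A larger data-size threshold leaves no room for one more maximum-size record.
    if let Err(e) = fits_put_records(500, 5_000_000) {
        println!("rejected: {e}");
    }
}
```

With the defaults, this bound comes to 5,066,576 bytes, which is under the 5,242,880-byte request limit. Raising max_data_size to 5,000,000 would fail the check.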

Usage Examples

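// `format` is assumed to be the connector's configured serialization format, already in scope.
let sink = KinesisSinkFunc {
    client: None, // the KinesisClient is created in on_start
    aws_region: Some("us-east-1".to_string()),
    in_progress_batch: None,
    // Presumably (max record count, max data size in bytes, max age in ms).
    flush_config: FlushConfig::new(Some(500), Some(4_000_000), Some(500)),
    serializer: ArrowSerializer::new(format),
    name: "my-kinesis-stream".to_string(), // Kinesis stream name
};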
