Implementation:ArroyoSystems Arroyo Kinesis Sink
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
KinesisSinkFunc is the Arroyo operator that writes serialized records to AWS Kinesis Data Streams using batched PutRecords API calls with configurable flush thresholds and retry logic.
Description
The Kinesis sink operator serializes incoming Arrow record batches with ArrowSerializer and buffers the resulting records in a BatchRecordPreparer. Each record is assigned a UUID partition key, and records accumulate until any one FlushConfig threshold is reached: maximum record count (default 500), maximum data size (default 4,000,000 bytes), or maximum age (default 1000 ms). The flush method sends the buffered records through the Kinesis PutRecords API, checks the response for failed records, retains only those records for the next attempt, and reports an error if any failed. Flushes are retried with exponential backoff through the retry! macro: up to 20 retries for regular flushes and up to 30 for checkpoint flushes. During a checkpoint, the operator forces a flush so that all buffered records are durably written. The operator initializes the AWS SDK KinesisClient in on_start, applying the optional region override when one is set.
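The threshold check described above can be illustrated with a minimal, standard-library-only sketch. The names `FlushThresholds`, `Buffer`, `push`, and `should_flush` are hypothetical and are not taken from sink.rs. The sketch also assumes that thresholds are compared with `>=`, that partition-key bytes count toward the data size, and that an empty buffer is never flushed.

```rust
use std::time::{Duration, Instant};

/// Flush thresholds using the default values listed in the description.
/// (Hypothetical type; the real FlushConfig has private fields.)
#[derive(Debug, Clone)]
pub struct FlushThresholds {
    pub max_record_count: usize,
    pub max_data_size: usize,
    pub max_age: Duration,
}

impl Default for FlushThresholds {
    fn default() -> Self {
        Self {
            max_record_count: 500,
            max_data_size: 4_000_000,
            max_age: Duration::from_millis(1000),
        }
    }
}

/// Hypothetical stand-in for the buffering state, without a Kinesis client.
pub struct Buffer {
    pub records: Vec<(String, Vec<u8>)>,
    pub data_size: usize,
    pub last_flush_time: Instant,
}

impl Buffer {
    pub fn new() -> Self {
        Self {
            records: Vec::new(),
            data_size: 0,
            last_flush_time: Instant::now(),
        }
    }

    /// Buffers one (partition key, payload) pair and tracks its size.
    /// Assumption: key bytes are counted together with payload bytes.
    pub fn push(&mut self, key: String, payload: Vec<u8>) {
        self.data_size += key.len() + payload.len();
        self.records.push((key, payload));
    }

    /// Returns true when any one of the three thresholds has been reached.
    pub fn should_flush(&self, cfg: &FlushThresholds, now: Instant) -> bool {
        if self.records.is_empty() {
            return false; // assumption: nothing to send, so no flush
        }
        self.records.len() >= cfg.max_record_count
            || self.data_size >= cfg.max_data_size
            || now.saturating_duration_since(self.last_flush_time) >= cfg.max_age
    }
}
```

Passing `now` as a parameter rather than calling `Instant::now()` inside `should_flush` keeps the age check deterministic, which makes the logic easy to exercise in tests.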
Usage
Use KinesisSinkFunc when writing Arroyo pipeline results to AWS Kinesis Data Streams with batched delivery and checkpoint-aligned flushes.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/kinesis/sink.rs
Signature
pub struct KinesisSinkFunc {
    pub client: Option<Arc<KinesisClient>>,
    pub aws_region: Option<String>,
    pub in_progress_batch: Option<BatchRecordPreparer>,
    pub flush_config: FlushConfig,
    pub serializer: ArrowSerializer,
    pub name: String,
}
pub struct FlushConfig {
    max_record_count: usize, // default: 500
    max_data_size: usize,    // default: 4_000_000
    max_age: Duration,       // default: 1000ms
}
pub struct BatchRecordPreparer {
    client: Arc<KinesisClient>,
    stream: String,
    buffered_records: Vec<(String, Vec<u8>)>,
    data_size: usize,
    last_flush_time: Instant,
}
#[async_trait]
impl ArrowOperator for KinesisSinkFunc {
    fn name(&self) -> String;
    async fn on_start(&mut self, ctx: &mut OperatorContext) -> DataflowResult<()>;
    async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut OperatorContext,
        _: &mut dyn Collector) -> DataflowResult<()>;
    async fn handle_checkpoint(&mut self, _: CheckpointBarrier,
        _: &mut OperatorContext, _: &mut dyn Collector) -> DataflowResult<()>;
    async fn handle_tick(&mut self, _: u64, ctx: &mut OperatorContext,
        _: &mut dyn Collector) -> DataflowResult<()>;
}
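The retry behavior around flushes (20 retries for regular flushes, 30 for checkpoint flushes) can be approximated with the sketch below. It is not the retry! macro itself: the real operator is async, while this helper is synchronous and uses only the standard library. The function name `retry_with_backoff` and its delay parameters are hypothetical; the actual base delay and cap used by Arroyo are not stated in this page.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: calls `op` until it succeeds or `max_retries`
/// retries have been used, doubling the delay after each failure.
/// `op` receives the zero-based attempt number.
pub fn retry_with_backoff<T, E, F>(
    mut op: F,
    max_retries: u32,
    base_delay: Duration,
    max_delay: Duration,
) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    let mut delay = base_delay;
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) => {
                // The initial call plus `max_retries` retries have all failed.
                if attempt >= max_retries {
                    return Err(err);
                }
                sleep(delay);
                // Exponential growth, capped at `max_delay`.
                delay = (delay * 2).min(max_delay);
                attempt += 1;
            }
        }
    }
}
```

Under these assumptions, a regular flush would pass `max_retries = 20` and a checkpoint flush `max_retries = 30`, so the operation runs at most 21 or 31 times respectively.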
Import
use arroyo_connectors::kinesis::sink::{KinesisSinkFunc, FlushConfig};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batch | RecordBatch | Yes | Arrow record batch to serialize and send to Kinesis |
| name | String | Yes | Kinesis stream name |
| flush_config | FlushConfig | Yes | Flush thresholds for record count, data size, and age |
| aws_region | Option<String> | No | Optional AWS region override for the Kinesis client |
Outputs
| Name | Type | Description |
|---|---|---|
| PutRecords | Kinesis API calls | Batched PutRecords requests to the configured Kinesis stream |
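A PutRecords response lists one result per submitted record, in request order, and a failed entry carries an error code. The sketch below shows one way to keep only the failed records for the next attempt. `RecordResult` is a hypothetical simplification of the SDK's per-record result type, and `retain_failed` is an illustrative name, not a function from sink.rs. Treating a missing result entry as a failure is a conservative assumption.

```rust
/// Hypothetical, simplified per-record result: `error_code` is `Some`
/// when Kinesis rejected the record and `None` when it was accepted.
pub struct RecordResult {
    pub error_code: Option<String>,
}

/// Drops every buffered record that was accepted and keeps the failed ones,
/// relying on results being in the same order as the submitted records.
/// Returns an error describing how many records still need to be resent.
pub fn retain_failed(
    buffered: &mut Vec<(String, Vec<u8>)>,
    results: &[RecordResult],
) -> Result<(), String> {
    let mut idx = 0;
    // Vec::retain visits each element exactly once, in original order,
    // so `idx` lines up with the matching entry in `results`.
    buffered.retain(|_| {
        let failed = results
            .get(idx)
            .map_or(true, |result| result.error_code.is_some());
        idx += 1;
        failed
    });
    if buffered.is_empty() {
        Ok(())
    } else {
        Err(format!("{} records failed to write", buffered.len()))
    }
}
```

Returning `Err` while leaving the failed records in the buffer lets an outer retry loop resend exactly those records without re-serializing the batch.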
Usage Examples
let sink = KinesisSinkFunc {
    client: None,
    aws_region: Some("us-east-1".to_string()),
    in_progress_batch: None,
    flush_config: FlushConfig::new(Some(500), Some(4_000_000), Some(500)),
    serializer: ArrowSerializer::new(format),
    name: "my-kinesis-stream".to_string(),
};