Implementation:ArroyoSystems Arroyo Kafka Sink
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
KafkaSinkFunc is the Arroyo operator that produces records to Apache Kafka topics, supporting both at-least-once and exactly-once delivery semantics via transactional producers.
Description
The Kafka sink operator serializes incoming Arrow record batches and publishes the resulting messages to a configured Kafka topic through an rdkafka FutureProducer. It supports two consistency modes. In AtLeastOnce mode, records are sent without transactions. In ExactlyOnce mode, Kafka transactions are used so that the records written between checkpoints are committed atomically. The operator can optionally read the Kafka message timestamp and the partition key from columns of the input schema. On each checkpoint it flushes all pending delivery futures. In exactly-once mode it also moves the current transactional producer into producer_to_complete, where it is held until its transaction is committed, and then begins a new transaction on a fresh producer with a new transactional ID.
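The checkpoint behavior described above can be modeled as a small state machine. The sketch below uses only the standard library and does not touch rdkafka. MockProducer, SinkModel, new_producer, handle_checkpoint and commit are hypothetical names, a Vec<u64> stands in for the pending delivery futures, and the "sink-txn-N" transactional ID scheme is an illustrative assumption, not Arroyo's actual format. Only the shape of the documented ConsistencyMode enum is mirrored.

```rust
// Hypothetical stand-in for an rdkafka FutureProducer; only the
// transactional ID is tracked.
struct MockProducer {
    transactional_id: Option<String>,
}

// Same shape as the documented ConsistencyMode, with MockProducer swapped in.
enum ConsistencyMode {
    AtLeastOnce,
    ExactlyOnce {
        next_transaction_index: usize,
        producer_to_complete: Option<MockProducer>,
    },
}

struct SinkModel {
    consistency_mode: ConsistencyMode,
    producer: Option<MockProducer>,
    // Stand-in for Vec<DeliveryFuture>: one entry per unacknowledged send.
    pending_writes: Vec<u64>,
    // Number of deliveries flushed so far.
    flushed: usize,
}

impl SinkModel {
    /// Creates the active producer. In exactly-once mode each new producer
    /// gets a fresh transactional ID (hypothetical "sink-txn-N" scheme).
    fn new_producer(&mut self) {
        let transactional_id = match &mut self.consistency_mode {
            ConsistencyMode::AtLeastOnce => None,
            ConsistencyMode::ExactlyOnce { next_transaction_index, .. } => {
                let id = format!("sink-txn-{}", next_transaction_index);
                *next_transaction_index += 1;
                Some(id)
            }
        };
        self.producer = Some(MockProducer { transactional_id });
    }

    fn handle_checkpoint(&mut self) {
        // Flush: the model just counts and drops the outstanding writes.
        self.flushed += self.pending_writes.len();
        self.pending_writes.clear();
        // Exactly-once: park the current producer until its transaction is
        // committed, then switch to a fresh transactional producer.
        if let ConsistencyMode::ExactlyOnce { producer_to_complete, .. } = &mut self.consistency_mode {
            *producer_to_complete = self.producer.take();
            self.new_producer();
        }
    }

    /// Commits the parked transaction and returns its transactional ID.
    fn commit(&mut self) -> Option<String> {
        match &mut self.consistency_mode {
            ConsistencyMode::ExactlyOnce { producer_to_complete, .. } => {
                producer_to_complete.take().and_then(|p| p.transactional_id)
            }
            ConsistencyMode::AtLeastOnce => None,
        }
    }
}
```

Rotating to a fresh transactional ID keeps the transaction awaiting commit separate from the one receiving new writes, which is why the model holds two producers at once in exactly-once mode.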
Usage
Use KafkaSinkFunc when you need to write streaming SQL query results to a Kafka topic from an Arroyo pipeline, especially when exactly-once delivery guarantees are required.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/kafka/sink/mod.rs
Signature
```rust
pub struct KafkaSinkFunc {
    pub topic: String,
    pub bootstrap_servers: String,
    pub consistency_mode: ConsistencyMode,
    pub timestamp_field: Option<String>,
    pub timestamp_col: Option<usize>,
    pub key_field: Option<String>,
    pub key_col: Option<usize>,
    pub producer: Option<FutureProducer>,
    pub write_futures: Vec<DeliveryFuture>,
    pub client_config: HashMap<String, String>,
    pub context: Context,
    pub serializer: ArrowSerializer,
}

pub enum ConsistencyMode {
    AtLeastOnce,
    ExactlyOnce {
        next_transaction_index: usize,
        producer_to_complete: Option<FutureProducer>,
    },
}

#[async_trait]
impl ArrowOperator for KafkaSinkFunc { ... }
```
Import
```rust
use arroyo_connectors::kafka::sink::KafkaSinkFunc;
use arroyo_connectors::kafka::sink::ConsistencyMode;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batch | RecordBatch | Yes | Arrow record batch containing rows to publish to Kafka |
| topic | String | Yes | Name of the Kafka topic to write to |
| bootstrap_servers | String | Yes | Comma-separated Kafka broker addresses |
| consistency_mode | ConsistencyMode | Yes | AtLeastOnce or ExactlyOnce delivery semantics |
| timestamp_field | Option<String> | No | Name of the timestamp column in the schema (must be TIMESTAMP type) |
| key_field | Option<String> | No | Name of the key column in the schema (must be TEXT type) |
| client_config | HashMap<String, String> | No | Additional rdkafka client configuration properties |
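The type constraints in the table (TIMESTAMP for timestamp_field, TEXT for key_field) amount to resolving each configured field name to a column index, which is what the timestamp_col and key_col fields hold. The following is a minimal stdlib-only sketch, assuming a simplified schema represented as (name, type) pairs; ColumnType and resolve_column are hypothetical names, not Arroyo or Arrow APIs.

```rust
// Hypothetical, simplified stand-in for the column types of an Arrow schema.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColumnType {
    Timestamp,
    Text,
    Int64,
}

/// Returns the index of `field` in `schema` after checking that the column
/// has the `required` type. A `None` field means the option is not configured.
fn resolve_column(
    schema: &[(&str, ColumnType)],
    field: Option<&str>,
    required: ColumnType,
) -> Result<Option<usize>, String> {
    let name = match field {
        None => return Ok(None),
        Some(name) => name,
    };
    // Find the column by name, remembering its position.
    let (idx, (_, ty)) = schema
        .iter()
        .enumerate()
        .find(|(_, (n, _))| *n == name)
        .ok_or_else(|| format!("field '{}' not found in schema", name))?;
    // Reject columns of the wrong type (e.g. a non-TEXT key column).
    if *ty != required {
        return Err(format!("field '{}' has type {:?}, expected {:?}", name, ty, required));
    }
    Ok(Some(idx))
}
```

For example, with the schema (user_id: Text, event_time: Timestamp), resolving key_field "user_id" as Text yields Some(0), while asking for a Text key from an Int64 column is an error.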
Outputs
| Name | Type | Description |
|---|---|---|
| Kafka messages | Kafka records | Records published to the configured Kafka topic with optional key and timestamp |
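Each input row becomes one Kafka message, and its key and timestamp are set only when the corresponding column is configured. The sketch below assumes that the serializer has already produced one payload per row and that the timestamp column holds nanoseconds since the epoch (an assumption about the column's precision). OutRecord and build_records are hypothetical names. Kafka message timestamps are milliseconds since the epoch, hence the division.

```rust
/// Hypothetical record shape: the fields a Kafka producer record carries here.
#[derive(Debug, PartialEq)]
struct OutRecord {
    key: Option<String>,
    payload: Vec<u8>,
    timestamp_ms: Option<i64>,
}

/// Builds one record per row. `keys` and `timestamps_ns` stand in for the key
/// and timestamp columns; each is `None` when that field is not configured,
/// and an individual key may be null (`None`) for a given row.
fn build_records(
    payloads: Vec<Vec<u8>>,
    keys: Option<&[Option<String>]>,
    timestamps_ns: Option<&[i64]>,
) -> Vec<OutRecord> {
    payloads
        .into_iter()
        .enumerate()
        .map(|(row, payload)| OutRecord {
            key: keys.and_then(|k| k[row].clone()),
            payload,
            // Convert nanoseconds to Kafka's millisecond timestamps.
            timestamp_ms: timestamps_ns.map(|t| t[row] / 1_000_000),
        })
        .collect()
}
```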
Usage Examples
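```rust
use std::collections::HashMap;

// Context and ArrowSerializer come from the Arroyo crates; `format` is assumed
// to be an already-configured serialization format (construction not shown).
let sink = KafkaSinkFunc {
    topic: "output-topic".to_string(),
    bootstrap_servers: "localhost:9092".to_string(),
    // For exactly-once delivery, use instead:
    // ConsistencyMode::ExactlyOnce { next_transaction_index: 0, producer_to_complete: None }
    consistency_mode: ConsistencyMode::AtLeastOnce,
    timestamp_field: None,
    timestamp_col: None,
    key_field: Some("user_id".to_string()),
    // Column indices are left unset and resolved from the field names at runtime.
    key_col: None,
    // The producer is created by the operator, not by the caller.
    producer: None,
    write_futures: vec![],
    client_config: HashMap::new(),
    context: Context::new(None),
    serializer: ArrowSerializer::new(format),
};
```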
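The bootstrap_servers, client_config and consistency_mode fields all end up as producer configuration. The sketch below shows one hypothetical way to merge them into a property map of the kind handed to an rdkafka ClientConfig. bootstrap.servers and transactional.id are standard Kafka producer properties; producer_properties, Mode and the "prefix-N" transactional ID scheme are assumptions for illustration only.

```rust
use std::collections::HashMap;

// Hypothetical reduced form of ConsistencyMode; the producer is omitted.
enum Mode {
    AtLeastOnce,
    ExactlyOnce { next_transaction_index: usize },
}

/// Hypothetical helper that merges the sink's settings into one property map.
fn producer_properties(
    bootstrap_servers: &str,
    client_config: &HashMap<String, String>,
    mode: &Mode,
    id_prefix: &str,
) -> HashMap<String, String> {
    let mut props = HashMap::new();
    props.insert("bootstrap.servers".to_string(), bootstrap_servers.to_string());
    // User-supplied properties are applied afterwards, so they win on conflict
    // (a choice made in this sketch, not confirmed for Arroyo).
    for (k, v) in client_config {
        props.insert(k.clone(), v.clone());
    }
    if let Mode::ExactlyOnce { next_transaction_index } = mode {
        // Setting transactional.id is what enables Kafka transactions;
        // the naming scheme here is hypothetical.
        props.insert(
            "transactional.id".to_string(),
            format!("{}-{}", id_prefix, next_transaction_index),
        );
    }
    props
}
```

For instance, passing client_config {"compression.type": "lz4"} with AtLeastOnce yields only bootstrap.servers and compression.type, while ExactlyOnce with index 3 and prefix "job1" additionally sets transactional.id to "job1-3".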