
Implementation:ArroyoSystems Arroyo Kafka Sink

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

KafkaSinkFunc is the Arroyo operator that produces records to Apache Kafka topics, supporting both at-least-once and exactly-once delivery semantics via transactional producers.

Description

The Kafka sink operator serializes incoming Arrow record batches and publishes them to a configured Kafka topic through the rdkafka FutureProducer. It supports two consistency modes. In AtLeastOnce mode, records are sent without transactional guarantees. In ExactlyOnce mode, Kafka transactions ensure that the records written between two checkpoints are committed atomically. The operator can optionally take timestamp and key columns from the input schema and use them to set each Kafka message's timestamp and partition key. On checkpoint, it flushes all pending delivery futures. In exactly-once mode it then sets aside the producer that holds the open transaction (producer_to_complete), so that this transaction can be committed once the checkpoint completes, and begins a new transaction with a fresh transactional ID.
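The checkpoint cycle described above can be sketched with the standard library alone. This is a simplified model, not Arroyo's code. `MockProducer`, `Mode`, `Sink`, and the `sink-{task}-{index}` transactional ID scheme are hypothetical stand-ins, and no rdkafka types are used. The sketch splits the work into `handle_checkpoint` and a later `handle_commit`. That split is an assumption drawn from the `producer_to_complete` field of `ConsistencyMode`.

```rust
/// Hypothetical stand-in for rdkafka's FutureProducer: it only tracks its
/// transactional ID and which messages are in flight or delivered.
#[derive(Debug)]
struct MockProducer {
    transactional_id: Option<String>, // None in at-least-once mode
    in_flight: Vec<String>,           // sent, delivery not yet confirmed
    delivered: Vec<String>,           // delivery confirmed by the "broker"
}

impl MockProducer {
    fn new(transactional_id: Option<String>) -> Self {
        MockProducer { transactional_id, in_flight: Vec::new(), delivered: Vec::new() }
    }

    fn send(&mut self, msg: &str) {
        self.in_flight.push(msg.to_string());
    }

    /// Stands in for awaiting every pending delivery future.
    fn flush(&mut self) {
        self.delivered.append(&mut self.in_flight);
    }
}

/// Mirrors the shape of ConsistencyMode, using the mock producer type.
enum Mode {
    AtLeastOnce,
    ExactlyOnce {
        next_transaction_index: usize,
        producer_to_complete: Option<MockProducer>,
    },
}

struct Sink {
    task_index: usize,
    mode: Mode,
    producer: MockProducer,
}

/// Hypothetical ID scheme: one transactional ID per (task, transaction index).
fn transactional_id(task_index: usize, txn_index: usize) -> String {
    format!("sink-{task_index}-{txn_index}")
}

impl Sink {
    fn new(task_index: usize, exactly_once: bool) -> Self {
        if exactly_once {
            Sink {
                task_index,
                mode: Mode::ExactlyOnce { next_transaction_index: 1, producer_to_complete: None },
                producer: MockProducer::new(Some(transactional_id(task_index, 0))),
            }
        } else {
            Sink { task_index, mode: Mode::AtLeastOnce, producer: MockProducer::new(None) }
        }
    }

    /// On a checkpoint barrier: flush, then (exactly-once only) park the
    /// producer whose transaction is ready and open a fresh transaction.
    fn handle_checkpoint(&mut self) {
        self.producer.flush();
        if let Mode::ExactlyOnce { next_transaction_index, producer_to_complete } = &mut self.mode {
            let fresh = MockProducer::new(Some(transactional_id(self.task_index, *next_transaction_index)));
            *next_transaction_index += 1;
            *producer_to_complete = Some(std::mem::replace(&mut self.producer, fresh));
        }
    }

    /// Once the checkpoint is durable: take the parked producer (where the real
    /// sink would commit its transaction) and return its transactional ID.
    fn handle_commit(&mut self) -> Option<String> {
        match &mut self.mode {
            Mode::ExactlyOnce { producer_to_complete, .. } => {
                producer_to_complete.take().and_then(|p| p.transactional_id)
            }
            Mode::AtLeastOnce => None,
        }
    }
}
```

In exactly-once mode, each checkpoint retires one transactional producer and opens another. As a result, records sent after the barrier never join the transaction that is about to be committed. In at-least-once mode, the checkpoint only flushes.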

Usage

Use KafkaSinkFunc when you need to write streaming SQL query results to a Kafka topic from an Arroyo pipeline, especially when exactly-once delivery guarantees are required.

Code Reference

Source Location

Signature

pub struct KafkaSinkFunc {
    pub topic: String,
    pub bootstrap_servers: String,
    pub consistency_mode: ConsistencyMode,
    pub timestamp_field: Option<String>,
    pub timestamp_col: Option<usize>,
    pub key_field: Option<String>,
    pub key_col: Option<usize>,
    pub producer: Option<FutureProducer>,
    pub write_futures: Vec<DeliveryFuture>,
    pub client_config: HashMap<String, String>,
    pub context: Context,
    pub serializer: ArrowSerializer,
}

pub enum ConsistencyMode {
    AtLeastOnce,
    ExactlyOnce {
        next_transaction_index: usize,
        producer_to_complete: Option<FutureProducer>,
    },
}

#[async_trait]
impl ArrowOperator for KafkaSinkFunc { ... }

Import

use arroyo_connectors::kafka::sink::KafkaSinkFunc;
use arroyo_connectors::kafka::sink::ConsistencyMode;

I/O Contract

Inputs

Name | Type | Required | Description
batch | RecordBatch | Yes | Arrow record batch containing rows to publish to Kafka
topic | String | Yes | Name of the Kafka topic to write to
bootstrap_servers | String | Yes | Comma-separated Kafka broker addresses
consistency_mode | ConsistencyMode | Yes | AtLeastOnce or ExactlyOnce delivery semantics
timestamp_field | Option<String> | No | Name of the timestamp column in the schema (must be TIMESTAMP type)
key_field | Option<String> | No | Name of the key column in the schema (must be TEXT type)
client_config | HashMap<String, String> | No | Additional rdkafka client configuration properties
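The struct pairs timestamp_field and key_field with timestamp_col and key_col. The latter two presumably hold the column indices resolved from those names. The sketch below shows one way that lookup and the type checks from the table could work. The `(name, type)` schema, `ColumnType`, and `resolve_column` are hypothetical simplifications of an Arrow schema lookup, not Arroyo's API.

```rust
/// Hypothetical, reduced set of column types for the sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColumnType {
    Text,
    Timestamp,
    Int64,
}

/// Returns the index of `field` in `schema` after checking its type.
/// A missing field name (None) means the feature is not configured.
fn resolve_column(
    schema: &[(&str, ColumnType)],
    field: Option<&str>,
    expected: ColumnType,
) -> Result<Option<usize>, String> {
    let Some(name) = field else { return Ok(None) };
    let idx = schema
        .iter()
        .position(|(n, _)| *n == name)
        .ok_or_else(|| format!("column '{name}' not found"))?;
    let actual = schema[idx].1;
    if actual != expected {
        return Err(format!("column '{name}' has type {actual:?}, expected {expected:?}"));
    }
    Ok(Some(idx))
}
```

Resolving names to indices once, before any batch arrives, allows a misconfigured key or timestamp column to be reported up front rather than on every row.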

Outputs

Name | Type | Description
Kafka messages | Kafka records | Records published to the configured Kafka topic with optional key and timestamp
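The output contract can be illustrated with a hypothetical conversion from already-serialized rows to messages that carry an optional key and timestamp. `OutMessage` and `to_messages` are illustrative names only. The timestamp column is assumed to hold nanoseconds since the Unix epoch, as an Arrow nanosecond timestamp would. Kafka record timestamps are milliseconds since the epoch, so the sketch divides by 1,000,000.

```rust
/// Hypothetical, simplified outgoing message.
#[derive(Debug, PartialEq)]
struct OutMessage {
    topic: String,
    key: Option<String>,       // set only when a key column is configured
    timestamp_ms: Option<i64>, // Kafka timestamps are ms since the Unix epoch
    payload: String,           // already-serialized row
}

/// Pairs each serialized row with its optional key and timestamp.
/// `timestamps_nanos` is assumed to hold nanoseconds since the Unix epoch.
fn to_messages(
    topic: &str,
    payloads: &[String],
    keys: Option<&[String]>,
    timestamps_nanos: Option<&[i64]>,
) -> Vec<OutMessage> {
    payloads
        .iter()
        .enumerate()
        .map(|(i, payload)| OutMessage {
            topic: topic.to_string(),
            key: keys.map(|k| k[i].clone()),
            timestamp_ms: timestamps_nanos.map(|t| t[i] / 1_000_000),
            payload: payload.clone(),
        })
        .collect()
}
```

If no key column is configured, messages are sent without a key and the producer's partitioner chooses the partition. If no timestamp column is configured, the timestamp is left unset.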

Usage Examples

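use std::collections::HashMap;

// `format` is assumed to be the sink's configured output format (for example,
// JSON) and to be in scope already; this snippet does not define it.
let sink = KafkaSinkFunc {
    topic: "output-topic".to_string(),
    bootstrap_servers: "localhost:9092".to_string(),
    consistency_mode: ConsistencyMode::AtLeastOnce,
    timestamp_field: None,
    // Column indices start as None; they correspond to timestamp_field and
    // key_field resolved against the input schema.
    timestamp_col: None,
    key_field: Some("user_id".to_string()),
    key_col: None,
    // No producer yet; one is created later, when the operator starts.
    producer: None,
    // Pending delivery futures, flushed on each checkpoint.
    write_futures: vec![],
    client_config: HashMap::new(),
    context: Context::new(None),
    serializer: ArrowSerializer::new(format),
};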
