Implementation:ArroyoSystems Arroyo Kafka Source

From Leeroopedia


Knowledge Sources
Domains Streaming, Connectors
Last Updated 2026-02-08 08:00 GMT

Overview

KafkaSourceFunc is the Arroyo source operator that consumes records from Apache Kafka topics with partition-level offset tracking, rate limiting, and checkpointing support.

Description

The Kafka source operator creates an rdkafka StreamConsumer and assigns partitions to each operator instance based on the task's parallelism index. On recovery it restores partition offsets from global keyed state, which enables exactly-once consumption semantics when combined with Arroyo's checkpoint mechanism. The operator accepts either an explicit group_id or a group_id_prefix for consumer group management, enforces a messages_per_second rate limit using the governor crate, and supports optional schema resolution for formats such as Avro backed by a schema registry. Kafka message metadata (key, offset_id, partition, topic, and timestamp) can be extracted from each message and exposed as fields in the output schema.
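The partition assignment and offset restore described above can be sketched with the standard library alone. This is a minimal illustration, not Arroyo's actual code: the modulo assignment rule, the `StartAt` enum, the function names, and the convention that the checkpoint stores the last processed offset (so consumption resumes at that offset plus one) are all assumptions made for the example.

```rust
use std::collections::HashMap;

/// Hypothetical starting position for one partition.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StartAt {
    Earliest,
    Latest,
    Offset(i64),
}

/// Assumed assignment rule: partition `p` belongs to the subtask whose
/// index equals `p % parallelism`, so partitions are spread round-robin.
fn assigned_partitions(partition_count: u32, task_index: u32, parallelism: u32) -> Vec<u32> {
    (0..partition_count)
        .filter(|p| p % parallelism == task_index)
        .collect()
}

/// For each assigned partition, resume after the checkpointed offset if one
/// exists; otherwise fall back to the configured default position.
/// Assumes `restored` maps partition -> last processed offset.
fn starting_positions(
    partitions: &[u32],
    restored: &HashMap<u32, i64>,
    default: StartAt,
) -> Vec<(u32, StartAt)> {
    partitions
        .iter()
        .map(|p| {
            let start = match restored.get(p) {
                Some(last) => StartAt::Offset(last + 1),
                None => default,
            };
            (*p, start)
        })
        .collect()
}
```

With six partitions and a parallelism of three, subtask 1 would own partitions 1 and 4 under this rule. A partition with no restored state starts from the default position, which plays the role of offset_mode.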

Usage

Use KafkaSourceFunc when you need to read streaming data from Kafka topics into an Arroyo SQL pipeline, with support for checkpointed offset tracking and schema registry integration.

Code Reference

Source Location

Signature

pub struct KafkaSourceFunc {
    pub topic: String,
    pub bootstrap_servers: String,
    pub group_id: Option<String>,
    pub group_id_prefix: Option<String>,
    pub offset_mode: SourceOffset,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub schema_resolver: Option<Arc<dyn SchemaResolver + Sync>>,
    pub client_configs: HashMap<String, String>,
    pub context: Context,
    pub messages_per_second: NonZeroU32,
    pub metadata_fields: Vec<MetadataField>,
}

#[async_trait]
impl SourceOperator for KafkaSourceFunc {
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
}
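The struct carries both group_id and group_id_prefix, so the operator has to pick one consumer group name at startup. The sketch below shows one plausible resolution order, assuming an explicit group_id wins over a prefix. The function name, the `job_id` and `operator_id` parameters, and the generated name format are hypothetical and are not taken from the Arroyo source.

```rust
/// Resolve the consumer group name.
/// Assumed precedence: explicit ID, then prefix-based name, then a default.
/// The generated name formats are illustrative only.
fn resolve_group_id(
    group_id: Option<&str>,
    group_id_prefix: Option<&str>,
    job_id: &str,
    operator_id: &str,
) -> String {
    match (group_id, group_id_prefix) {
        // An explicit group ID is used verbatim.
        (Some(id), _) => id.to_string(),
        // A prefix is combined with identifiers that keep the name unique per job.
        (None, Some(prefix)) => format!("{prefix}-arroyo-{job_id}-{operator_id}"),
        // With neither set, fall back to a generated name.
        (None, None) => format!("arroyo-{job_id}-{operator_id}"),
    }
}
```

Deriving the name from job and operator identifiers keeps separate pipelines from sharing a consumer group by accident, while an explicit group_id lets an operator join a group managed elsewhere.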

Import

use arroyo_connectors::kafka::source::KafkaSourceFunc;

I/O Contract

Inputs

Name | Type | Required | Description
topic | String | Yes | Kafka topic to consume from
bootstrap_servers | String | Yes | Comma-separated Kafka broker addresses
offset_mode | SourceOffset | Yes | Starting offset (Earliest or Latest)
format | Format | Yes | Deserialization format (JSON, Avro, RawString, etc.)
messages_per_second | NonZeroU32 | Yes | Rate limit on consumption throughput, in messages per second
group_id | Option<String> | No | Explicit consumer group ID
metadata_fields | Vec<MetadataField> | No | Kafka metadata to expose (key, offset_id, partition, topic, timestamp)
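To illustrate what the messages_per_second input controls, here is a small standard-library pacer. It stands in for the governor crate that the operator actually uses and is not equivalent to it: this sketch enforces strict even spacing with no burst allowance. The `Pacer` type and its methods are hypothetical names, and time is passed in explicitly so the behavior is deterministic.

```rust
use std::num::NonZeroU32;
use std::time::Duration;

/// Hypothetical pacer that spaces messages evenly at a fixed rate.
struct Pacer {
    /// Minimum spacing between two messages.
    interval: Duration,
    /// Earliest time (measured from start) at which the next message may go out.
    next_allowed: Duration,
}

impl Pacer {
    fn new(messages_per_second: NonZeroU32) -> Self {
        Pacer {
            // NonZeroU32 guarantees this division never divides by zero.
            interval: Duration::from_secs(1) / messages_per_second.get(),
            next_allowed: Duration::ZERO,
        }
    }

    /// Reserve the next send slot and return how long the caller should wait.
    /// `now` is the time elapsed since the pacer was created.
    fn reserve(&mut self, now: Duration) -> Duration {
        let slot = self.next_allowed.max(now);
        self.next_allowed = slot + self.interval;
        slot - now
    }
}
```

A caller would sleep for the returned duration before emitting each message. Taking a NonZeroU32, as the struct does, rules out a zero rate at the type level.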

Outputs

Name | Type | Description
records | RecordBatch | Deserialized Arrow record batches from Kafka messages

Usage Examples

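use std::collections::HashMap;
use std::num::NonZeroU32;

// SourceOffset, Format, JsonFormat, and Context are Arroyo types and must
// also be in scope alongside KafkaSourceFunc.
let source = KafkaSourceFunc {
    topic: "input-topic".to_string(),
    bootstrap_servers: "localhost:9092".to_string(),
    // Use an explicit consumer group, so no prefix is needed.
    group_id: Some("my-consumer-group".to_string()),
    group_id_prefix: None,
    // Start from the earliest offset when no checkpointed state exists.
    offset_mode: SourceOffset::Earliest,
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    // No schema registry is required for plain JSON.
    schema_resolver: None,
    client_configs: HashMap::new(),
    context: Context::new(None),
    // Cap consumption at 10,000 messages per second.
    messages_per_second: NonZeroU32::new(10000).unwrap(),
    // Expose no Kafka metadata columns.
    metadata_fields: vec![],
};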
