Implementation:ArroyoSystems Arroyo Kafka Source
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
KafkaSourceFunc is the Arroyo source operator that consumes records from Apache Kafka topics with partition-level offset tracking, rate limiting, and checkpointing support.
Description
The Kafka source operator creates an rdkafka StreamConsumer and assigns topic partitions to each operator instance based on the task's parallelism index. On recovery it restores partition offsets from global keyed state, which enables exactly-once consumption semantics when combined with Arroyo's checkpoint mechanism. The operator supports a configurable group_id or group_id_prefix for consumer group management, a messages_per_second rate limiter built on the Governor crate, and optional schema resolution for formats such as Avro that rely on a schema registry. Metadata fields such as key, offset_id, partition, topic, and timestamp can be extracted from each Kafka message and added to the output schema.
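The partition assignment and offset restoration described above can be sketched with the standard library alone. This is a minimal illustration, not the operator's actual code: the modulo assignment rule, the resume-at-last-offset-plus-one rule, and the names partitions_for_task, start_positions, and StartPosition are all assumptions made for the example.

```rust
use std::collections::HashMap;

/// Returns the partitions owned by one subtask.
/// Assumption: partitions are spread by `partition % parallelism`;
/// `parallelism` must be greater than zero.
fn partitions_for_task(partition_count: u32, task_index: u32, parallelism: u32) -> Vec<u32> {
    assert!(parallelism > 0, "parallelism must be non-zero");
    (0..partition_count)
        .filter(|p| p % parallelism == task_index)
        .collect()
}

/// Where a subtask starts reading a partition (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum StartPosition {
    /// Resume at an explicit offset derived from checkpointed state.
    Offset(i64),
    /// Nothing was checkpointed: fall back to the configured offset mode.
    Default,
}

/// Picks a start position for every owned partition.
/// `restored` maps partition -> last offset recorded in the checkpoint;
/// reading resumes one past that offset so no record is consumed twice.
fn start_positions(owned: &[u32], restored: &HashMap<u32, i64>) -> Vec<(u32, StartPosition)> {
    owned
        .iter()
        .map(|p| {
            let position = match restored.get(p) {
                Some(last) => StartPosition::Offset(last + 1),
                None => StartPosition::Default,
            };
            (*p, position)
        })
        .collect()
}
```

With six partitions and a parallelism of three, partitions_for_task(6, 1, 3) gives subtask 1 the partitions 1 and 4. A partition with no checkpointed offset falls through to StartPosition::Default, which stands in for the configured offset_mode.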
Usage
Use KafkaSourceFunc when you need to read streaming data from Kafka topics into an Arroyo SQL pipeline, with support for checkpointed offset tracking and schema registry integration.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/kafka/source/mod.rs
Signature
pub struct KafkaSourceFunc {
    pub topic: String,
    pub bootstrap_servers: String,
    pub group_id: Option<String>,
    pub group_id_prefix: Option<String>,
    pub offset_mode: SourceOffset,
    pub format: Format,
    pub framing: Option<Framing>,
    pub bad_data: Option<BadData>,
    pub schema_resolver: Option<Arc<dyn SchemaResolver + Sync>>,
    pub client_configs: HashMap<String, String>,
    pub context: Context,
    pub messages_per_second: NonZeroU32,
    pub metadata_fields: Vec<MetadataField>,
}

#[async_trait]
impl SourceOperator for KafkaSourceFunc {
    async fn run(&mut self, ctx: &mut SourceContext, collector: &mut SourceCollector)
        -> DataflowResult<SourceFinishType>;
    fn name(&self) -> String;
    fn tables(&self) -> HashMap<String, TableConfig>;
}
Import
use arroyo_connectors::kafka::source::KafkaSourceFunc;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| topic | String | Yes | Kafka topic to consume from |
| bootstrap_servers | String | Yes | Comma-separated Kafka broker addresses |
| offset_mode | SourceOffset | Yes | Starting offset when no checkpointed offset is restored (e.g. Earliest or Latest) |
| format | Format | Yes | Deserialization format (JSON, Avro, RawString, etc.) |
| messages_per_second | NonZeroU32 | Yes | Maximum consumption rate in messages per second, enforced by the rate limiter |
| group_id | Option<String> | No | Explicit consumer group ID |
| group_id_prefix | Option<String> | No | Prefix for the consumer group ID, as an alternative to an explicit group_id |
| metadata_fields | Vec<MetadataField> | No | Kafka metadata to expose (key, offset_id, partition, topic, timestamp) |
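How group_id and group_id_prefix might combine into a single consumer group name can be sketched as follows. The precedence (an explicit group_id wins over a prefix) and the generated naming scheme are assumptions made for illustration, as are the function resolve_group_id and its job_id and operator_id parameters. None of this is taken from the Arroyo source.

```rust
/// Resolves the consumer group ID for a source instance (hypothetical helper).
/// Assumptions: an explicit `group_id` takes precedence, and otherwise a name
/// is generated from the job and operator IDs, optionally with a prefix.
fn resolve_group_id(
    group_id: Option<&str>,
    group_id_prefix: Option<&str>,
    job_id: &str,
    operator_id: &str,
) -> String {
    match (group_id, group_id_prefix) {
        // An explicit group ID is used verbatim.
        (Some(id), _) => id.to_string(),
        // A prefix is prepended to the generated name.
        (None, Some(prefix)) => format!("{prefix}-arroyo-{job_id}-{operator_id}"),
        // Neither is set: use the generated name alone.
        (None, None) => format!("arroyo-{job_id}-{operator_id}"),
    }
}
```

Under this scheme, a generated name stays stable across restarts of the same job and operator, while a prefix lets several deployments share a broker without their groups colliding.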
Outputs
| Name | Type | Description |
|---|---|---|
| records | RecordBatch | Deserialized Arrow record batches from Kafka messages |
Usage Examples
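use std::collections::HashMap;
use std::num::NonZeroU32;

// The Arroyo types used below (KafkaSourceFunc, SourceOffset, Format,
// JsonFormat, Context) are assumed to be in scope.
let source = KafkaSourceFunc {
    topic: "input-topic".to_string(),
    bootstrap_servers: "localhost:9092".to_string(),
    // Explicit consumer group, so no prefix is needed.
    group_id: Some("my-consumer-group".to_string()),
    group_id_prefix: None,
    offset_mode: SourceOffset::Earliest,
    format: Format::Json(JsonFormat::default()),
    framing: None,
    bad_data: None,
    schema_resolver: None,
    client_configs: HashMap::new(),
    context: Context::new(None),
    // 10000 is non-zero, so unwrap() cannot panic here.
    messages_per_second: NonZeroU32::new(10000).unwrap(),
    metadata_fields: vec![],
};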
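To give a feel for what a messages_per_second limit implies, the sketch below paces messages at a fixed interval using only the standard library. It is a simplified stand-in and not the Governor crate's algorithm, which also handles bursts. The names min_interval and Pacer are hypothetical.

```rust
use std::num::NonZeroU32;
use std::time::Duration;

/// Minimum spacing between messages for a given rate.
/// NonZeroU32 rules out a zero rate, so the division cannot fail.
fn min_interval(messages_per_second: NonZeroU32) -> Duration {
    Duration::from_secs(1) / messages_per_second.get()
}

/// Fixed-interval pacer (simplified; no burst allowance).
struct Pacer {
    interval: Duration,
    /// Earliest time, measured from the start, at which the next message may pass.
    next_allowed: Duration,
}

impl Pacer {
    fn new(messages_per_second: NonZeroU32) -> Self {
        Pacer {
            interval: min_interval(messages_per_second),
            next_allowed: Duration::ZERO,
        }
    }

    /// Given the time elapsed since start, returns how long the caller
    /// should wait before emitting the next message.
    fn delay_for(&mut self, now: Duration) -> Duration {
        let wait = self.next_allowed.saturating_sub(now);
        // Schedule the following message one interval after this one.
        self.next_allowed = now.max(self.next_allowed) + self.interval;
        wait
    }
}
```

At the rate of 10000 messages per second used in the example above, min_interval yields 100 microseconds. A caller would sleep for the duration returned by delay_for before handing each message downstream.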