Implementation:ArroyoSystems Arroyo Kafka Source Tests
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Connectors |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
Integration test suite for KafkaSourceFunc that validates end-to-end Kafka consumption, checkpointing, state restoration, and metadata field extraction.
Description
The test module provides a KafkaTopicTester harness that manages Kafka topic lifecycle (creation/deletion) and constructs KafkaSourceFunc instances wired to test channels for control messages, data output, and checkpoint completion signals. KafkaTopicProducer is a simple wrapper around rdkafka's BaseProducer for sending test data. The KafkaSourceWithReads struct provides assertion helpers like assert_next_message_record_values to validate received data and assert_next_message_checkpoint to verify checkpoint barriers. Two integration tests are included: test_kafka validates basic consumption, checkpointing, state restoration from epoch, and continued consumption after recovery; test_kafka_with_metadata_fields verifies that metadata fields (such as offset) are correctly propagated into the output schema.
Usage
Use these tests to validate Kafka source correctness, checkpoint/restore behavior, and metadata field extraction against a running Kafka broker at 0.0.0.0:9092.
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-connectors/src/kafka/source/test.rs
Signature
pub struct KafkaTopicTester {
topic: String,
server: String,
group_id: Option<String>,
}
struct KafkaTopicProducer {
base_producer: BaseProducer,
topic: String,
}
struct KafkaSourceWithReads {
to_control_tx: Sender<ControlMessage>,
from_control_rx: Receiver<ControlResp>,
data_recv: BatchReceiver,
}
#[tokio::test]
async fn test_kafka() { ... }
#[tokio::test]
async fn test_kafka_with_metadata_fields() { ... }
Import
// Internal test module, not publicly importable
use crate::kafka::source::test::KafkaTopicTester;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| topic | String | Yes | Kafka topic name used for testing (e.g. "__arroyo-source-test") |
| server | String | Yes | Kafka broker address (e.g. "0.0.0.0:9092") |
| TestData | struct { i: u64 } | Yes | Simple JSON-serializable test record |
Outputs
| Name | Type | Description |
|---|---|---|
| assertions | test results | Validates record values, checkpoint barriers, and state restoration correctness |
Usage Examples
#[tokio::test]
async fn test_kafka() {
let mut kafka_topic_tester = KafkaTopicTester {
topic: "__arroyo-source-test".to_string(),
server: "0.0.0.0:9092".to_string(),
group_id: Some("test-consumer-group".to_string()),
};
kafka_topic_tester.create_topic().await;
let mut reader = kafka_topic_tester
.get_source_with_reader(task_info.clone(), None)
.await;
let mut producer = kafka_topic_tester.get_producer();
for message in 1u64..20 {
producer.send_data(TestData { i: message });
}
// ... assert received values ...
}