Implementation:ArroyoSystems Arroyo Kafka Source Tests

From Leeroopedia


Knowledge Sources
Domains: Streaming, Connectors
Last Updated: 2026-02-08 08:00 GMT

Overview

Integration test suite for KafkaSourceFunc that validates end-to-end Kafka consumption, checkpointing, state restoration, and metadata field extraction.

Description

The test module provides a KafkaTopicTester harness that manages the Kafka topic lifecycle (creation and deletion) and constructs KafkaSourceFunc instances wired to test channels for control messages, data output, and checkpoint completion signals. KafkaTopicProducer is a thin wrapper around rdkafka's BaseProducer for sending test data. The KafkaSourceWithReads struct provides assertion helpers: assert_next_message_record_values validates received data, and assert_next_message_checkpoint verifies checkpoint barriers.

Two integration tests are included. test_kafka covers four phases in order: basic consumption, checkpointing, state restoration from a checkpointed epoch, and continued consumption after recovery. test_kafka_with_metadata_fields verifies that metadata fields (such as offset) are correctly propagated into the output schema.
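The consume, checkpoint, restore, and continue sequence that test_kafka exercises can be illustrated with a small std-only model. This is a sketch under stated assumptions, not the Arroyo implementation: Log, ModelSource, and run_scenario are hypothetical names, the "partition" is an in-memory vector, and a checkpoint is reduced to remembering the next offset per epoch.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a single Kafka partition.
struct Log {
    records: Vec<u64>,
}

/// Hypothetical source that tracks its next offset and snapshots it per epoch.
struct ModelSource {
    next_offset: usize,
    checkpoints: HashMap<u32, usize>, // epoch -> offset at barrier time
}

impl ModelSource {
    fn new() -> Self {
        ModelSource { next_offset: 0, checkpoints: HashMap::new() }
    }

    /// Return every record past the tracked offset and advance the offset.
    fn poll(&mut self, log: &Log) -> Vec<u64> {
        let out = log.records[self.next_offset..].to_vec();
        self.next_offset = log.records.len();
        out
    }

    /// Record the current offset for an epoch, as a checkpoint barrier would.
    fn checkpoint(&mut self, epoch: u32) {
        self.checkpoints.insert(epoch, self.next_offset);
    }

    /// Build a fresh source that resumes from a previously stored epoch.
    fn restore(&self, epoch: u32) -> Option<ModelSource> {
        let offset = *self.checkpoints.get(&epoch)?;
        Some(ModelSource { next_offset: offset, checkpoints: self.checkpoints.clone() })
    }
}

/// Drives the four phases and returns what was read before and after recovery.
fn run_scenario() -> (Vec<u64>, Vec<u64>) {
    let mut log = Log { records: (1u64..20).collect() };
    let mut source = ModelSource::new();

    let before = source.poll(&log); // phase 1: consume
    source.checkpoint(1);           // phase 2: checkpoint at epoch 1

    // More data arrives after the checkpoint.
    log.records.extend(20u64..30);

    // Phase 3: restore from epoch 1; phase 4: continue consuming.
    let mut restored = source.restore(1).expect("epoch 1 was checkpointed");
    let after = restored.poll(&log);
    (before, after)
}
```

In this model the restored source sees only the records appended after the checkpoint, which is the property a restore test is meant to check: nothing is re-read and nothing is skipped.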

Usage

Use these tests to validate Kafka source correctness, checkpoint/restore behavior, and metadata field extraction against a running Kafka broker at 0.0.0.0:9092.

Code Reference

Source Location

Signature

pub struct KafkaTopicTester {
    topic: String,
    server: String,
    group_id: Option<String>,
}

struct KafkaTopicProducer {
    base_producer: BaseProducer,
    topic: String,
}

struct KafkaSourceWithReads {
    to_control_tx: Sender<ControlMessage>,
    from_control_rx: Receiver<ControlResp>,
    data_recv: BatchReceiver,
}

#[tokio::test]
async fn test_kafka() { ... }

#[tokio::test]
async fn test_kafka_with_metadata_fields() { ... }

Import

// Internal test module, not publicly importable
use crate::kafka::source::test::KafkaTopicTester;

I/O Contract

Inputs

Name | Type | Required | Description
topic | String | Yes | Kafka topic name used for testing (e.g. "__arroyo-source-test")
server | String | Yes | Kafka broker address (e.g. "0.0.0.0:9092")
TestData | struct { i: u64 } | Yes | Simple JSON-serializable test record
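To make the TestData row concrete, the following sketch shows what the JSON payloads for such a record could look like. It is an illustration only: the to_json method and the payloads helper are hypothetical, and the encoding is hand-rolled to keep the example std-only. How the real test serializes TestData is not shown in this page.

```rust
/// Mirrors the shape given in the Inputs table: a single u64 field.
#[derive(Debug, Clone, Copy, PartialEq)]
struct TestData {
    i: u64,
}

impl TestData {
    /// Hypothetical hand-rolled JSON encoding (assumption: one numeric field "i").
    fn to_json(&self) -> String {
        format!("{{\"i\":{}}}", self.i)
    }
}

/// Payloads for the half-open range 1u64..20, i.e. i = 1 through 19.
fn payloads() -> Vec<String> {
    (1u64..20).map(|i| TestData { i }.to_json()).collect()
}
```

Because the range's upper bound is exclusive, this yields 19 payloads rather than 20.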

Outputs

Name | Type | Description
assertions | test results | Validates record values, checkpoint barriers, and state restoration correctness

Usage Examples

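#[tokio::test]
async fn test_kafka() {
    // Harness configuration: test topic, local broker, and consumer group.
    let mut kafka_topic_tester = KafkaTopicTester {
        topic: "__arroyo-source-test".to_string(),
        server: "0.0.0.0:9092".to_string(),
        group_id: Some("test-consumer-group".to_string()),
    };

    // NOTE: `task_info` is not defined in this excerpt; it is assumed to be
    // constructed elsewhere in the full test.
    kafka_topic_tester.create_topic().await;
    let mut reader = kafka_topic_tester
        .get_source_with_reader(task_info.clone(), None)
        .await;
    let mut producer = kafka_topic_tester.get_producer();

    // Sends 19 records with i = 1 through 19 (the upper bound is exclusive).
    for message in 1u64..20 {
        producer.send_data(TestData { i: message });
    }
    // ... assert received values ...
}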
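The elided assertion step can be pictured with a std-only model of a reader that checks records and checkpoint barriers in arrival order. This is a sketch, not the module's code: Message, ModelReader, assert_next_records, assert_next_checkpoint, and run_reader_demo are hypothetical names that only loosely mirror the role of KafkaSourceWithReads and its helpers, and std::sync::mpsc stands in for the test channels.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical stand-in for items a source emits on its data channel.
#[derive(Debug, PartialEq)]
enum Message {
    Record(u64),
    Barrier { epoch: u32 },
}

/// Hypothetical reader wrapper holding the receiving end of the data channel.
struct ModelReader {
    data_rx: Receiver<Message>,
}

impl ModelReader {
    /// Receives one message per expected value and panics on any mismatch.
    fn assert_next_records(&mut self, mut expected: VecDeque<u64>) {
        while let Some(want) = expected.pop_front() {
            match self.data_rx.recv().expect("channel closed early") {
                Message::Record(got) => assert_eq!(got, want),
                other => panic!("expected record {}, got {:?}", want, other),
            }
        }
    }

    /// Asserts that the next message is a barrier for the given epoch.
    fn assert_next_checkpoint(&mut self, epoch: u32) {
        let msg = self.data_rx.recv().expect("channel closed early");
        assert_eq!(msg, Message::Barrier { epoch });
    }
}

/// Feeds 19 records and one barrier, then checks them in order.
/// Returns true if the channel is closed and drained afterwards.
fn run_reader_demo() -> bool {
    let (tx, rx) = channel();
    for i in 1u64..20 {
        tx.send(Message::Record(i)).unwrap();
    }
    tx.send(Message::Barrier { epoch: 1 }).unwrap();
    drop(tx); // close the sending side so recv() errors once drained

    let mut reader = ModelReader { data_rx: rx };
    reader.assert_next_records((1u64..20).collect());
    reader.assert_next_checkpoint(1);
    reader.data_rx.recv().is_err()
}
```

Keeping the assertions on the reader, rather than inline in each test, lets every test state the expected sequence of records and barriers in a few lines.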
