

Implementation:DataTalksClub Data engineering zoomcamp Redpanda CSV Consumer

From Leeroopedia
Revision as of 14:41, 16 February 2026 by Admin (auto-imported from implementations/DataTalksClub_Data_engineering_zoomcamp_Redpanda_CSV_Consumer.md)


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

Python class RideCSVConsumer that consumes raw CSV-formatted string messages from a Kafka/Redpanda topic using kafka-python, with CLI-based topic selection via argparse.

Description

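The RideCSVConsumer class provides a simple consumer for Kafka messages whose key and value are encoded as plain text (as opposed to JSON-serialized objects). It wraps kafka-python's KafkaConsumer and exposes a consume_from_kafka method that subscribes to one or more topics and then polls them in a continuous loop intended to wait up to one second per poll. Because kafka-python's poll() interprets its timeout in milliseconds, that wait must be written as poll(timeout_ms=1000); passing 1.0 positionally requests a timeout of about one millisecond.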
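To make the nested loop concrete, the sketch below walks a poll() result of the shape kafka-python returns: a dict keyed by topic partition, each mapping to a list of consumer records. So that it runs without a broker, FakeTopicPartition and FakeRecord are hypothetical namedtuple stand-ins carrying only the fields the loop reads, format_batch is a hypothetical helper that returns the lines the consumer would print, and the sample records are made up.

```python
from collections import namedtuple
from typing import Dict, List

# Hypothetical stand-ins for kafka-python's TopicPartition and ConsumerRecord,
# reduced to the fields this example reads.
FakeTopicPartition = namedtuple('FakeTopicPartition', ['topic', 'partition'])
FakeRecord = namedtuple('FakeRecord', ['key', 'value'])


def format_batch(batch: Dict[FakeTopicPartition, List[FakeRecord]]) -> List[str]:
    """Return one line per record, in the format RideCSVConsumer prints."""
    lines: List[str] = []
    if not batch:  # an empty dict means nothing arrived within the timeout
        return lines
    for _partition, records in batch.items():
        for record in records:
            lines.append(f'Key:{record.key}-type({type(record.key)}), '
                         f'Value:{record.value}-type({type(record.value)})')
    return lines


# Made-up sample batch spanning two partitions of the same topic.
batch = {
    FakeTopicPartition('rides_csv', 0): [FakeRecord(1, '1, 1, 1.5')],
    FakeTopicPartition('rides_csv', 1): [FakeRecord(2, '2, 3, 0.8')],
}
for line in format_batch(batch):
    print(line)
```

The outer loop variable is the partition, not a message key, which is why it goes unused here.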

Each received message is printed to the console as its key, the key's type, its value, and the value's type. The loop ends when a KeyboardInterrupt is raised (for example, by pressing Ctrl+C), after which the consumer is closed.
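The shutdown behaviour can be sketched with try/finally so that close() runs however the loop ends. StubConsumer and run_until_interrupted are hypothetical names; the stub mimics only the poll() and close() calls the loop needs and raises KeyboardInterrupt after a fixed number of polls to simulate Ctrl+C, so the example runs without a broker.

```python
class StubConsumer:
    """Hypothetical stand-in for KafkaConsumer: poll() returns empty batches,
    then raises KeyboardInterrupt to simulate the user pressing Ctrl+C."""

    def __init__(self, polls_before_interrupt: int):
        self.remaining = polls_before_interrupt
        self.closed = False

    def poll(self, timeout_ms: int = 0) -> dict:
        if self.remaining == 0:
            raise KeyboardInterrupt
        self.remaining -= 1
        return {}

    def close(self) -> None:
        self.closed = True


def run_until_interrupted(consumer) -> int:
    """Poll until KeyboardInterrupt, then always close the consumer.
    Returns the number of completed poll() calls."""
    polls = 0
    try:
        while True:
            consumer.poll(timeout_ms=1000)
            polls += 1
    except KeyboardInterrupt:
        pass  # treat Ctrl+C as a normal shutdown request
    finally:
        consumer.close()  # runs on Ctrl+C and on any other exception
    return polls


stub = StubConsumer(polls_before_interrupt=3)
print(run_until_interrupted(stub), stub.closed)
```

Placing close() in a finally block, rather than after the loop, also releases the consumer when an exception other than KeyboardInterrupt escapes the loop.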

The __main__ block uses argparse to accept a --topic argument (defaulting to CONSUME_TOPIC_RIDES_CSV from the settings module, whose value is rides_csv). Its default configuration decodes keys from UTF-8 text to integers and values to UTF-8 strings, sets auto_offset_reset to earliest, and uses the consumer group ID consumer.group.id.csv-example.1.
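A minimal sketch of the argument handling described above, assuming the settings value rides_csv is copied in as a local constant so the example runs on its own. parse_args is a hypothetical helper name and not necessarily how the page's __main__ block is structured; accepting an explicit argv list makes it easy to exercise without a real command line.

```python
import argparse
from typing import List, Optional

# Assumption: mirrors settings.CONSUME_TOPIC_RIDES_CSV as described on this page.
CONSUME_TOPIC_RIDES_CSV = 'rides_csv'


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse --topic, falling back to the CSV rides topic when it is omitted."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', type=str, default=CONSUME_TOPIC_RIDES_CSV,
                        help='Kafka topic to consume from')
    return parser.parse_args(argv)  # argv=None would read sys.argv[1:]


args = parse_args(['--topic', 'my_custom_topic'])
topics = [args.topic]  # consume_from_kafka expects a list of topic names
print(topics)
```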

This consumer is designed to work with the RideCSVProducer class that publishes CSV-formatted string records.
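The consumer's default deserializers only work if the producer encodes keys and values to match. The sketch below assumes a producer that sends the key as the UTF-8 text of an integer and the value as a UTF-8 CSV line; serialize_key and serialize_value are hypothetical, since RideCSVProducer's own configuration is not shown on this page. The two deserializers reproduce the lambdas from the default consumer configuration.

```python
from typing import Tuple


# Hypothetical producer-side serializers (assumed, not taken from RideCSVProducer).
def serialize_key(key: int) -> bytes:
    return str(key).encode('utf-8')


def serialize_value(value: str) -> bytes:
    return value.encode('utf-8')


# Consumer-side deserializers, equivalent to the default configuration's lambdas.
def deserialize_key(raw: bytes) -> int:
    return int(raw.decode('utf-8'))


def deserialize_value(raw: bytes) -> str:
    return raw.decode('utf-8')


def round_trip(key: int, value: str) -> Tuple[int, str]:
    """Encode as the assumed producer would, then decode as the consumer does."""
    return deserialize_key(serialize_key(key)), deserialize_value(serialize_value(value))


print(round_trip(2, '2, 1, 3.4'))
```

A key that is not the text of an integer (for example b'abc') makes deserialize_key raise ValueError, so both sides must agree on the key format.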

Usage

Use this implementation when consuming CSV-formatted string messages from a Redpanda or Kafka topic where no JSON or Avro deserialization is needed. It pairs with the RideCSVProducer and is also used as the ingestion stage before PySpark Structured Streaming processing.

Code Reference

Source Location

Signature

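from typing import Dict, List

from kafka import KafkaConsumer


class RideCSVConsumer:
    def __init__(self, props: Dict):
        # props are forwarded unchanged as KafkaConsumer keyword arguments
        self.consumer = KafkaConsumer(**props)

    def consume_from_kafka(self, topics: List[str]):
        self.consumer.subscribe(topics=topics)
        print('Consuming from Kafka started')
        print('Available topics to consume: ', self.consumer.subscription())
        try:
            while True:
                # kafka-python's poll() takes its timeout in milliseconds and
                # returns a dict {TopicPartition: [ConsumerRecord, ...]}
                batch = self.consumer.poll(timeout_ms=1000)
                if not batch:  # empty dict: nothing arrived within the timeout
                    continue
                for _partition, records in batch.items():
                    for record in records:
                        print(f'Key:{record.key}-type({type(record.key)}), '
                              f'Value:{record.value}-type({type(record.value)})')
        except KeyboardInterrupt:
            pass  # Ctrl+C ends the loop
        finally:
            self.consumer.close()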

Import

from consumer import RideCSVConsumer

I/O Contract

Inputs

Name   | Type      | Required | Description
-------|-----------|----------|------------
props  | Dict      | Yes      | Configuration dictionary passed to KafkaConsumer as keyword arguments. Expected keys include bootstrap_servers, auto_offset_reset, enable_auto_commit, key_deserializer, value_deserializer, and group_id.
topics | List[str] | Yes      | List of Kafka topic names to subscribe to and consume messages from.
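Because props is forwarded as KafkaConsumer keyword arguments, a missing key falls back to KafkaConsumer's default value rather than raising an error. A small check before construction can surface that early. EXPECTED_KEYS and missing_props are hypothetical names; the tuple simply mirrors the keys named in the props row, and the 'localhost:9092' address in the sample is an assumption.

```python
from typing import Dict, List

# Mirrors the keys listed for props in the Inputs table.
EXPECTED_KEYS = (
    'bootstrap_servers',
    'auto_offset_reset',
    'enable_auto_commit',
    'key_deserializer',
    'value_deserializer',
    'group_id',
)


def missing_props(props: Dict) -> List[str]:
    """Return the expected keys absent from props, in EXPECTED_KEYS order."""
    return [key for key in EXPECTED_KEYS if key not in props]


# Assumed broker address, for illustration only.
partial = {'bootstrap_servers': ['localhost:9092'], 'group_id': 'demo'}
print(missing_props(partial))
```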

Outputs

Name     | Type | Description
---------|------|------------
(stdout) | str  | Each consumed message is printed to the console with its key, key type, value, and value type. With the default configuration, keys are decoded as integers and values as UTF-8 strings containing CSV-formatted ride data.
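Downstream code that needs individual fields can split each value with Python's csv module. The sketch assumes one comma-separated line per message, possibly with spaces after the commas; parse_ride_value is a hypothetical helper and the sample string is made up, since the exact column layout is not documented on this page.

```python
import csv
from typing import List


def parse_ride_value(value: str) -> List[str]:
    """Split one CSV-formatted message value into stripped field strings."""
    # csv.reader accepts any iterable of lines; a one-element list is one row.
    # skipinitialspace drops the blank that follows each delimiter.
    row = next(csv.reader([value], skipinitialspace=True))
    return [field.strip() for field in row]


print(parse_ride_value('1, 1, 1.5, 1, 12.3'))
```

Using csv.reader instead of str.split(',') keeps quoted fields that contain commas intact.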

Usage Examples

Basic Usage

from consumer import RideCSVConsumer
from settings import BOOTSTRAP_SERVERS, CONSUME_TOPIC_RIDES_CSV

config = {
    'bootstrap_servers': [BOOTSTRAP_SERVERS],
    # where to start when the group has no valid committed offset
    'auto_offset_reset': 'earliest',
    # commit consumed offsets periodically in the background
    'enable_auto_commit': True,
    # bytes -> int, e.g. b'42' -> 42
    'key_deserializer': lambda key: int(key.decode('utf-8')),
    # bytes -> CSV-formatted str
    'value_deserializer': lambda value: value.decode('utf-8'),
    'group_id': 'consumer.group.id.csv-example.1',
}

csv_consumer = RideCSVConsumer(props=config)
csv_consumer.consume_from_kafka(topics=[CONSUME_TOPIC_RIDES_CSV])
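The key_deserializer in this configuration assumes every record carries a key; called with None, the lambda fails with AttributeError on None.decode. If records without a key may reach the topic, a defensive variant can pass None through instead. decode_optional_int_key is a hypothetical name; it could be assigned to config['key_deserializer'] in place of the lambda.

```python
from typing import Optional


def decode_optional_int_key(raw: Optional[bytes]) -> Optional[int]:
    """Decode a UTF-8 integer key, returning None when the record has no key."""
    if raw is None:
        return None
    return int(raw.decode('utf-8'))


print(decode_optional_int_key(b'42'), decode_optional_int_key(None))
```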

CLI Usage

# Run from the command line with a custom topic:
# python consumer.py --topic my_custom_topic
