Implementation:DataTalksClub Data engineering zoomcamp Redpanda CSV Consumer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Python class RideCSVConsumer that consumes raw CSV-formatted string messages from a Kafka/Redpanda topic using kafka-python, with CLI-based topic selection via argparse.
Description
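The RideCSVConsumer class provides a simple consumer for Kafka messages whose keys and values are plain UTF-8 strings rather than JSON-serialized objects. It wraps kafka-python's KafkaConsumer and exposes a consume_from_kafka method that subscribes to one or more topics and then polls in a continuous loop by calling poll(1.0). Note that kafka-python's poll takes its timeout in milliseconds (timeout_ms), so this call waits at most 1 millisecond per iteration, not 1 second; a 1-second timeout would be poll(1000).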
Each received message is printed to the console showing the key, its type, the value, and its type. The loop is interrupted cleanly via KeyboardInterrupt, after which the consumer is closed.
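The nested loop over the poll result can be tried without a broker. The following is a minimal sketch, assuming that poll() returns a mapping from partition to a list of records; TopicPartition and Record here are hypothetical stand-ins built with namedtuple (not the kafka-python classes), and the sample keys and values are made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-ins for kafka-python's TopicPartition and ConsumerRecord;
# only the fields used by the print statement are modelled.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["key", "value"])


def format_record(record):
    """Build the same line that the consumer prints for one record."""
    return (f"Key:{record.key}-type({type(record.key)}), "
            f"Value:{record.value}-type({type(record.value)})")


def format_batch(batch):
    """Flatten a poll()-style mapping of partition -> records into lines."""
    lines = []
    for _partition, records in batch.items():
        for record in records:
            lines.append(format_record(record))
    return lines


# A fake batch shaped like a poll() result; the contents are illustrative only.
fake_batch = {
    TopicPartition("rides_csv", 0): [Record(1, "a,b,c"), Record(2, "d,e,f")],
}

for line in format_batch(fake_batch):
    print(line)
```

An empty mapping yields no lines, which corresponds to the `continue` branch in the consumer loop.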
The __main__ block uses argparse to accept a --topic argument (defaulting to CONSUME_TOPIC_RIDES_CSV from settings, which is rides_csv). The default configuration decodes keys as integers and values as UTF-8 strings, with auto_offset_reset set to earliest and a consumer group ID of consumer.group.id.csv-example.1.
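The CLI handling described above could look roughly like this. It is a sketch rather than the file's exact code: build_parser and parse_topic are hypothetical helper names, and the constant is defined inline (instead of being imported from settings) so the snippet runs on its own.

```python
import argparse

# Assumption: mirrors the value the text gives for settings.CONSUME_TOPIC_RIDES_CSV.
CONSUME_TOPIC_RIDES_CSV = "rides_csv"


def build_parser():
    """Create a parser with a single optional --topic argument."""
    parser = argparse.ArgumentParser(description="Kafka Consumer")
    parser.add_argument("--topic", type=str, default=CONSUME_TOPIC_RIDES_CSV)
    return parser


def parse_topic(argv=None):
    """Return the topic name from argv (or from sys.argv when argv is None)."""
    return build_parser().parse_args(argv).topic


# With no arguments the default topic is used; --topic overrides it.
print(parse_topic([]))
print(parse_topic(["--topic", "my_custom_topic"]))
```

The resulting topic name would then be wrapped in a list and passed to consume_from_kafka.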
This consumer is designed to work with the RideCSVProducer class that publishes CSV-formatted string records.
Usage
Use this implementation when consuming CSV-formatted string messages from a Redpanda or Kafka topic where no JSON or Avro deserialization is needed. It pairs with the RideCSVProducer and is also used as the ingestion stage before PySpark Structured Streaming processing.
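Because the value arrives as a single CSV-formatted string, any downstream step has to split it into fields itself. A minimal sketch using the standard csv module is shown below; split_csv_value is a hypothetical helper, and the sample string is made up because the column layout of the ride records is not specified here.

```python
import csv
import io


def split_csv_value(value):
    """Split one CSV-formatted message value into a list of string fields."""
    # csv.reader handles quoted fields that contain commas, unlike str.split(',').
    return next(csv.reader(io.StringIO(value)), [])


# Illustrative value only; real ride records will have their own columns.
print(split_csv_value('1,"a, b",3'))
```

Using csv.reader rather than str.split(',') is a design choice that keeps quoted fields intact; for values known to contain no quoting, a plain split would also work.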
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/python/streams-example/redpanda/consumer.py
- Lines: 1-48
Signature
class RideCSVConsumer:
    def __init__(self, props: Dict):
        self.consumer = KafkaConsumer(**props)

    def consume_from_kafka(self, topics: List[str]):
        self.consumer.subscribe(topics=topics)
        print('Consuming from Kafka started')
        print('Available topics to consume: ', self.consumer.subscription())
        while True:
            try:
                msg = self.consumer.poll(1.0)
                if msg is None or msg == {}:
                    continue
                for msg_key, msg_values in msg.items():
                    for msg_val in msg_values:
                        print(f'Key:{msg_val.key}-type({type(msg_val.key)}), '
                              f'Value:{msg_val.value}-type({type(msg_val.value)})')
            except KeyboardInterrupt:
                break

        self.consumer.close()
Import
from consumer import RideCSVConsumer
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| props | Dict | Yes | Configuration dictionary passed to KafkaConsumer. Expected keys include bootstrap_servers, auto_offset_reset, enable_auto_commit, key_deserializer, value_deserializer, and group_id. |
| topics | List[str] | Yes | List of Kafka topic names to subscribe to and consume messages from. |
Outputs
| Name | Type | Description |
|---|---|---|
| (stdout) | str | Each consumed message is printed to the console with key, key type, value, and value type. Keys are decoded as integers; values are decoded as UTF-8 strings containing CSV-formatted ride data. |
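The effect of the two deserializers on the raw bytes can be checked in isolation. This sketch reuses the lambdas from the default configuration as named functions; the sample byte strings are made up for illustration.

```python
def key_deserializer(raw):
    """Decode the key bytes as UTF-8 text, then convert to int."""
    return int(raw.decode("utf-8"))


def value_deserializer(raw):
    """Decode the value bytes as a UTF-8 string (the CSV line stays unparsed)."""
    return raw.decode("utf-8")


# Illustrative payloads only.
key = key_deserializer(b"1")
value = value_deserializer(b"1,2,3.5")
print(type(key), type(value))
```

A key that is not a valid integer string would make int() raise ValueError inside the deserializer, so this configuration assumes the producer always writes numeric keys.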
Usage Examples
Basic Usage
from consumer import RideCSVConsumer
from settings import BOOTSTRAP_SERVERS, CONSUME_TOPIC_RIDES_CSV

config = {
    'bootstrap_servers': [BOOTSTRAP_SERVERS],
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    'key_deserializer': lambda key: int(key.decode('utf-8')),
    'value_deserializer': lambda value: value.decode('utf-8'),
    'group_id': 'consumer.group.id.csv-example.1',
}

csv_consumer = RideCSVConsumer(props=config)
csv_consumer.consume_from_kafka(topics=[CONSUME_TOPIC_RIDES_CSV])
CLI Usage
# Run from the command line with a custom topic:
# python consumer.py --topic my_custom_topic