Implementation:DataTalksClub Data engineering zoomcamp Redpanda JsonConsumer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Python class JsonConsumer that wraps kafka-python's KafkaConsumer to consume JSON-serialized ride messages from a Redpanda/Kafka topic, deserializing each message into a Ride object via Ride.from_dict.
Description
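The JsonConsumer class provides a high-level interface for consuming JSON-encoded taxi ride data from a Kafka-compatible broker (Redpanda). Its constructor takes a configuration dictionary and forwards it unchanged to the underlying KafkaConsumer constructor as keyword arguments. The consume_from_kafka method subscribes to one or more topics and then polls in an endless loop, printing the key and value of each consumed record. The source calls poll(1.0), apparently intending a 1-second timeout. However, kafka-python's KafkaConsumer.poll interprets its first argument as timeout_ms, so the effective timeout is 1 millisecond; a 1-second wait would be written poll(timeout_ms=1000). The loop exits on KeyboardInterrupt (Ctrl+C), after which the consumer is closed.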
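In kafka-python, poll() returns a dict that maps each TopicPartition to a list of ConsumerRecord objects, or an empty dict when nothing arrived before the timeout; this is why the loop nests two for loops. The sketch below reproduces that control flow without a broker. TopicPartition and Record are hypothetical namedtuple stand-ins, StubConsumer is a hypothetical fake that raises KeyboardInterrupt once its canned batches run out, and the topic name rides_json is made up.

```python
from collections import namedtuple
from typing import Dict, List

# Hypothetical stand-ins for kafka-python's TopicPartition and ConsumerRecord;
# only the attributes the loop reads are modelled.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["key", "value"])


class StubConsumer:
    """Hypothetical fake consumer: replays canned poll() results, then raises
    KeyboardInterrupt to mimic the user pressing Ctrl+C."""

    def __init__(self, batches: List[Dict[TopicPartition, List[Record]]]):
        self._batches = list(batches)
        self.closed = False

    def poll(self, timeout_ms: int = 0) -> Dict[TopicPartition, List[Record]]:
        if not self._batches:
            raise KeyboardInterrupt
        return self._batches.pop(0)

    def close(self) -> None:
        self.closed = True


def drain(consumer, out: List[str]) -> None:
    """Same control flow as consume_from_kafka, but collects lines instead of printing."""
    try:
        while True:
            batch = consumer.poll(timeout_ms=1000)  # kafka-python expects milliseconds
            if not batch:  # {} means nothing arrived before the timeout
                continue
            for _tp, records in batch.items():
                for record in records:
                    out.append(f"{record.key} {record.value}")  # same text print(key, value) emits
    except KeyboardInterrupt:
        pass  # Ctrl+C ends the loop
    finally:
        consumer.close()


tp = TopicPartition("rides_json", 0)
consumer = StubConsumer([{}, {tp: [Record(142, "ride-a"), Record(236, "ride-b")]}])
lines: List[str] = []
drain(consumer, lines)
print(lines)  # ['142 ride-a', '236 ride-b']
```

The first, empty batch is skipped by the `if not batch` check, and the finally clause guarantees close() runs even if the loop fails for a reason other than KeyboardInterrupt.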
In the default configuration (defined in the __main__ block), messages are deserialized using json.loads with an object_hook that calls Ride.from_dict to reconstruct Ride instances from JSON dictionaries. Keys are decoded as integers (representing pickup location IDs). The consumer group is set to consumer.group.id.json-example.1 with auto_offset_reset set to earliest and auto-commit enabled.
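The two deserializers can be exercised offline on raw bytes. The sketch below assumes a hypothetical, trimmed-down Ride dataclass with three made-up fields; the real Ride class and its from_dict live in the repository's ride module and carry more fields.

```python
import json
from dataclasses import dataclass


@dataclass
class Ride:
    """Hypothetical, trimmed-down Ride; the real class has more fields."""
    vendor_id: str
    pu_location_id: int
    trip_distance: float

    @classmethod
    def from_dict(cls, d: dict) -> "Ride":
        return cls(
            vendor_id=d["vendor_id"],
            pu_location_id=d["pu_location_id"],
            trip_distance=d["trip_distance"],
        )


def key_deserializer(key: bytes) -> int:
    # Keys arrive as UTF-8 text such as b"142" and are parsed into an int.
    return int(key.decode("utf-8"))


def value_deserializer(raw: bytes) -> Ride:
    # object_hook is called with every decoded JSON object; for this flat
    # payload it runs once and returns a Ride instead of a plain dict.
    return json.loads(raw.decode("utf-8"), object_hook=Ride.from_dict)


key = key_deserializer(b"142")
ride = value_deserializer(b'{"vendor_id": "1", "pu_location_id": 142, "trip_distance": 2.5}')
print(key, ride)  # 142 Ride(vendor_id='1', pu_location_id=142, trip_distance=2.5)
```

Because json.loads applies object_hook to every JSON object it decodes, including nested ones, this approach works cleanly only while the ride payload is a flat object.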
Usage
Use this implementation when you need to consume JSON-serialized taxi ride data from a Redpanda or Kafka topic and reconstruct structured Ride objects on the consumer side. It is suitable for scenarios where both the producer and consumer agree on a JSON serialization format matching the Ride data model.
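What "agree on a JSON serialization format" means in practice can be checked with a round trip. The sketch below assumes a hypothetical two-field Ride and a producer that serializes it with json.dumps(asdict(...)); the repository's actual producer may serialize differently. It also shows how a field-name mismatch surfaces on the consumer side.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Ride:
    """Hypothetical two-field Ride used only to illustrate the contract."""
    pu_location_id: int
    trip_distance: float

    @classmethod
    def from_dict(cls, d: dict) -> "Ride":
        return cls(**d)  # unknown or missing field names raise TypeError


# Producer side (assumed): key by pickup location, value as JSON bytes.
def serialize_key(ride: Ride) -> bytes:
    return str(ride.pu_location_id).encode("utf-8")


def serialize_value(ride: Ride) -> bytes:
    return json.dumps(asdict(ride)).encode("utf-8")


# Consumer side: mirrors the deserializers of the default configuration.
def deserialize_key(raw: bytes) -> int:
    return int(raw.decode("utf-8"))


def deserialize_value(raw: bytes) -> Ride:
    return json.loads(raw.decode("utf-8"), object_hook=Ride.from_dict)


original = Ride(pu_location_id=142, trip_distance=2.5)
restored_key = deserialize_key(serialize_key(original))
restored = deserialize_value(serialize_value(original))
print(restored_key, restored == original)  # 142 True

# A producer that renames a field breaks the contract on the consumer side.
try:
    deserialize_value(b'{"PULocationID": 142, "trip_distance": 2.5}')
    mismatch_detected = False
except TypeError:
    mismatch_detected = True
print(mismatch_detected)  # True
```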
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/python/redpanda_example/consumer.py
- Lines: 1-48
Signature
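```python
from typing import Dict, List

from kafka import KafkaConsumer


class JsonConsumer:
    def __init__(self, props: Dict):
        # props is forwarded unchanged as KafkaConsumer keyword arguments.
        self.consumer = KafkaConsumer(**props)

    def consume_from_kafka(self, topics: List[str]):
        self.consumer.subscribe(topics)
        try:
            while True:
                # kafka-python's poll() takes milliseconds; the source passes 1.0 (i.e. 1 ms).
                message = self.consumer.poll(timeout_ms=1000)
                if not message:  # empty dict: no records arrived within the timeout
                    continue
                # poll() returns {TopicPartition: [ConsumerRecord, ...]}
                for _topic_partition, records in message.items():
                    for record in records:
                        print(record.key, record.value)
        except KeyboardInterrupt:
            pass  # Ctrl+C stops consumption
        finally:
            self.consumer.close()  # also runs if another exception escapes the loop
```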
Import
```python
from consumer import JsonConsumer
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| props | Dict | Yes | Keyword arguments forwarded unchanged to KafkaConsumer(**props), so any option KafkaConsumer accepts may be supplied. The default configuration sets bootstrap_servers, auto_offset_reset, enable_auto_commit, key_deserializer, value_deserializer, and group_id. |
| topics | List[str] | Yes | List of Kafka topic names to subscribe to and consume messages from. |
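Since props is passed straight through as keyword arguments, a small builder can keep the defaults in one place while allowing per-run overrides. The helper make_json_consumer_props below is hypothetical (it is not part of the repository), and the broker address localhost:9092 is an assumed example value.

```python
import json
from typing import Any, Callable, Dict, Optional


def make_json_consumer_props(
    bootstrap_servers: str,
    group_id: str,
    object_hook: Optional[Callable[[dict], Any]] = None,
    **overrides: Any,
) -> Dict[str, Any]:
    """Hypothetical helper: build the dict JsonConsumer forwards to KafkaConsumer(**props)."""
    props: Dict[str, Any] = {
        "bootstrap_servers": bootstrap_servers,
        "group_id": group_id,
        "auto_offset_reset": "earliest",
        "enable_auto_commit": True,
        "key_deserializer": lambda key: int(key.decode("utf-8")),
        # With object_hook=None, json.loads returns plain dicts.
        "value_deserializer": lambda raw: json.loads(raw.decode("utf-8"), object_hook=object_hook),
    }
    props.update(overrides)  # caller-supplied options win over the defaults
    return props


props = make_json_consumer_props(
    "localhost:9092",  # assumed broker address
    "consumer.group.id.json-example.1",
    auto_offset_reset="latest",
)
print(props["auto_offset_reset"])  # latest
```

Passing Ride.from_dict as object_hook would reproduce the default configuration's value deserializer.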
Outputs
| Name | Type | Description |
|---|---|---|
| (stdout) | text | One line per consumed record: the key and the value separated by a single space (print's default separator). With the default deserializer configuration, the key is an int and the value is a Ride object rendered through its string representation. |
Usage Examples
Basic Usage
```python
from json import loads

from consumer import JsonConsumer
from ride import Ride
from settings import BOOTSTRAP_SERVERS, KAFKA_TOPIC

config = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,
    # Start from the oldest offset when the group has no committed offset yet.
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    # Keys are UTF-8 text such as b"142"; parse them as integer pickup location IDs.
    'key_deserializer': lambda key: int(key.decode('utf-8')),
    # object_hook turns each decoded JSON object into a Ride.
    'value_deserializer': lambda x: loads(x.decode('utf-8'), object_hook=Ride.from_dict),
    'group_id': 'consumer.group.id.json-example.1',
}

json_consumer = JsonConsumer(props=config)
json_consumer.consume_from_kafka(topics=[KAFKA_TOPIC])
```