Implementation:DataTalksClub Data engineering zoomcamp JsonConsumer Implementation
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp |
| Domains | Data_Engineering, Stream_Processing |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete API documentation for the JsonConsumer class, a Python wrapper around kafka-python's KafkaConsumer that subscribes to Kafka topics, deserializes JSON messages back into Ride objects, and prints each record's key and value in a continuous poll loop.
Description
The JsonConsumer class provides a simple, blocking consumer that:
- Subscribes to one or more Kafka topics by name.
- Polls the broker with a 1-second timeout in a continuous loop, which allows the consumer to handle `KeyboardInterrupt` signals between polls for graceful shutdown.
- Deserializes incoming messages using configured deserializers:
  - Key deserializer: Decodes the byte key to UTF-8 and parses it as an integer, matching the producer's key serializer.
  - Value deserializer: Decodes the byte value to UTF-8, parses the JSON string, and uses `object_hook` with `Ride.from_dict` to reconstruct a `Ride` domain object.
- Processes messages by iterating over the partition-keyed message dictionary returned by `poll()` and printing each message's key and deserialized value.
- Closes the consumer cleanly on interrupt, committing final offsets and releasing partition assignments.
The consumer is configured with:
- `auto_offset_reset='earliest'`: New consumer groups start reading from the beginning of the topic.
- `enable_auto_commit=True`: Offsets are committed periodically in the background.
- `group_id='consumer.group.id.json-example.1'`: Identifies this consumer as part of a named group for partition assignment and offset tracking.
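The poll-and-process step described above can be illustrated without a live broker. This is a minimal sketch, assuming a stand-in record type: kafka-python's `poll()` returns a dict mapping each `TopicPartition` to a list of `ConsumerRecord` objects, of which only the `key` and `value` attributes are used here.

```python
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord (assumption: only .key and
# .value are needed to mirror the loop body).
Record = namedtuple("Record", ["key", "value"])

def process_poll_result(records_by_partition):
    """Mirror the consume-loop body: poll() returns a dict keyed by
    partition; print key and value for every record, and return the
    (key, value) pairs for inspection."""
    pairs = []
    for partition, records in records_by_partition.items():
        for record in records:
            print(record.key, record.value)
            pairs.append((record.key, record.value))
    return pairs

# One partition holding two already-deserialized messages.
process_poll_result({"rides_json-0": [Record(142, "Ride(...)"),
                                      Record(236, "Ride(...)")]})
```

In the real consumer this processing sits inside the continuous poll loop, wrapped in a `try`/`except KeyboardInterrupt` so Ctrl+C between polls triggers a clean close.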
Usage
Use this implementation to:
- Read and deserialize taxi ride messages produced by the `JsonProducer`.
- Verify that the end-to-end produce-consume pipeline is working correctly.
- Serve as a template for building more sophisticated consumer applications with custom message processing logic.
Code Reference
Source Location
| File | `07-streaming/python/json_example/consumer.py` |
|---|---|
| Lines | L1-43 |
| Repository | DataTalksClub/data-engineering-zoomcamp |
Signature
class JsonConsumer:
def __init__(self, props: Dict) -> None: ...
def consume_from_kafka(self, topics: List[str]) -> None: ...
Import
from typing import Dict, List
from json import loads
from kafka import KafkaConsumer
from ride import Ride
from settings import BOOTSTRAP_SERVERS, KAFKA_TOPIC
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| props | Dict | Configuration dictionary passed to `KafkaConsumer`. Must include `bootstrap_servers`, `auto_offset_reset`, `enable_auto_commit`, `key_deserializer`, `value_deserializer`, and `group_id`. |
| topics | List[str] | List of Kafka topic names to subscribe to (e.g., `['rides_json']`) |
Outputs
| Output | Type | Description |
|---|---|---|
| Console output (key) | int | The deserialized message key (pickup location ID) |
| Console output (value) | Ride | The deserialized `Ride` object reconstructed from the JSON message |
| Offset commits | Broker-side | Periodic automatic offset commits to the `__consumer_offsets` internal topic |
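To make the deserializer contract concrete, here is a round-trip sketch applying the configured lambdas to raw bytes as they would arrive from the broker. The `Ride` class below is a minimal stand-in with assumed fields; the real model lives in `ride.py` and exposes a `from_dict` classmethod.

```python
import json
from dataclasses import dataclass

# Minimal stand-in for the course's Ride model (fields are assumptions
# for illustration; see the Ride_Data_Model page for the real class).
@dataclass
class Ride:
    vendor_id: str
    pu_location_id: int

    @classmethod
    def from_dict(cls, d):
        return cls(vendor_id=d["vendor_id"], pu_location_id=d["pu_location_id"])

# The same lambdas used in the consumer config, applied to raw bytes.
key_deserializer = lambda key: int(key.decode("utf-8"))
value_deserializer = lambda x: json.loads(
    x.decode("utf-8"), object_hook=lambda d: Ride.from_dict(d))

raw_key = b"142"
raw_value = json.dumps({"vendor_id": "1", "pu_location_id": 142}).encode("utf-8")

key = key_deserializer(raw_key)       # -> 142 (int)
ride = value_deserializer(raw_value)  # -> Ride(vendor_id='1', pu_location_id=142)
```

Because these deserializers invert the producer's serializers, the objects that come out of `poll()` match what went into `send()`.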
Usage Examples
Basic consumer setup and message consumption:
from json import loads
from consumer import JsonConsumer
from ride import Ride
from settings import BOOTSTRAP_SERVERS, KAFKA_TOPIC
# Configure the consumer with deserializers
config = {
'bootstrap_servers': BOOTSTRAP_SERVERS, # ['localhost:9092']
'auto_offset_reset': 'earliest',
'enable_auto_commit': True,
'key_deserializer': lambda key: int(key.decode('utf-8')),
'value_deserializer': lambda x: loads(
x.decode('utf-8'),
object_hook=lambda d: Ride.from_dict(d)
),
'group_id': 'consumer.group.id.json-example.1',
}
# Create consumer and start consuming
json_consumer = JsonConsumer(props=config)
json_consumer.consume_from_kafka(topics=[KAFKA_TOPIC])
# Output:
# Consuming from Kafka started
# Available topics to consume: {'rides_json'}
# 142 Ride: {'vendor_id': '1', 'tpep_pickup_datetime': ..., ...}
# 236 Ride: {'vendor_id': '2', 'tpep_pickup_datetime': ..., ...}
# ...
# (Press Ctrl+C to stop)
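The graceful-shutdown path (catch `KeyboardInterrupt`, then `close()`) can be sketched with a stand-in consumer object, since no broker is assumed here; the real loop calls `KafkaConsumer.poll` and `KafkaConsumer.close` in the same pattern.

```python
class FakeConsumer:
    """Stand-in exposing the two KafkaConsumer methods the shutdown
    path touches (assumption: no broker in this sketch)."""
    def __init__(self):
        self.closed = False

    def poll(self, timeout_ms=1000):
        raise KeyboardInterrupt  # simulate Ctrl+C arriving between polls

    def close(self):
        # The real close() commits final offsets and leaves the group.
        self.closed = True

def run(consumer):
    try:
        while True:
            consumer.poll(timeout_ms=1000)  # 1-second poll timeout
    except KeyboardInterrupt:
        print("Consumer interrupted, closing")
    finally:
        consumer.close()  # runs on interrupt and on any other exit path

c = FakeConsumer()
run(c)
```

Putting `close()` in a `finally` block guarantees the consumer releases its partition assignments even if the loop exits abnormally.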
Consuming from multiple topics:
from consumer import JsonConsumer
config = {
'bootstrap_servers': ['localhost:9092'],
'auto_offset_reset': 'earliest',
'enable_auto_commit': True,
'key_deserializer': lambda key: int(key.decode('utf-8')),
'value_deserializer': lambda x: loads(x.decode('utf-8')),
'group_id': 'multi-topic-consumer-group',
}
consumer = JsonConsumer(props=config)
consumer.consume_from_kafka(topics=['rides_json', 'rides_avro'])
Consumer group offset behavior:
# First run with group_id='my-group':
# auto_offset_reset='earliest' -> reads all messages from offset 0
#
# Second run with the same group_id='my-group':
# Committed offsets exist -> resumes from the last committed offset
# Only new messages (produced after the first run) are consumed
#
# Run with a NEW group_id='my-new-group':
# No committed offsets -> falls back to auto_offset_reset='earliest'
# Reads all messages from the beginning again
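The fallback rule traced in the comments above can be written down in a few lines of pure Python. This is a simplified model of broker-side behavior for one partition, not a kafka-python API call.

```python
def starting_offset(committed, auto_offset_reset, log_end_offset):
    """Where a consumer group starts reading one partition: a committed
    offset always wins; otherwise auto_offset_reset decides."""
    if committed is not None:
        return committed        # resume from the last committed offset
    if auto_offset_reset == "earliest":
        return 0                # read from the beginning of the topic
    return log_end_offset       # 'latest': only messages produced from now on

print(starting_offset(None, "earliest", 100))  # first run of a new group -> 0
print(starting_offset(57, "earliest", 100))    # same group again -> 57
print(starting_offset(None, "latest", 100))    # new group with 'latest' -> 100
```

This is why rerunning the example with the same `group_id` shows no output until new messages are produced, while a fresh `group_id` replays the topic.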
Related Pages
- Principle:DataTalksClub_Data_engineering_zoomcamp_Kafka_Consumer_Pattern
- Implementation:DataTalksClub_Data_engineering_zoomcamp_Ride_Data_Model
- Implementation:DataTalksClub_Data_engineering_zoomcamp_Kafka_Docker_Compose_Setup
- Implementation:DataTalksClub_Data_engineering_zoomcamp_JsonProducer_Implementation
- Heuristic:DataTalksClub_Data_engineering_zoomcamp_Kafka_Consumer_Poll_Timeout
- Environment:DataTalksClub_Data_engineering_zoomcamp_Kafka_Confluent_Environment