
Implementation:DataTalksClub Data engineering zoomcamp JsonConsumer Implementation

From Leeroopedia


Page Metadata
Knowledge Sources repo: DataTalksClub/data-engineering-zoomcamp
Domains Data_Engineering, Stream_Processing
Last Updated 2026-02-09 14:00 GMT

Overview

Concrete API documentation for the JsonConsumer class, a Python wrapper around kafka-python's KafkaConsumer that subscribes to Kafka topics, deserializes JSON messages back into Ride objects, and prints each record's key and value in a continuous poll loop.

Description

The JsonConsumer class provides a simple, blocking consumer that:

  • Subscribes to one or more Kafka topics by name.
  • Polls the broker with a 1-second timeout in a continuous loop, which allows the consumer to handle KeyboardInterrupt signals between polls for graceful shutdown.
  • Deserializes incoming messages using configured deserializers:
    • Key deserializer: Decodes the byte key to UTF-8 and parses it as an integer, matching the producer's key serializer.
    • Value deserializer: Decodes the byte value to UTF-8, parses the JSON string, and uses object_hook with Ride.from_dict to reconstruct a Ride domain object.
  • Processes messages by iterating over the partition-keyed message dictionary returned by poll() and printing each message's key and deserialized value.
  • Closes the consumer cleanly on interrupt, committing final offsets and releasing partition assignments.

The consumer is configured with:

  • auto_offset_reset='earliest': New consumer groups start reading from the beginning of the topic.
  • enable_auto_commit=True: Offsets are committed periodically in the background.
  • group_id='consumer.group.id.json-example.1': Identifies this consumer as part of a named group for partition assignment and offset tracking.

Usage

Use this implementation to:

  • Read and deserialize taxi ride messages produced by the JsonProducer.
  • Verify that the end-to-end produce-consume pipeline is working correctly.
  • Serve as a template for building more sophisticated consumer applications with custom message processing logic.

Code Reference

Source Location

File 07-streaming/python/json_example/consumer.py
Lines 1-43
Repository DataTalksClub/data-engineering-zoomcamp

Signature

class JsonConsumer:
    def __init__(self, props: Dict) -> None: ...
    def consume_from_kafka(self, topics: List[str]) -> None: ...

Import

from typing import Dict, List
from json import loads
from kafka import KafkaConsumer
from ride import Ride
from settings import BOOTSTRAP_SERVERS, KAFKA_TOPIC

I/O Contract

Inputs

  • props (Dict): Configuration dictionary passed to KafkaConsumer. Must include bootstrap_servers, auto_offset_reset, enable_auto_commit, key_deserializer, value_deserializer, and group_id.
  • topics (List[str]): List of Kafka topic names to subscribe to (e.g., ['rides_json']).

Outputs

  • Console output, key (int): The deserialized message key (pickup location ID).
  • Console output, value (Ride): The deserialized Ride object reconstructed from the JSON message.
  • Offset commits (broker-side): Periodic automatic offset commits to the __consumer_offsets internal topic.
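The deserializers in this contract must exactly invert the producer's serializers, or keys and values will fail to parse. A self-contained round-trip check, assuming the producer-side serializers mirror these deserializers and omitting the Ride object_hook so the snippet runs without the ride module:

```python
from json import dumps, loads

# Assumed producer-side serializers (mirroring the consumer config below);
# these are illustrative, not quoted from the producer source.
key_serializer = lambda key: str(key).encode('utf-8')
value_serializer = lambda value: dumps(value).encode('utf-8')

# Consumer-side deserializers as configured (without the Ride object_hook,
# so the check stays self-contained):
key_deserializer = lambda key: int(key.decode('utf-8'))
value_deserializer = lambda x: loads(x.decode('utf-8'))

raw_key = key_serializer(142)
raw_value = value_serializer({'vendor_id': '1', 'pu_location_id': 142})

# Round trip: serialize then deserialize recovers the original values.
assert key_deserializer(raw_key) == 142
assert value_deserializer(raw_value) == {'vendor_id': '1', 'pu_location_id': 142}
```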

Usage Examples

Basic consumer setup and message consumption:

from json import loads
from consumer import JsonConsumer
from ride import Ride
from settings import BOOTSTRAP_SERVERS, KAFKA_TOPIC

# Configure the consumer with deserializers
config = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,  # ['localhost:9092']
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    'key_deserializer': lambda key: int(key.decode('utf-8')),
    'value_deserializer': lambda x: loads(
        x.decode('utf-8'),
        object_hook=lambda d: Ride.from_dict(d)
    ),
    'group_id': 'consumer.group.id.json-example.1',
}

# Create consumer and start consuming
json_consumer = JsonConsumer(props=config)
json_consumer.consume_from_kafka(topics=[KAFKA_TOPIC])
# Output:
# Consuming from Kafka started
# Available topics to consume:  {'rides_json'}
# 142 Ride: {'vendor_id': '1', 'tpep_pickup_datetime': ..., ...}
# 236 Ride: {'vendor_id': '2', 'tpep_pickup_datetime': ..., ...}
# ...
# (Press Ctrl+C to stop)

Consuming from multiple topics:

from consumer import JsonConsumer

config = {
    'bootstrap_servers': ['localhost:9092'],
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    'key_deserializer': lambda key: int(key.decode('utf-8')),
    'value_deserializer': lambda x: loads(x.decode('utf-8')),
    'group_id': 'multi-topic-consumer-group',
}

consumer = JsonConsumer(props=config)
# Every subscribed topic must carry messages the configured deserializers
# can parse; with a JSON value_deserializer, subscribe only to JSON-encoded
# topics (topic names here are illustrative).
consumer.consume_from_kafka(topics=['rides_json', 'rides_json_backup'])

Consumer group offset behavior:

# First run with group_id='my-group':
#   auto_offset_reset='earliest' -> reads all messages from offset 0
#
# Second run with the same group_id='my-group':
#   Committed offsets exist -> resumes from the last committed offset
#   Only new messages (produced after the first run) are consumed
#
# Run with a NEW group_id='my-new-group':
#   No committed offsets -> falls back to auto_offset_reset='earliest'
#   Reads all messages from the beginning again
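The fallback rule above can be made concrete with a small illustrative function. This decision logic actually lives in the Kafka client and broker, not in application code; the function below only models the rule for a single partition:

```python
def starting_offset(committed, auto_offset_reset, beginning, end):
    """Illustrative model of where a consumer group starts reading a
    partition. Not a real Kafka API; the broker/client implement this."""
    if committed is not None:
        return committed      # resume where the group left off
    if auto_offset_reset == 'earliest':
        return beginning      # no committed offset: read from the start
    if auto_offset_reset == 'latest':
        return end            # no committed offset: read only new messages
    raise ValueError('no committed offset and no valid reset policy')


# First run of 'my-group' on a partition spanning offsets [0, 100):
assert starting_offset(None, 'earliest', 0, 100) == 0
# Second run after offset 57 was committed: resumes from 57.
assert starting_offset(57, 'earliest', 0, 100) == 57
# A brand-new group has no committed offsets, so the reset policy applies.
assert starting_offset(None, 'latest', 0, 100) == 100
```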
