
Implementation:DataTalksClub Data engineering zoomcamp Redpanda JsonConsumer

From Leeroopedia


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

JsonConsumer is a Python class that wraps kafka-python's KafkaConsumer to consume JSON-serialized ride messages from a Redpanda/Kafka topic. With the default configuration, each message value is deserialized into a Ride object via Ride.from_dict.

Description

The JsonConsumer class provides a high-level interface for consuming JSON-encoded taxi ride data from a Kafka-compatible broker (Redpanda). It accepts a configuration dictionary that is passed directly to the underlying KafkaConsumer constructor as keyword arguments. The consume_from_kafka method subscribes to one or more topics and enters a polling loop that calls poll(1.0) and prints the key and value of each consumed record. Because kafka-python's poll takes its timeout in milliseconds (timeout_ms), this call waits at most 1 millisecond per iteration, not 1 second. A KeyboardInterrupt breaks out of the loop, after which the consumer is closed.
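The nested loop in consume_from_kafka follows from the shape of the value returned by poll: a dictionary that maps each topic partition to the list of records fetched from it. The sketch below illustrates that shape without a broker. The TopicPartition and Record namedtuples are reduced stand-ins for kafka-python's own types, and the topic name rides_json and the helper iter_records are hypothetical.

```python
from collections import namedtuple

# Stand-ins for kafka-python's TopicPartition and ConsumerRecord, reduced to
# the fields used here so the sketch runs without a broker.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["key", "value"])


def iter_records(batch):
    """Yield every record from a poll()-style batch, partition by partition."""
    for _topic_partition, records in batch.items():
        for record in records:
            yield record


# A hand-built batch shaped like the return value of KafkaConsumer.poll():
# a dict mapping each topic partition to the records fetched from it.
batch = {
    TopicPartition("rides_json", 0): [Record(142, "ride-a"), Record(236, "ride-b")],
    TopicPartition("rides_json", 1): [Record(7, "ride-c")],
}

for record in iter_records(batch):
    print(record.key, record.value)
```

An empty dictionary, which poll returns when nothing arrives before the timeout, simply yields no records.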

In the default configuration (defined in the __main__ block), messages are deserialized using json.loads with an object_hook that calls Ride.from_dict to reconstruct Ride instances from JSON dictionaries. Keys are decoded as integers (representing pickup location IDs). The consumer group is set to consumer.group.id.json-example.1 with auto_offset_reset set to earliest and auto-commit enabled.
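The two deserializers can be exercised on raw bytes without a broker. The following is a minimal sketch: the Ride dataclass is a hypothetical, simplified stand-in for the project's Ride class (its three fields are assumptions), while the deserializer lambdas have the same shape as those in the default configuration.

```python
from dataclasses import dataclass
from json import loads


@dataclass
class Ride:
    # Hypothetical, simplified stand-in for the project's Ride class.
    vendor_id: str
    pu_location_id: int
    trip_distance: float

    @classmethod
    def from_dict(cls, d):
        return cls(
            vendor_id=d["vendor_id"],
            pu_location_id=int(d["pu_location_id"]),
            trip_distance=float(d["trip_distance"]),
        )


# Same shape as the deserializer entries in the default configuration.
deserializers = {
    "key_deserializer": lambda key: int(key.decode("utf-8")),
    "value_deserializer": lambda x: loads(x.decode("utf-8"), object_hook=Ride.from_dict),
}

raw_key = b"142"
raw_value = b'{"vendor_id": "1", "pu_location_id": 142, "trip_distance": 2.5}'

key = deserializers["key_deserializer"](raw_key)
ride = deserializers["value_deserializer"](raw_value)
print(key, ride)
```

Note that json.loads invokes object_hook for every JSON object it decodes, including nested ones, so this pattern assumes each message is a single flat JSON object.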

Usage

Use this implementation when you need to consume JSON-serialized taxi ride data from a Redpanda or Kafka topic and reconstruct structured Ride objects on the consumer side. It is suitable for scenarios where both the producer and consumer agree on a JSON serialization format matching the Ride data model.
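The agreement between producer and consumer can be checked with a round trip. This sketch assumes a producer that encodes rides as UTF-8 JSON. The Ride dataclass is a hypothetical two-field stand-in, and serialize_ride and deserialize_ride are illustrative helpers, not part of the documented code.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Ride:
    # Hypothetical, simplified stand-in for the project's Ride class.
    pu_location_id: int
    trip_distance: float

    @classmethod
    def from_dict(cls, d):
        return cls(int(d["pu_location_id"]), float(d["trip_distance"]))


def serialize_ride(ride):
    """Producer side (assumed): encode a Ride as UTF-8 JSON bytes."""
    return json.dumps(asdict(ride)).encode("utf-8")


def deserialize_ride(payload):
    """Consumer side: decode JSON bytes back into a Ride via from_dict."""
    return json.loads(payload.decode("utf-8"), object_hook=Ride.from_dict)


original = Ride(pu_location_id=142, trip_distance=2.5)
restored = deserialize_ride(serialize_ride(original))
print(restored == original)

# A payload whose field names differ from the model fails on the consumer side.
try:
    deserialize_ride(b'{"PULocationID": 142, "trip_distance": 2.5}')
except KeyError as exc:
    print("schema mismatch, missing field:", exc)
```

In this sketch a mismatch in field names surfaces as a KeyError inside the deserializer, which is why both sides need to share the same data model.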

Code Reference

Source Location

Signature

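from typing import Dict, List

from kafka import KafkaConsumer


class JsonConsumer:
    def __init__(self, props: Dict):
        # props is forwarded unchanged as keyword arguments to KafkaConsumer.
        self.consumer = KafkaConsumer(**props)

    def consume_from_kafka(self, topics: List[str]):
        self.consumer.subscribe(topics)
        while True:
            try:
                # poll() returns a dict mapping each TopicPartition to a list
                # of records. kafka-python reads the argument as timeout_ms,
                # so 1.0 means 1 millisecond.
                message = self.consumer.poll(1.0)
                if message is None or message == {}:
                    continue
                for message_key, message_value in message.items():
                    for msg_val in message_value:
                        print(msg_val.key, msg_val.value)
            except KeyboardInterrupt:
                # Ctrl+C ends the loop so the consumer can be closed.
                break
        self.consumer.close()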
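As a possible variant, not the project's implementation, the loop can pass the timeout explicitly in milliseconds and close the consumer in a finally block so that it is also closed when an unexpected error occurs. In this sketch the consume function, the handle callback, the max_polls parameter, and FakeConsumer are all hypothetical; FakeConsumer only mimics the subscribe, poll, and close methods so the example runs without a broker.

```python
def consume(consumer, topics, handle, timeout_ms=1000, max_polls=None):
    """Poll `consumer` and pass each record to `handle` until interrupted.

    `max_polls` is a hypothetical test hook; None means loop until Ctrl+C.
    """
    consumer.subscribe(topics)
    polls = 0
    try:
        while max_polls is None or polls < max_polls:
            polls += 1
            # timeout_ms is in milliseconds, so 1000 waits up to one second.
            batch = consumer.poll(timeout_ms=timeout_ms)
            for records in batch.values():
                for record in records:
                    handle(record)
    except KeyboardInterrupt:
        pass
    finally:
        # Runs on normal exit, on Ctrl+C, and on unexpected errors alike.
        consumer.close()


class FakeConsumer:
    """Hypothetical in-memory stand-in exposing subscribe/poll/close."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.topics = None
        self.closed = False

    def subscribe(self, topics):
        self.topics = topics

    def poll(self, timeout_ms=0):
        # Return the next prepared batch, or an empty dict when none remain.
        return self._batches.pop(0) if self._batches else {}

    def close(self):
        self.closed = True


seen = []
fake = FakeConsumer([{"tp0": [("k1", "v1")]}, {}, {"tp1": [("k2", "v2")]}])
consume(fake, ["rides_json"], seen.append, max_polls=3)
print(seen, fake.closed)
```

Passing the consumer and the record handler as arguments keeps the loop testable; with a real KafkaConsumer the same function would be called with max_polls left at None.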

Import

from consumer import JsonConsumer

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| props | Dict | Yes | Configuration dictionary passed to KafkaConsumer. Expected keys include bootstrap_servers, auto_offset_reset, enable_auto_commit, key_deserializer, value_deserializer, and group_id. |
| topics | List[str] | Yes | List of Kafka topic names to subscribe to and consume messages from. |

Outputs

| Name | Type | Description |
|---|---|---|
| (stdout) | str | Each consumed message's key and value are printed to the console. The value is a deserialized Ride object when using the default JSON deserializer configuration. |

Usage Examples

Basic Usage

from json import loads

from consumer import JsonConsumer
from ride import Ride
from settings import BOOTSTRAP_SERVERS, KAFKA_TOPIC

config = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    # Keys arrive as UTF-8 bytes holding an integer pickup location ID.
    'key_deserializer': lambda key: int(key.decode('utf-8')),
    # Values are UTF-8 JSON; object_hook turns each JSON object into a Ride.
    'value_deserializer': lambda x: loads(x.decode('utf-8'), object_hook=lambda d: Ride.from_dict(d)),
    'group_id': 'consumer.group.id.json-example.1',
}

json_consumer = JsonConsumer(props=config)
json_consumer.consume_from_kafka(topics=[KAFKA_TOPIC])
