Implementation:DataTalksClub Data engineering zoomcamp Redpanda JsonProducer

From Leeroopedia


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

Python class JsonProducer that reads CSV taxi ride data, constructs Ride objects, and publishes them as JSON-serialized messages to a Kafka/Redpanda topic using kafka-python's KafkaProducer.

Description

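The JsonProducer class is declared as a subclass of KafkaProducer, but its __init__ does not call the parent initializer. Instead it creates a separate KafkaProducer instance, stores it in self.producer, and routes every send through that wrapped instance. The class provides two core capabilities: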

  1. read_records -- A static method that opens the CSV file at the given resource_path, skips the header row, and builds one Ride object per remaining row, returning them as a list.
  2. publish_rides -- An instance method that iterates over a list of Ride messages and sends each one to the specified Kafka topic, keyed by the ride's pu_location_id (pickup location ID). After each send it calls get() on the returned future, which blocks until the broker acknowledges the record, and then prints the record's offset. Only KafkaTimeoutError is caught and printed; any other exception propagates to the caller.
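
The CSV-reading step in the first item can be tried without a broker. The following is a minimal sketch, assuming a hypothetical DemoRide stand-in for the real Ride class (whose field layout is not shown on this page) and a hypothetical helper read_records_from that accepts any file-like object. The two-column layout is likewise invented for illustration.

```python
import csv
import io
from typing import List, TextIO


class DemoRide:
    """Hypothetical stand-in for Ride: keeps only two columns from a CSV row."""

    def __init__(self, arr: List[str]):
        self.vendor_id = arr[0]
        self.pu_location_id = int(arr[1])


def read_records_from(handle: TextIO) -> List[DemoRide]:
    """Skip the header row, then build one DemoRide per remaining row."""
    reader = csv.reader(handle)
    next(reader, None)  # discard the header; the default avoids StopIteration on an empty file
    return [DemoRide(arr=row) for row in reader]


# In-memory CSV with an assumed two-column layout.
sample = io.StringIO("vendor_id,pu_location_id\n1,142\n2,236\n")
rides = read_records_from(sample)
print([(r.vendor_id, r.pu_location_id) for r in rides])
```

Taking a file-like object rather than a path is a choice made only for this sketch, so that it can run against an in-memory string; read_records itself opens the path it is given.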

The default configuration in the __main__ block serializes keys by converting them to strings and encoding them as UTF-8 bytes. Values are serialized by dumping the Ride object's __dict__ to JSON and encoding the result as UTF-8; default=str is passed so that types the json module cannot encode natively, such as Decimal and datetime, are written as strings. The producer reads from the path defined in settings.INPUT_DATA_PATH and publishes to settings.KAFKA_TOPIC.
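
To see what these serializers put on the wire, the sketch below applies the same two lambdas to a hypothetical DemoRide object. The class and its field names (tpep_pickup_datetime, total_amount) are assumptions made for this example and may not match the real Ride schema.

```python
import json
from datetime import datetime
from decimal import Decimal


class DemoRide:
    """Hypothetical stand-in for Ride with a datetime and a Decimal field."""

    def __init__(self, pu_location_id: int, pickup: datetime, total_amount: Decimal):
        self.pu_location_id = pu_location_id
        self.tpep_pickup_datetime = pickup
        self.total_amount = total_amount


# Same lambdas as in the configuration described above.
key_serializer = lambda key: str(key).encode()
value_serializer = lambda x: json.dumps(x.__dict__, default=str).encode('utf-8')

ride = DemoRide(142, datetime(2020, 7, 1, 0, 25, 32), Decimal("12.35"))
key_bytes = key_serializer(ride.pu_location_id)
value_bytes = value_serializer(ride)

print(key_bytes)    # b'142'
print(value_bytes)  # JSON bytes; the datetime and Decimal appear as quoted strings
```

One consequence of default=str is that a Decimal arrives downstream as a JSON string ("12.35") and not as a JSON number, so a consumer that needs numeric values has to convert them back.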

Usage

Use this implementation when you need to ingest taxi ride data from a CSV file and publish each ride as a JSON-serialized Kafka message to a Redpanda or Kafka broker. This is the producer counterpart to JsonConsumer, and together they form a complete JSON-based produce/consume pipeline.
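
The consuming side has to reverse the producer's encoding. JsonConsumer's actual configuration is not shown on this page, so the following is only a sketch of deserializers that would mirror the producer's serializers. The lambda names and the sample payload fields are assumptions.

```python
import json

# Bytes shaped like the producer's output: UTF-8 key, UTF-8 JSON value.
# The field names are illustrative assumptions, not the real Ride schema.
raw_key = b"142"
raw_value = b'{"pu_location_id": 142, "total_amount": "12.35"}'

# Hypothetical deserializers mirroring the producer's serializers.
key_deserializer = lambda raw: int(raw.decode("utf-8"))
value_deserializer = lambda raw: json.loads(raw.decode("utf-8"))

key = key_deserializer(raw_key)
payload = value_deserializer(raw_value)
print(key, payload["total_amount"])
```

kafka-python's KafkaConsumer accepts callables of this shape through its key_deserializer and value_deserializer options.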

Code Reference

Source Location

Signature

import csv
from typing import Dict, List

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

from ride import Ride  # project-local module; the import path is assumed here


class JsonProducer(KafkaProducer):
    def __init__(self, props: Dict):
        # The parent initializer is not called; all sends go through this wrapped instance.
        self.producer = KafkaProducer(**props)

    @staticmethod
    def read_records(resource_path: str):
        records = []
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            next(reader)  # skip the header row
            for row in reader:
                records.append(Ride(arr=row))
        return records

    def publish_rides(self, topic: str, messages: List[Ride]):
        for ride in messages:
            try:
                record = self.producer.send(topic=topic, key=ride.pu_location_id, value=ride)
                # get() blocks until the broker acknowledges the record
                print('Record {} successfully produced at offset {}'.format(ride.pu_location_id, record.get().offset))
            except KafkaTimeoutError as e:
                print(str(e))
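
The publish loop can be exercised without a running broker by substituting a test double for the wrapped producer. The sketch below is not part of the original module: FakeFuture, FakeProducer, RecordMeta, the publish function, and the topic name rides_json are all hypothetical. It relies only on send(topic=..., key=..., value=...) returning an object whose get() yields something with an offset attribute.

```python
from collections import namedtuple
from typing import Any, List, Optional

# Hypothetical metadata type that mimics the .offset attribute used by publish_rides.
RecordMeta = namedtuple("RecordMeta", ["topic", "offset"])


class FakeFuture:
    """Stand-in for the future returned by send(); get() returns metadata at once."""

    def __init__(self, meta: RecordMeta):
        self._meta = meta

    def get(self, timeout: Optional[float] = None) -> RecordMeta:
        return self._meta


class FakeProducer:
    """In-memory stand-in exposing only send(topic=..., key=..., value=...)."""

    def __init__(self):
        self.sent = []

    def send(self, topic: str, key: Any = None, value: Any = None) -> FakeFuture:
        self.sent.append((topic, key, value))
        return FakeFuture(RecordMeta(topic, len(self.sent) - 1))


def publish(producer, topic: str, messages: List[dict]) -> List[int]:
    """Same loop shape as publish_rides, but collects the offsets in a list."""
    offsets = []
    for ride in messages:
        future = producer.send(topic=topic, key=ride["pu_location_id"], value=ride)
        offsets.append(future.get().offset)  # on a real producer, get() waits for the acknowledgement
    return offsets


fake = FakeProducer()
offsets = publish(fake, "rides_json", [{"pu_location_id": 142}, {"pu_location_id": 236}])
print(offsets)
```

Returning the offsets instead of only printing them is a choice made for this sketch so that the result can be asserted on in a unit test.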

Import

from producer import JsonProducer

I/O Contract

Inputs

Name | Type | Required | Description
props | Dict | Yes | Configuration dictionary passed to KafkaProducer. Expected keys include bootstrap_servers, key_serializer, and value_serializer.
resource_path | str | Yes | File system path to a CSV file containing taxi ride data with a header row.
topic | str | Yes | The Kafka topic name to publish ride messages to.
messages | List[Ride] | Yes | A list of Ride objects to be serialized and published.

Outputs

Name | Type | Description
records | List[Ride] | The read_records static method returns a list of Ride objects parsed from the CSV file.
(stdout) | str | Each successfully produced record logs its pickup location ID and offset. Errors are printed to the console.

Usage Examples

Basic Usage

import json
from producer import JsonProducer
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC

config = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,
    'key_serializer': lambda key: str(key).encode(),
    'value_serializer': lambda x: json.dumps(x.__dict__, default=str).encode('utf-8')
}

producer = JsonProducer(props=config)
rides = producer.read_records(resource_path=INPUT_DATA_PATH)
producer.publish_rides(topic=KAFKA_TOPIC, messages=rides)
