
Implementation:DataTalksClub Data engineering zoomcamp JsonProducer Implementation

From Leeroopedia


Page Metadata
Knowledge Sources repo: DataTalksClub/data-engineering-zoomcamp
Domains Data_Engineering, Stream_Processing
Last Updated 2026-02-09 14:00 GMT

Overview

This page provides concrete API documentation for the JsonProducer class, a Python wrapper around kafka-python's KafkaProducer that reads taxi ride CSV data, serializes each record as JSON, and publishes it to a Kafka topic with key-based partitioning by pickup location ID.

Description

The JsonProducer class provides two main capabilities:

  • CSV ingestion: The static method read_records opens a CSV file, skips the header row, and parses each subsequent row into a Ride object using the Ride data model.
  • Message publishing: The publish_rides method iterates over a list of Ride objects and sends each one to the specified Kafka topic. The message key is set to the ride's pu_location_id (pickup location), ensuring that all rides from the same pickup zone land on the same Kafka partition. The producer uses a JSON serializer that converts the Ride object's __dict__ to a UTF-8 encoded JSON string.
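The CSV-ingestion pattern described above can be sketched offline with the standard library. The header names, column order, and the RideStub class below are illustrative assumptions standing in for the real taxi data file and the Ride model in ride.py:

```python
import csv
from io import StringIO

# Toy CSV standing in for the taxi data; the header and column
# mapping here are assumptions made only for illustration
SAMPLE_CSV = "PULocationID,total_amount\n142,14.5\n236,9.8\n"

class RideStub:
    """Stand-in for the real Ride model defined in ride.py."""
    def __init__(self, row):
        self.pu_location_id = int(row[0])
        self.total_amount = float(row[1])

def read_records(fileobj):
    reader = csv.reader(fileobj)
    next(reader)  # skip the header row, as JsonProducer.read_records does
    return [RideStub(row) for row in reader]

rides = read_records(StringIO(SAMPLE_CSV))
print([r.pu_location_id for r in rides])  # [142, 236]
```

The real method opens the file at resource_path instead of an in-memory buffer, but the skip-header-then-parse loop is the same shape.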

The producer is configured with:

  • Key serializer: Converts the integer key to a UTF-8 string via lambda key: str(key).encode().
  • Value serializer: Converts the Ride object to JSON via lambda x: json.dumps(x.__dict__, default=str).encode('utf-8'). The default=str parameter handles Decimal and datetime types that are not natively JSON-serializable.
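The two serializer lambdas can be exercised in isolation. The ride-like stub and its attribute names below are assumptions for illustration; only the two lambdas are taken from the configuration above:

```python
import json
from datetime import datetime
from decimal import Decimal

# Hypothetical ride-like object; attribute names are assumptions
class RideStub:
    def __init__(self):
        self.pu_location_id = 142
        self.total_amount = Decimal("14.50")
        self.tpep_pickup_datetime = datetime(2021, 1, 1, 0, 15)

key_serializer = lambda key: str(key).encode()
value_serializer = lambda x: json.dumps(x.__dict__, default=str).encode('utf-8')

ride = RideStub()
print(key_serializer(ride.pu_location_id))  # b'142'
# default=str renders the Decimal and datetime values as plain strings
print(value_serializer(ride).decode('utf-8'))
```

Without default=str, json.dumps would raise a TypeError on the Decimal and datetime attributes.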

Usage

Use this implementation to:

  • Ingest batch CSV data into a Kafka topic for downstream real-time processing.
  • Publish taxi ride records with partition affinity by pickup location.
  • Serve as the data source for the JsonConsumer and PySpark streaming pipelines.
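Partition affinity by pickup location can be illustrated with a simplified stand-in for Kafka's default partitioner. kafka-python actually uses a murmur2 hash of the key bytes; crc32 here is an assumption used only to show the hash-then-modulus idea:

```python
import zlib

# Simplified stand-in for Kafka's default partitioner (real one: murmur2)
def partition_for(key: bytes, num_partitions: int = 3) -> int:
    return zlib.crc32(key) % num_partitions

# Identical pickup-location keys always map to the same partition,
# so rides from one zone keep their relative order on that partition
assignments = {k: partition_for(k) for k in (b'142', b'236', b'74')}
print(assignments)
```

Because the key is deterministic per pickup zone, every record with the same pu_location_id lands on the same partition, which is what gives downstream consumers per-zone ordering.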

Code Reference

Source Location

File 07-streaming/python/json_example/producer.py
Lines 1-43
Repository DataTalksClub/data-engineering-zoomcamp

Signature

class JsonProducer(KafkaProducer):
    def __init__(self, props: Dict) -> None: ...

    @staticmethod
    def read_records(resource_path: str) -> List[Ride]: ...

    def publish_rides(self, topic: str, messages: List[Ride]) -> None: ...

Import

import csv
import json
from typing import List, Dict
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError
from ride import Ride
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC

I/O Contract

Inputs

Parameter Type Description
props Dict Configuration dictionary passed to KafkaProducer. Must include bootstrap_servers, key_serializer, and value_serializer.
resource_path str File system path to a CSV file containing taxi ride records with a header row
topic str Name of the Kafka topic to publish messages to (e.g., 'rides_json')
messages List[Ride] List of Ride objects to be serialized and sent to the topic

Outputs

Output Type Description
read_records return List[Ride] Parsed list of Ride objects from the CSV file
Kafka messages bytes (key) + bytes (value) Each message is published with key = UTF-8 encoded pu_location_id and value = UTF-8 encoded JSON of the Ride object's attributes
Console log str Success message with the partition key and broker offset for each sent record
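The published value bytes can be recovered downstream by reversing the serializer. The ride attributes in the dict below are illustrative assumptions; the round-trip itself mirrors the encode/decode contract above:

```python
import json

# Value bytes as the producer's value_serializer would emit them
value = json.dumps({'pu_location_id': 142, 'total_amount': '14.5'}).encode('utf-8')

# A JsonConsumer-style value_deserializer simply reverses the encoding
record = json.loads(value.decode('utf-8'))
print(record['pu_location_id'], record['total_amount'])  # 142 14.5
```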

Usage Examples

Basic producer setup and message publishing:

import json
from producer import JsonProducer
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC

# Configure the producer with serializers
config = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,  # ['localhost:9092']
    'key_serializer': lambda key: str(key).encode(),
    'value_serializer': lambda x: json.dumps(x.__dict__, default=str).encode('utf-8')
}

# Create producer instance
producer = JsonProducer(props=config)

# Read CSV records into Ride objects
rides = producer.read_records(resource_path=INPUT_DATA_PATH)
print(f"Loaded {len(rides)} ride records")

# Publish all rides to the Kafka topic
producer.publish_rides(topic=KAFKA_TOPIC, messages=rides)
# Output per record:
# Record 142 successfully produced at offset 0
# Record 236 successfully produced at offset 1
# ...

Reading records from a custom CSV path:

from producer import JsonProducer

# Static method can be called without instantiation context
rides = JsonProducer.read_records(resource_path='./data/taxi_rides_january.csv')
print(f"First ride pickup location: {rides[0].pu_location_id}")
print(f"First ride total amount: {rides[0].total_amount}")

Configuration reference for settings.py:

# settings.py
INPUT_DATA_PATH = '../resources/rides.csv'
BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'rides_json'
