Implementation:DataTalksClub Data engineering zoomcamp JsonProducer Implementation
| Page Metadata | |
|---|---|
| Knowledge Sources | repo: DataTalksClub/data-engineering-zoomcamp |
| Domains | Data_Engineering, Stream_Processing |
| Last Updated | 2026-02-09 14:00 GMT |
Overview
Concrete API documentation for the JsonProducer class, a Python wrapper around kafka-python's KafkaProducer that reads taxi ride CSV data, serializes each record as JSON, and publishes it to a Kafka topic with key-based partitioning by pickup location ID.
Description
The JsonProducer class provides two main capabilities:
- CSV ingestion: The static method `read_records` opens a CSV file, skips the header row, and parses each subsequent row into a `Ride` object using the `Ride` data model.
- Message publishing: The `publish_rides` method iterates over a list of `Ride` objects and sends each one to the specified Kafka topic. The message key is set to the ride's `pu_location_id` (pickup location), ensuring that all rides from the same pickup zone land on the same Kafka partition. The producer uses a JSON serializer that converts the `Ride` object's `__dict__` to a UTF-8 encoded JSON string.
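The CSV-ingestion flow described above can be sketched in isolation. This is a minimal illustration, not the repository's code: `RideRow` and the two-column CSV are hypothetical stand-ins for the real `Ride` model and taxi-ride schema.

```python
import csv
import io

# Hypothetical simplified record; the real Ride model (ride.py) has many more fields.
class RideRow:
    def __init__(self, row):
        # Column positions here are illustrative, not the real CSV layout.
        self.pu_location_id = int(row[0])
        self.total_amount = float(row[1])

def read_records(csv_file):
    """Sketch of the read_records flow: skip the header row, parse each row."""
    reader = csv.reader(csv_file)
    next(reader)  # skip the header row
    return [RideRow(row) for row in reader]

# An in-memory CSV stands in for the file at resource_path
data = io.StringIO("pu_location_id,total_amount\n142,17.8\n236,9.5\n")
rides = read_records(data)
print(len(rides), rides[0].pu_location_id)  # 2 142
```

The real method takes a file system path and constructs full `Ride` objects, but the header-skip-then-parse loop is the core of the contract.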
The producer is configured with:
- Key serializer: Converts the integer key to a UTF-8 string via `lambda key: str(key).encode()`.
- Value serializer: Converts the `Ride` object to JSON via `lambda x: json.dumps(x.__dict__, default=str).encode('utf-8')`. The `default=str` parameter handles `Decimal` and `datetime` types that are not natively JSON-serializable.
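The two serializers can be exercised without a broker. `FakeRide` below is a hypothetical stand-in carrying the `Decimal` and `datetime` field types mentioned above; the field names mirror the taxi-ride data but the values are made up.

```python
import json
from datetime import datetime
from decimal import Decimal

# Hypothetical stand-in with the non-JSON-native types the value serializer must handle
class FakeRide:
    def __init__(self):
        self.pu_location_id = 142
        self.total_amount = Decimal("17.80")
        self.tpep_pickup_datetime = datetime(2021, 1, 1, 0, 30)

key_serializer = lambda key: str(key).encode()
value_serializer = lambda x: json.dumps(x.__dict__, default=str).encode("utf-8")

ride = FakeRide()
key_bytes = key_serializer(ride.pu_location_id)  # b'142'
value_bytes = value_serializer(ride)
decoded = json.loads(value_bytes)
print(key_bytes)
print(decoded["total_amount"], decoded["tpep_pickup_datetime"])
```

Note that `default=str` renders `Decimal("17.80")` as the string `"17.80"` and the datetime as `"2021-01-01 00:30:00"`; a consumer that needs numeric or temporal types must parse them back.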
Usage
Use this implementation to:
- Ingest batch CSV data into a Kafka topic for downstream real-time processing.
- Publish taxi ride records with partition affinity by pickup location.
- Serve as the data source for the `JsonConsumer` and PySpark streaming pipelines.
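The partition affinity noted above follows from Kafka's default partitioner hashing the serialized key modulo the partition count (kafka-python uses a murmur2 hash). The toy hash below only illustrates the determinism property; it is not Kafka's actual algorithm.

```python
# Illustrative partitioner: any deterministic hash of the key bytes gives the
# same affinity guarantee, because equal keys always hash to the same value.
def partition_for(key_bytes: bytes, num_partitions: int) -> int:
    return sum(key_bytes) % num_partitions  # stand-in hash, NOT Kafka's murmur2

# Three rides from pickup zone 142 and two from zone 236
keys = [b"142", b"236", b"142", b"142", b"236"]
assignments = [partition_for(k, 6) for k in keys]
print(assignments)  # [1, 5, 1, 1, 5] -- same key, same partition, every time
```

This is what preserves per-pickup-zone ordering: all messages with key `142` land on one partition, so a single consumer of that partition sees them in publish order.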
Code Reference
| Source Location | |
|---|---|
| File | `07-streaming/python/json_example/producer.py` |
| Lines | L1-43 |
| Repository | DataTalksClub/data-engineering-zoomcamp |
Signature
```python
class JsonProducer(KafkaProducer):
    def __init__(self, props: Dict) -> None: ...

    @staticmethod
    def read_records(resource_path: str) -> List[Ride]: ...

    def publish_rides(self, topic: str, messages: List[Ride]) -> None: ...
```
Import
```python
import csv
import json
from typing import List, Dict

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

from ride import Ride
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC
```
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| props | Dict | Configuration dictionary passed to KafkaProducer. Must include `bootstrap_servers`, `key_serializer`, and `value_serializer`. |
| resource_path | str | File system path to a CSV file containing taxi ride records with a header row |
| topic | str | Name of the Kafka topic to publish messages to (e.g., `'rides_json'`) |
| messages | List[Ride] | List of `Ride` objects to be serialized and sent to the topic |
Outputs
| Output | Type | Description |
|---|---|---|
| read_records return | List[Ride] | Parsed list of `Ride` objects from the CSV file |
| Kafka messages | bytes (key) + bytes (value) | Each message is published with key = UTF-8 encoded `pu_location_id` and value = UTF-8 encoded JSON of the `Ride` object's attributes |
| Console log | str | Success message with the partition key and broker offset for each sent record |
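A downstream consumer reverses this wire contract by decoding the key and value bytes. The payload below is a made-up example matching the described format, not actual repository output.

```python
import json

# Illustrative wire bytes matching the contract above (field values are invented)
key_bytes = b"142"
value_bytes = b'{"pu_location_id": 142, "total_amount": "17.8"}'

# Key: UTF-8 string of the pickup location id
pu_location_id = int(key_bytes.decode("utf-8"))

# Value: JSON object of the ride's attributes
ride = json.loads(value_bytes.decode("utf-8"))
print(pu_location_id, ride["total_amount"])  # 142 17.8
```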
Usage Examples
Basic producer setup and message publishing:
```python
import json
from producer import JsonProducer
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC

# Configure the producer with serializers
config = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,  # ['localhost:9092']
    'key_serializer': lambda key: str(key).encode(),
    'value_serializer': lambda x: json.dumps(x.__dict__, default=str).encode('utf-8')
}

# Create producer instance
producer = JsonProducer(props=config)

# Read CSV records into Ride objects
rides = producer.read_records(resource_path=INPUT_DATA_PATH)
print(f"Loaded {len(rides)} ride records")

# Publish all rides to the Kafka topic
producer.publish_rides(topic=KAFKA_TOPIC, messages=rides)

# Output per record:
# Record 142 successfully produced at offset 0
# Record 236 successfully produced at offset 1
# ...
```
Reading records from a custom CSV path:
```python
from producer import JsonProducer

# Static method can be called without an instance
rides = JsonProducer.read_records(resource_path='./data/taxi_rides_january.csv')
print(f"First ride pickup location: {rides[0].pu_location_id}")
print(f"First ride total amount: {rides[0].total_amount}")
```
Configuration reference for settings.py:
```python
# settings.py
INPUT_DATA_PATH = '../resources/rides.csv'
BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'rides_json'
```
Related Pages
- Principle:DataTalksClub_Data_engineering_zoomcamp_Kafka_Producer_Pattern
- Implementation:DataTalksClub_Data_engineering_zoomcamp_Ride_Data_Model
- Implementation:DataTalksClub_Data_engineering_zoomcamp_Kafka_Docker_Compose_Setup
- Implementation:DataTalksClub_Data_engineering_zoomcamp_JsonConsumer_Implementation
- Environment:DataTalksClub_Data_engineering_zoomcamp_Kafka_Confluent_Environment