Implementation:DataTalksClub Data engineering zoomcamp Java JsonProducer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
JsonProducer is a Java Kafka producer that reads taxi ride data from a CSV file, constructs Ride POJO objects, and publishes them as JSON-serialized messages to the "rides" Kafka topic using Confluent's KafkaJsonSerializer.
Description
The JsonProducer class demonstrates the JSON producer pattern for Apache Kafka using Confluent's KafkaJsonSerializer. On construction, it configures:
- SASL/SSL authentication properties for Confluent Cloud.
- acks=all, so the partition leader acknowledges a write only after all in-sync replicas have received it.
- StringSerializer for keys and KafkaJsonSerializer for values.
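The constructor settings listed above can be sketched with plain java.util.Properties, using Kafka's string configuration keys. This is a minimal sketch, not the repository's code. The class name JsonProducerConfig and the parameters of build are hypothetical. The PLAIN SASL mechanism is an assumption (it is the usual choice for Confluent Cloud API keys). The real class reads its credentials from Secrets instead of taking them as arguments.

```java
import java.util.Properties;

public class JsonProducerConfig {

    // Builds producer properties mirroring the settings described above.
    // bootstrapServers, apiKey and apiSecret are placeholders supplied by the caller (hypothetical API).
    public static Properties build(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // SASL over TLS; the PLAIN mechanism is an assumption for Confluent Cloud API keys
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + apiKey + "' password='" + apiSecret + "';");
        // Leader waits for all in-sync replicas before acknowledging a write
        props.put("acks", "all");
        // String keys; JSON values via Confluent's serializer (must be on the classpath at runtime)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaJsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker.example.com:9092", "my-key", "my-secret");
        System.out.println("acks=" + props.getProperty("acks"));
        System.out.println("value.serializer=" + props.getProperty("value.serializer"));
    }
}
```

Because the serializers are given as class-name strings, this configuration compiles without any Kafka dependency. The classes are only resolved when a KafkaProducer is created from these properties.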
The class provides two main methods:
- getRides() -- Reads a bundled CSV resource file (rides.csv), skips the header row, and maps each row into a Ride object using the CSV-parsing constructor new Ride(arr). Returns a List<Ride>.
- publishRides(List<Ride>) -- Creates a KafkaProducer<String, Ride> from the configured properties and iterates over the rides. Before sending each ride, it overwrites the pickup and dropoff timestamps to simulate real-time data (pickup = now minus 20 minutes, dropoff = now). Each ride is sent to the "rides" topic with its DOLocationID, converted to a String, as the message key. After each send, it prints the record's offset and the DOLocationID, then sleeps for 500 ms. Send errors are printed to stdout by the send callback.
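The getRides() flow can be illustrated without OpenCSV: skip the header, then map each remaining row through a row constructor. In this sketch, RideCsvSketch and RideRow are hypothetical names. The naive comma split assumes that no quoted field contains a comma. The column positions (VendorID at 0, PULocationID at 7, DOLocationID at 8) assume the standard 18-column yellow-taxi layout. The sample rows in main are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;

public class RideCsvSketch {

    // Hypothetical, trimmed-down stand-in for Ride (public fields, like the POJO).
    public static final class RideRow {
        public long VendorID;
        public long PULocationID;
        public long DOLocationID;

        // Analogue of new Ride(arr): one already-split CSV row in, one object out.
        // Column positions assume the standard 18-column yellow-taxi layout.
        public RideRow(String[] arr) {
            this.VendorID = Long.parseLong(arr[0]);
            this.PULocationID = Long.parseLong(arr[7]);
            this.DOLocationID = Long.parseLong(arr[8]);
        }
    }

    // Skips the header line and maps every remaining non-blank line to a RideRow.
    // Naive comma split: quoted fields containing commas are not handled (OpenCSV does that).
    public static List<RideRow> parse(Reader source) {
        try (BufferedReader reader = new BufferedReader(source)) {
            return reader.lines()
                    .skip(1)
                    .filter(line -> !line.isBlank())
                    .map(line -> new RideRow(line.split(",", -1)))
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String csv = String.join("\n",
                "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,"
                        + "RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,"
                        + "extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge",
                "1,2020-07-01 00:25:32,2020-07-01 00:33:39,1,1.50,1,N,238,75,2,8,0.5,0.5,0,0,0.3,9.3,0",
                "2,2020-07-01 00:03:19,2020-07-01 00:25:43,1,9.05,1,N,138,142,1,27,0.5,0.5,6.22,0,0.3,37.27,2.5");
        List<RideRow> rows = parse(new StringReader(csv));
        for (RideRow row : rows) {
            System.out.println(row.VendorID + ": " + row.PULocationID + " -> " + row.DOLocationID);
        }
    }
}
```

To read a bundled resource instead of a string, you can pass `new InputStreamReader(RideCsvSketch.class.getResourceAsStream("/rides.csv"), StandardCharsets.UTF_8)` to parse.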
The main method ties these together: it instantiates the producer, loads the rides, and publishes them.
Usage
Use this implementation when you need to produce JSON-serialized messages to a Kafka topic without Schema Registry. This pattern is simpler than Avro serialization and is appropriate when schema evolution guarantees are not required, or during rapid prototyping and development.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/java/kafka_examples/src/main/java/org/example/JsonProducer.java
- Lines: 1-61
Signature
```java
public class JsonProducer {
    private Properties props;

    public JsonProducer()
    public List<Ride> getRides() throws IOException, CsvException
    public void publishRides(List<Ride> rides) throws ExecutionException, InterruptedException
    public static void main(String[] args) throws IOException, CsvException, ExecutionException, InterruptedException
}
```
Import
import org.example.JsonProducer;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| rides.csv | CSV resource file | Yes | Bundled CSV file containing taxi ride records with 18 columns including VendorID, timestamps, passenger count, trip distance, location IDs, and fare details. |
| Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication. |
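The two credentials in the table above must be available at startup. One way to supply them without hard-coding is to read them from environment variables and fail fast when one is missing. This is an assumption, not how the repository's Secrets class works. ClusterCredentials, fromEnv, and the variable names KAFKA_CLUSTER_KEY / KAFKA_CLUSTER_SECRET are hypothetical.

```java
import java.util.Map;

public class ClusterCredentials {
    public final String key;
    public final String secret;

    private ClusterCredentials(String key, String secret) {
        this.key = key;
        this.secret = secret;
    }

    // Reads both credentials from an environment-style map.
    // The variable names are hypothetical; adapt them to your deployment.
    public static ClusterCredentials fromEnv(Map<String, String> env) {
        return new ClusterCredentials(
                require(env, "KAFKA_CLUSTER_KEY"),
                require(env, "KAFKA_CLUSTER_SECRET"));
    }

    // Fails fast with a clear message instead of sending an empty SASL username or password.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            ClusterCredentials creds = fromEnv(System.getenv());
            // Print only the key; never log the secret
            System.out.println("Credentials loaded for key " + creds.key);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Taking a Map rather than calling System.getenv() inside fromEnv keeps the lookup testable with an in-memory map.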
Outputs
| Name | Type | Description |
|---|---|---|
| rides topic | Kafka topic with JSON records | JSON-serialized Ride messages keyed by DOLocationID (String). Timestamps are overridden to simulate real-time data. |
| Console output | stdout | Prints the offset of each sent record and the DOLocationID value. |
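The key and timestamp handling described in the Outputs table can be tested in isolation by injecting a java.time.Clock. MiniRide, stampAsRealTime, and messageKey are hypothetical names for this sketch, and the original code calls LocalDateTime.now() directly. Reading the clock once also keeps the simulated trip at exactly 20 minutes. Two separate now() calls can differ slightly.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class RealTimeRideSketch {

    // Hypothetical minimal ride holding only the fields this step touches (public, like Ride).
    public static final class MiniRide {
        public long DOLocationID;
        public LocalDateTime tpep_pickup_datetime;
        public LocalDateTime tpep_dropoff_datetime;

        public MiniRide(long doLocationId) {
            this.DOLocationID = doLocationId;
        }
    }

    // Overwrites the timestamps relative to the clock: pickup = now - 20 min, dropoff = now.
    public static void stampAsRealTime(MiniRide ride, Clock clock) {
        LocalDateTime now = LocalDateTime.now(clock);
        ride.tpep_pickup_datetime = now.minusMinutes(20);
        ride.tpep_dropoff_datetime = now;
    }

    // Message key: the drop-off location ID as a String, matching the StringSerializer key type.
    public static String messageKey(MiniRide ride) {
        return String.valueOf(ride.DOLocationID);
    }

    public static void main(String[] args) {
        // A fixed clock makes the output reproducible; production code would use Clock.systemDefaultZone()
        Clock fixed = Clock.fixed(Instant.parse("2024-01-01T12:00:00Z"), ZoneOffset.UTC);
        MiniRide ride = new MiniRide(75);
        stampAsRealTime(ride, fixed);
        System.out.println("key=" + messageKey(ride)
                + " pickup=" + ride.tpep_pickup_datetime
                + " dropoff=" + ride.tpep_dropoff_datetime);
    }
}
```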
Usage Examples
Basic Usage
```java
// The caller must handle or declare IOException, CsvException,
// ExecutionException and InterruptedException.
// Instantiate the JSON producer (configures Kafka properties)
var producer = new JsonProducer();
// Read ride records from the bundled CSV resource
var rides = producer.getRides();
// Publish all rides to the "rides" Kafka topic
producer.publishRides(rides);
```
Timestamp Override Pattern
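```java
import java.time.LocalDateTime;
import org.apache.kafka.clients.producer.ProducerRecord;

// Inside publishRides: kafkaProducer is the KafkaProducer<String, Ride> built from props.
// The producer overrides timestamps to simulate real-time data:
for (Ride ride : rides) {
    ride.tpep_pickup_datetime = LocalDateTime.now().minusMinutes(20);
    ride.tpep_dropoff_datetime = LocalDateTime.now();
    // Key by DOLocationID; the callback prints any send error
    var record = kafkaProducer.send(
        new ProducerRecord<>("rides", String.valueOf(ride.DOLocationID), ride),
        (metadata, exception) -> {
            if (exception != null) {
                System.out.println(exception.getMessage());
            }
        });
    // get() blocks until the send completes, then the offset and key are printed
    System.out.println(record.get().offset());
    System.out.println(ride.DOLocationID);
    Thread.sleep(500); // pace the feed at roughly one message per 500 ms
}
```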
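The pacing in this loop can be isolated from Kafka by passing the send step in as a function. PacedPublisherSketch, publishPaced, and the BiConsumer standing in for KafkaProducer.send are hypothetical. The loop shown above calls send on a real KafkaProducer. With a 500 ms delay, the feed runs at about two messages per second, so publishing N rides takes at least 0.5 × N seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class PacedPublisherSketch {

    // Hands each item to `send` under the key from `keyOf`, then sleeps `delayMillis`
    // (JsonProducer uses 500). `send` is a hypothetical stand-in for KafkaProducer.send.
    public static <T> int publishPaced(List<T> items, Function<T, String> keyOf,
                                       BiConsumer<String, T> send, long delayMillis)
            throws InterruptedException {
        int sent = 0;
        for (T item : items) {
            send.accept(keyOf.apply(item), item);
            sent++;
            if (delayMillis > 0) {
                Thread.sleep(delayMillis);
            }
        }
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> log = new ArrayList<>();
        // Stand-in "rides": just DOLocationIDs, keyed by their String form
        int sent = publishPaced(List.of(75L, 142L), id -> String.valueOf(id),
                (key, value) -> log.add(key + " -> " + value), 500);
        System.out.println(sent + " records: " + log);
    }
}
```

Passing a delay of 0 lets a test run the loop instantly. In production, you would pass a lambda that wraps the real producer's send call.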