Implementation:DataTalksClub Data engineering zoomcamp Java JsonProducer

Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

JsonProducer is a Java Kafka producer that reads taxi ride data from a CSV file, constructs Ride POJO objects, and publishes them as JSON-serialized messages to the "rides" Kafka topic using Confluent's KafkaJsonSerializer.

Description

The JsonProducer class demonstrates the JSON producer pattern for Apache Kafka using Confluent's KafkaJsonSerializer. On construction, it configures:

  • SASL/SSL authentication properties for Confluent Cloud.
  • acks=all for full acknowledgment from all in-sync replicas.
  • StringSerializer for keys and KafkaJsonSerializer for values.
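The configuration listed above can be sketched as a plain java.util.Properties object. This is a minimal sketch, not the project's exact settings. The keys are the standard Kafka client property names written as strings so the class compiles with the JDK alone. The PLAIN SASL mechanism is assumed. The bootstrap address and credentials in main are hypothetical placeholders; in the project, the credentials come from Secrets.KAFKA_CLUSTER_KEY and Secrets.KAFKA_CLUSTER_SECRET.

```java
import java.util.Properties;

public class JsonProducerConfigSketch {

    // Builds producer settings matching the configuration list above.
    // bootstrapServers is a hypothetical placeholder, not the project's cluster address.
    public static Properties build(String clusterKey, String clusterSecret,
                                   String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);

        // SASL authentication over TLS for Confluent Cloud; PLAIN mechanism is an assumption.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + clusterKey + "' password='" + clusterSecret + "';");

        // The leader acknowledges a write only after all in-sync replicas have it.
        props.put("acks", "all");

        // String keys; JSON values via Confluent's KafkaJsonSerializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaJsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical credentials and address, for illustration only.
        Properties props = build("API_KEY", "API_SECRET", "broker.example.com:9092");
        System.out.println("acks = " + props.getProperty("acks"));
        System.out.println("value.serializer = " + props.getProperty("value.serializer"));
    }
}
```

Plain string keys are used here only to keep the sketch self-contained; in code that already depends on kafka-clients, the ProducerConfig constants (for example ProducerConfig.ACKS_CONFIG) guard against typos in property names.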

The class provides two main methods:

  • getRides() -- Reads a bundled CSV resource file (rides.csv), skips the header row, and maps each row into a Ride object using the CSV-parsing constructor new Ride(arr). Returns a List<Ride>.
  • publishRides(List<Ride>) -- Creates a KafkaProducer<String, Ride> and iterates through the rides. Before sending each ride, it overrides the pickup and dropoff timestamps to simulate real-time data (pickup = now minus 20 minutes, dropoff = now). Each ride is sent to the "rides" topic with DOLocationID (converted to a String) as the message key. The method then waits on the returned Future, prints the record offset and the DOLocationID, and sleeps for 500 ms before the next send. Send errors are printed by the callback.
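The getRides() step (skip the header, map each row to an object) can be sketched with the JDK alone. This is a minimal sketch under stated assumptions, not the project's code. It replaces the project's CSV library with a naive comma split, so quoted fields are not supported. It uses a hypothetical, trimmed-down RideRow record instead of Ride. It also assumes the standard NYC yellow-taxi column order (VendorID at index 0, pickup and dropoff at 1 and 2, PULocationID and DOLocationID at 7 and 8) and timestamps formatted as yyyy-MM-dd HH:mm:ss.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;

public class RideCsvSketch {

    // Hypothetical, trimmed-down stand-in for the project's Ride POJO.
    public record RideRow(String vendorId, LocalDateTime pickup, LocalDateTime dropoff,
                          long puLocationId, long doLocationId) {

        private static final DateTimeFormatter TS =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        // Assumed column order (standard NYC yellow-taxi layout):
        // 0 VendorID, 1 pickup, 2 dropoff, 7 PULocationID, 8 DOLocationID.
        static RideRow fromColumns(String[] arr) {
            return new RideRow(
                    arr[0],
                    LocalDateTime.parse(arr[1], TS),
                    LocalDateTime.parse(arr[2], TS),
                    Long.parseLong(arr[7]),
                    Long.parseLong(arr[8]));
        }
    }

    // Skips the header row, then maps every non-blank row to a RideRow.
    // Naive split on commas: quoted fields containing commas are not handled.
    public static List<RideRow> readRides(Reader source) {
        try (BufferedReader reader = new BufferedReader(source)) {
            return reader.lines()
                    .skip(1)
                    .filter(line -> !line.isBlank())
                    .map(line -> RideRow.fromColumns(line.split(",", -1)))
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up sample row, truncated to the first nine columns of the real file.
        String csv = "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,"
                + "trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID\n"
                + "1,2020-07-01 00:25:32,2020-07-01 00:33:39,1,1.50,1,N,238,75\n";
        List<RideRow> rides = readRides(new StringReader(csv));
        System.out.println(rides.size() + " ride(s); first DOLocationID = "
                + rides.get(0).doLocationId());
    }
}
```

Taking a Reader rather than a file path keeps the parsing testable; a bundled resource could be passed in through getClass().getResourceAsStream("/rides.csv") wrapped in an InputStreamReader.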

The main method ties these together: it instantiates the producer, loads the rides, and publishes them.
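The per-ride steps of publishRides() described above (rewrite timestamps, key by DOLocationID, send, pause) can be exercised without a broker. This is a minimal sketch, not the project's code. SimpleRide is a hypothetical stand-in for Ride. The BiConsumer sender replaces KafkaProducer.send, so the loop runs with the JDK alone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class PublishLoopSketch {

    // Hypothetical mutable stand-in for Ride, holding only the fields the loop touches.
    public static class SimpleRide {
        public LocalDateTime tpep_pickup_datetime;
        public LocalDateTime tpep_dropoff_datetime;
        public final long DOLocationID;

        public SimpleRide(long doLocationId) {
            this.DOLocationID = doLocationId;
        }
    }

    // Per ride: rewrite timestamps, derive the String key, hand off, then pause.
    // The sender parameter stands in for KafkaProducer.send(...).
    public static void publish(List<SimpleRide> rides,
                               BiConsumer<String, SimpleRide> sender,
                               Duration pause) throws InterruptedException {
        for (SimpleRide ride : rides) {
            LocalDateTime now = LocalDateTime.now();
            ride.tpep_pickup_datetime = now.minusMinutes(20); // pickup = now - 20 min
            ride.tpep_dropoff_datetime = now;                 // dropoff = now
            sender.accept(String.valueOf(ride.DOLocationID), ride);
            Thread.sleep(pause.toMillis());                   // the original waits 500 ms
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sentKeys = new ArrayList<>();
        List<SimpleRide> rides = List.of(new SimpleRide(75), new SimpleRide(142));
        publish(rides, (key, ride) -> sentKeys.add(key), Duration.ofMillis(500));
        System.out.println("sent keys: " + sentKeys); // prints "sent keys: [75, 142]"
    }
}
```

Capturing LocalDateTime.now() once per ride keeps each simulated trip at exactly 20 minutes; calling now() twice, as the original loop does, makes the gap marginally longer, which is harmless for this demo.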

Usage

Use this implementation when you need to produce JSON-serialized messages to a Kafka topic without Schema Registry. This pattern is simpler than Avro serialization, but because no schema is registered or enforced, it provides no compatibility checks when the Ride class changes. It is appropriate when schema evolution guarantees are not required, or during rapid prototyping and development.

Code Reference

Source Location

Signature

public class JsonProducer {
    private Properties props;

    public JsonProducer()
    public List<Ride> getRides() throws IOException, CsvException
    public void publishRides(List<Ride> rides) throws ExecutionException, InterruptedException
    public static void main(String[] args) throws IOException, CsvException, ExecutionException, InterruptedException
}

Import

import org.example.JsonProducer;

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| rides.csv | CSV resource file | Yes | Bundled CSV file containing taxi ride records with 18 columns, including VendorID, timestamps, passenger count, trip distance, location IDs, and fare details. |
| Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication. |

Outputs

| Name | Type | Description |
|---|---|---|
| rides topic | Kafka topic with JSON records | JSON-serialized Ride messages keyed by DOLocationID (as a String). Timestamps are overridden to simulate real-time data. |
| Console output | stdout | Prints the offset of each sent record and its DOLocationID value. |

Usage Examples

Basic Usage

// Instantiate the JSON producer (configures Kafka properties)
var producer = new JsonProducer();

// Read ride records from the bundled CSV resource
var rides = producer.getRides();

// Publish all rides to the "rides" Kafka topic
producer.publishRides(rides);

Timestamp Override Pattern

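// Inside publishRides(List<Ride> rides): timestamps are overridden to simulate real-time data
var kafkaProducer = new KafkaProducer<String, Ride>(props);
for (Ride ride : rides) {
    ride.tpep_pickup_datetime = LocalDateTime.now().minusMinutes(20); // pickup = now - 20 min
    ride.tpep_dropoff_datetime = LocalDateTime.now();                 // dropoff = now
    // Key = DOLocationID as a String; value = Ride, serialized by KafkaJsonSerializer
    var record = kafkaProducer.send(
        new ProducerRecord<>("rides", String.valueOf(ride.DOLocationID), ride),
        (metadata, exception) -> {
            if (exception != null) {
                System.out.println(exception.getMessage());
            }
        });
    // record.get() blocks until the send completes and returns its RecordMetadata
    System.out.println(record.get().offset());
    System.out.println(ride.DOLocationID);
    Thread.sleep(500); // pace the stream: one ride roughly every 500 ms
}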
