Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:DataTalksClub Data engineering zoomcamp Java JsonConsumer

From Leeroopedia


Knowledge Sources
Domains Streaming, Kafka
Last Updated 2026-02-09 00:00 GMT

Overview

JsonConsumer is a Java Kafka consumer that subscribes to the "rides" topic and deserializes JSON-encoded Ride messages using Confluent's KafkaJsonDeserializer, polling in a loop until no more records are available.

Description

The JsonConsumer class demonstrates the consumer pattern for Apache Kafka with JSON deserialization via the Confluent KafkaJsonDeserializer. On construction, it:

  • Configures SASL/SSL authentication properties for Confluent Cloud.
  • Sets the key deserializer to StringDeserializer and the value deserializer to KafkaJsonDeserializer.
  • Sets the consumer group ID to "kafka_tutorial_example.jsonconsumer.v2" and auto offset reset to "earliest".
  • Configures KafkaJsonDeserializerConfig.JSON_VALUE_TYPE to Ride.class so the deserializer knows the target POJO type.
  • Creates a KafkaConsumer<String, Ride> and subscribes to the "rides" topic.

The consumeFromKafka() method polls the topic with a 1-second timeout in a do-while loop. For each consumed record, it prints the DOLocationID field. The loop continues while results are non-empty or the iteration counter is below 10, providing a bounded consumption window.

Usage

Use this implementation when you need to consume JSON-serialized messages from a Kafka topic and deserialize them into Java POJOs. This pattern is suitable for simple consumer applications that process messages in a polling loop with bounded iteration.

Code Reference

Source Location

Signature

public class JsonConsumer {
    private Properties props;
    private KafkaConsumer<String, Ride> consumer;

    public JsonConsumer()
    public void consumeFromKafka()
    public static void main(String[] args)
}

Import

import org.example.JsonConsumer;

I/O Contract

Inputs

Name Type Required Description
rides topic Kafka topic Yes Kafka topic named "rides" containing JSON-serialized Ride objects keyed by String.
Secrets.KAFKA_CLUSTER_KEY String Yes SASL username credential for Confluent Cloud Kafka cluster authentication.
Secrets.KAFKA_CLUSTER_SECRET String Yes SASL password credential for Confluent Cloud Kafka cluster authentication.

Outputs

Name Type Description
Console output stdout Prints the DOLocationID of each consumed ride record and the count of results per poll iteration.

Usage Examples

Basic Usage

// Instantiate the consumer (configures properties, creates KafkaConsumer, subscribes to "rides")
JsonConsumer jsonConsumer = new JsonConsumer();

// Start consuming from the "rides" topic in a bounded polling loop
jsonConsumer.consumeFromKafka();

Processing Records Individually

// Within a custom consumer loop, each record is a ConsumerRecord<String, Ride>
for (ConsumerRecord<String, Ride> result : results) {
    System.out.println(result.value().DOLocationID);
}

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment