Implementation:DataTalksClub Data engineering zoomcamp Java JsonConsumer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
JsonConsumer is a Java Kafka consumer that subscribes to the "rides" topic and deserializes JSON-encoded Ride messages using Confluent's KafkaJsonDeserializer. It polls in a loop that runs for at least 10 iterations and then stops at the first poll that returns no records.
Description
The JsonConsumer class demonstrates the consumer pattern for Apache Kafka with JSON deserialization via the Confluent KafkaJsonDeserializer. On construction, it:
- Configures SASL/SSL authentication properties for Confluent Cloud.
- Sets the key deserializer to StringDeserializer and the value deserializer to KafkaJsonDeserializer.
- Sets the consumer group ID to "kafka_tutorial_example.jsonconsumer.v2" and auto offset reset to "earliest".
- Configures KafkaJsonDeserializerConfig.JSON_VALUE_TYPE to Ride.class so the deserializer knows the target POJO type.
- Creates a KafkaConsumer<String, Ride> and subscribes to the "rides" topic.
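The construction steps above can be sketched as a plain `java.util.Properties` object. This is a minimal sketch rather than the class's actual constructor: it uses the string forms of the Kafka and Confluent configuration keys so that it compiles without the client libraries. The `SASL_SSL`/`PLAIN` settings and the JAAS line are assumptions about a typical Confluent Cloud setup, and `bootstrapServers` and `rideClassName` are hypothetical parameters introduced for illustration.

```java
import java.util.Properties;

class JsonConsumerPropsSketch {
    /** Builds consumer properties matching the steps listed above (sketch only). */
    static Properties build(String bootstrapServers, String clusterKey,
                            String clusterSecret, String rideClassName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);

        // Assumption: Confluent Cloud with SASL_SSL and the PLAIN mechanism.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + clusterKey + "' password='" + clusterSecret + "';");

        // Key and value deserializers, given by fully qualified class name.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaJsonDeserializer");

        // Consumer group and starting offset, as stated in the description.
        props.setProperty("group.id", "kafka_tutorial_example.jsonconsumer.v2");
        props.setProperty("auto.offset.reset", "earliest");

        // String form of KafkaJsonDeserializerConfig.JSON_VALUE_TYPE: the target POJO type.
        props.setProperty("json.value.type", rideClassName);
        return props;
    }

    public static void main(String[] args) {
        // All four arguments are placeholders, not real endpoints or credentials.
        Properties props = build("localhost:9092", "my-key", "my-secret", "org.example.Ride");
        props.stringPropertyNames().stream().sorted()
                .filter(name -> !name.equals("sasl.jaas.config")) // avoid printing credentials
                .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```

With the Kafka client on the classpath, a `Properties` object like this would typically be passed to `new KafkaConsumer<>(props)`, followed by a `subscribe` call for the "rides" topic.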
The consumeFromKafka() method polls the topic with a 1-second timeout in a do-while loop. For each consumed record, it prints the DOLocationID field. The loop continues while the latest poll returned records or the iteration counter is below 10. It therefore runs for at least 10 iterations and exits at the first empty poll after that; if records keep arriving, it keeps running.
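The termination condition is easy to misread, so here is a minimal sketch that reproduces it without a broker. It is not the original method: a hypothetical `Supplier` stands in for `consumer.poll(...)`, each batch is a plain list of drop-off IDs, and placing the poll at the top of the loop body is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

class PollLoopSketch {
    /** Runs the do-while loop and returns how many iterations it executed. */
    static int run(Supplier<List<Long>> poll) {
        int i = 0;
        List<Long> results;
        do {
            results = poll.get();                 // stand-in for consumer.poll(1 second)
            for (Long dropOffId : results) {
                System.out.println(dropOffId);    // per-record output
            }
            System.out.println("polled " + results.size() + " record(s)");
            i++;
        } while (!results.isEmpty() || i < 10);   // same condition as described above
        return i;
    }

    /** Hypothetical fake: yields the given number of one-record batches, then empty batches. */
    static Supplier<List<Long>> fakePoll(int nonEmptyBatches) {
        Deque<List<Long>> queue = new ArrayDeque<>();
        for (long k = 0; k < nonEmptyBatches; k++) {
            queue.add(Collections.singletonList(k));
        }
        return () -> queue.isEmpty() ? Collections.<Long>emptyList() : queue.poll();
    }

    public static void main(String[] args) {
        System.out.println("iterations: " + run(fakePoll(3)));
        System.out.println("iterations: " + run(fakePoll(12)));
    }
}
```

With 3 non-empty batches the loop runs 10 iterations, because the counter keeps it alive through the empty polls. With 12 non-empty batches it runs 13, stopping at the first empty poll once the counter has passed 10.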
Usage
Use this implementation when you need to consume JSON-serialized messages from a Kafka topic and deserialize them into Java POJOs. This pattern is suitable for simple consumer applications that process messages in a polling loop and exit once the topic has gone quiet.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/java/kafka_examples/src/main/java/org/example/JsonConsumer.java
- Lines: 1-56
Signature
public class JsonConsumer {
    private Properties props;
    private KafkaConsumer<String, Ride> consumer;
    public JsonConsumer()
    public void consumeFromKafka()
    public static void main(String[] args)
}
Import
import org.example.JsonConsumer;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| rides topic | Kafka topic | Yes | Kafka topic named "rides" containing JSON-serialized Ride objects keyed by String. |
| Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication. |
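Both credentials are required, so one option is to fail fast when either is missing. The sketch below shows a way to supply them from the environment instead of hard-coding them. It does not reflect how the repository's Secrets class is defined: the `SecretsSketch` class and the environment variable names are hypothetical.

```java
import java.util.Map;

class SecretsSketch {
    final String kafkaClusterKey;
    final String kafkaClusterSecret;

    SecretsSketch(String kafkaClusterKey, String kafkaClusterSecret) {
        this.kafkaClusterKey = kafkaClusterKey;
        this.kafkaClusterSecret = kafkaClusterSecret;
    }

    /** Reads both credentials from the given map; throws if either is absent or empty. */
    static SecretsSketch fromEnv(Map<String, String> env) {
        // Variable names are assumptions chosen to mirror the constants in the table.
        return new SecretsSketch(
                require(env, "KAFKA_CLUSTER_KEY"),
                require(env, "KAFKA_CLUSTER_SECRET"));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            SecretsSketch secrets = fromEnv(System.getenv());
            // Print only the key length so that no credential reaches the console.
            System.out.println("Loaded cluster key of length " + secrets.kafkaClusterKey.length());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Taking the environment as a `Map` parameter keeps the lookup testable without touching real process variables.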
Outputs
| Name | Type | Description |
|---|---|---|
| Console output | stdout | Prints the DOLocationID of each consumed ride record and the count of results per poll iteration. |
Usage Examples
Basic Usage
// Instantiate the consumer (configures properties, creates KafkaConsumer, subscribes to "rides")
JsonConsumer jsonConsumer = new JsonConsumer();
// Start consuming from the "rides" topic; returns after at least 10 poll iterations, once a poll comes back empty
jsonConsumer.consumeFromKafka();
Processing Records Individually
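// Within a custom consumer loop, poll() returns a batch of type ConsumerRecords<String, Ride>;
// each element of the batch is a ConsumerRecord<String, Ride>
ConsumerRecords<String, Ride> results = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, Ride> result : results) {
    // value() is the deserialized Ride; DOLocationID is read as a public field
    System.out.println(result.value().DOLocationID);
}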
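The field access above assumes every record carries a non-null Ride. A record with a null payload would cause a NullPointerException at `result.value().DOLocationID`, so a guard may be worth adding. The following is a sketch under stated assumptions: the nested `Ride` class is a hypothetical stand-in that models only the DOLocationID field, its numeric type is a guess, and a plain list replaces the Kafka record batch so that the example runs without the client library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RideRecordSketch {
    /** Hypothetical stand-in for the Ride POJO; only the field named in the text is modeled. */
    static class Ride {
        public long DOLocationID;   // assumption: numeric type

        public Ride() { }           // no-argument constructor, which JSON data binding usually needs

        Ride(long dropOffId) {
            this.DOLocationID = dropOffId;
        }
    }

    /** Collects DOLocationID from each value, skipping null payloads. */
    static List<Long> dropOffIds(List<Ride> values) {
        List<Long> ids = new ArrayList<>();
        for (Ride ride : values) {
            if (ride == null) {
                continue;           // null payload: nothing to read
            }
            ids.add(ride.DOLocationID);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Ride> batch = Arrays.asList(new Ride(132), null, new Ride(7));
        for (Long id : dropOffIds(batch)) {
            System.out.println(id);
        }
    }
}
```

In a real loop the same guard would sit on `result.value()` before the field is read.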