Implementation:DataTalksClub Data engineering zoomcamp Java JsonKStream
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
JsonKStream is a Kafka Streams application that reads JSON-serialized ride events from the "rides" topic, counts them by pickup location key using groupByKey().count(), and produces the running count to the "rides-pulocation-count" output topic.
Description
The JsonKStream class demonstrates a stateful Kafka Streams topology for counting records by key. On construction, it configures:
- SASL/SSL authentication properties for Confluent Cloud.
- Application ID "kafka_tutorial.kstream.count.plocation.v1" for the Streams instance.
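- Auto offset reset to "latest", so that an application with no committed offsets starts from new messages only.
- Cache buffering set to 0 bytes, so that every count update is forwarded downstream immediately rather than being batched in the record cache (useful for development/debugging).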
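The settings listed above can be sketched with a plain java.util.Properties object. This is a minimal sketch, not the class's actual constructor: the raw string keys are the standard Kafka configuration names, but the original code may use constants such as those in StreamsConfig instead, and the class name StreamsConfigSketch, the method buildProps, and the broker address and credentials passed to it are hypothetical placeholders.

```java
import java.util.Properties;

class StreamsConfigSketch {
    // Builds properties that mirror the settings described above.
    // bootstrapServers, apiKey and apiSecret are placeholders supplied by the caller.
    static Properties buildProps(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // SASL over TLS with the PLAIN mechanism (assumed here for Confluent Cloud)
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + apiKey + "' password='" + apiSecret + "';");
        // Identifies the Streams application (consumer group, internal topic prefix)
        props.put("application.id", "kafka_tutorial.kstream.count.plocation.v1");
        // Only consulted when the application has no committed offsets yet
        props.put("auto.offset.reset", "latest");
        // 0 bytes disables record caching, so each update is forwarded immediately
        props.put("cache.max.bytes.buffering", 0);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("broker.example:9092", "my-key", "my-secret");
        System.out.println(props.getProperty("application.id"));
        System.out.println(props.get("cache.max.bytes.buffering"));
    }
}
```

Passing the credentials in as arguments keeps them out of the source file, which matches the role the Secrets constants play in the I/O contract.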
The createTopology() method builds the Streams topology:
- Reads from the "rides" topic using String key serde and a custom JSON serde for Ride values (via CustomSerdes.getSerde(Ride.class)).
- Groups records by their existing key using groupByKey().
- Applies count() to compute a running count per key.
- Converts the KTable result back to a stream and writes to "rides-pulocation-count" with String keys and Long values.
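To make the output of this topology concrete, the following plain-Java simulation mimics what groupByKey().count() emits when caching is disabled: one updated count per incoming record. It deliberately avoids the Kafka Streams API, so the class RunningCountSketch and the method countUpdates are hypothetical names, and the HashMap only stands in for the state store that backs the real count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningCountSketch {
    // Returns one "key=count" update per input record, in arrival order.
    static List<String> countUpdates(List<String> keys) {
        // Stand-in for the state store that keeps the latest count per key
        Map<String, Long> store = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String key : keys) {
            // Increment the stored count, starting from 1 for an unseen key
            long updated = store.merge(key, 1L, Long::sum);
            // With a 0-byte cache every update is emitted, not just the latest
            updates.add(key + "=" + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        // Keys play the role of pickup location IDs on the "rides" topic
        System.out.println(countUpdates(List.of("132", "48", "132", "132")));
        // prints [132=1, 48=1, 132=2, 132=3]
    }
}
```

A consumer of the output topic therefore sees a changelog of counts per key and should treat the most recent value for a key as the current total.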
The countPLocation() method starts the Kafka Streams instance, waits in a loop until the state becomes RUNNING, and registers a shutdown hook to cleanly close the streams.
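The start-up sequence described above follows a poll-then-register pattern, sketched here without a Kafka dependency. Everything in it is an assumption for illustration: the State enum is a reduced stand-in for the states a Streams instance reports, the Supplier replaces the real state query, and the interrupt is handled inside awaitRunning, whereas the original method declares InterruptedException.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class StartupWaitSketch {
    // Reduced stand-in for the lifecycle states of a Streams instance
    enum State { CREATED, REBALANCING, RUNNING }

    // Polls the supplier until it reports RUNNING, printing each observed state.
    // Returns the number of times the loop had to wait.
    static int awaitRunning(Supplier<State> state, long pollMillis) {
        int waits = 0;
        State current;
        while ((current = state.get()) != State.RUNNING) {
            System.out.println(current);
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                break;
            }
            waits++;
        }
        System.out.println(current);
        return waits;
    }

    public static void main(String[] args) {
        // Fake instance that advances one state per poll, then stays RUNNING
        State[] sequence = State.values();
        AtomicInteger calls = new AtomicInteger();
        Supplier<State> fake =
                () -> sequence[Math.min(calls.getAndIncrement(), sequence.length - 1)];

        awaitRunning(fake, 10);
        // Registered after start-up; runs when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.out.println("closing streams")));
    }
}
```

A loop of this shape never exits if the instance settles in a state other than RUNNING, so a timeout or an explicit check for error states would be a reasonable addition in production code.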
Usage
Use this implementation when you need a real-time running count of Kafka messages grouped by key. Count-by-key is a fundamental Kafka Streams aggregation pattern and applies to any stream whose records are already keyed by the attribute you want to count; if they are not, the stream must be re-keyed before grouping.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/java/kafka_examples/src/main/java/org/example/JsonKStream.java
- Lines: 1-56
Signature
public class JsonKStream {
    private Properties props;
    public JsonKStream()
    public Topology createTopology()
    public void countPLocation() throws InterruptedException
    public static void main(String[] args) throws InterruptedException
}
Import
import org.example.JsonKStream;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| rides topic | Kafka topic | Yes | Input Kafka topic named "rides" containing JSON-serialized Ride objects keyed by pickup location ID (String). |
| Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication. |
Outputs
| Name | Type | Description |
|---|---|---|
| rides-pulocation-count topic | Kafka topic | Output topic containing running counts (Long) per pickup location key (String). |
| Console output | stdout | Prints the Kafka Streams application state during startup. |
Usage Examples
Basic Usage
// Instantiate and start the Kafka Streams count-by-location application
var object = new JsonKStream();
object.countPLocation();
Testing the Topology
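// Create the topology for unit testing with TopologyTestDriver
var jsonKStream = new JsonKStream();
Topology topology = jsonKStream.createTopology();
// JsonKStream.props is private, so the test supplies its own properties (placeholder values)
Properties testProps = new Properties();
testProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-kstream-test");
testProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Use TopologyTestDriver to test without a running Kafka cluster; close it when the test ends
TopologyTestDriver testDriver = new TopologyTestDriver(topology, testProps);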