
Implementation:DataTalksClub Data engineering zoomcamp Java JsonKStream

From Leeroopedia


Knowledge Sources
Domains Streaming, Kafka
Last Updated 2026-02-09 00:00 GMT

Overview

JsonKStream is a Kafka Streams application that reads JSON-serialized ride events from the "rides" topic, counts them per record key (the pickup location ID) using groupByKey().count(), and writes the running counts to the "rides-pulocation-count" output topic.

Description

The JsonKStream class demonstrates a stateful Kafka Streams topology for counting records by key. On construction, it configures:

  • SASL/SSL authentication properties for Confluent Cloud.
  • Application ID "kafka_tutorial.kstream.count.plocation.v1" for the Streams instance.
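  • Auto offset reset set to "latest", so that when the application has no committed offsets it starts at the end of the "rides" topic and processes only new messages.
  • Record cache size set to 0 bytes, which disables the state-store record cache so every update to a count is forwarded downstream immediately instead of being batched (useful for development/debugging).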
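The configuration listed above can be sketched with a plain java.util.Properties object. This is a minimal sketch, not the class's actual constructor. The bootstrap address is a hypothetical placeholder. The credentials are passed in as arguments, where the original reads Secrets.KAFKA_CLUSTER_KEY and Secrets.KAFKA_CLUSTER_SECRET. The PLAIN SASL mechanism is assumed because it is what Confluent Cloud API keys use. The string key cache.max.bytes.buffering is an assumption about which cache setting the original uses; newer Kafka versions deprecate it in favor of statestore.cache.max.bytes.

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Hypothetical placeholder: the real bootstrap address is not given on this page
    static final String BOOTSTRAP_SERVERS = "BOOTSTRAP_HOST:9092";

    static Properties build(String clusterKey, String clusterSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);

        // SASL/SSL authentication; PLAIN is the mechanism Confluent Cloud uses for API keys
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + clusterKey + "' password='" + clusterSecret + "';");

        // Application ID: also serves as the consumer group ID and internal topic prefix
        props.put("application.id", "kafka_tutorial.kstream.count.plocation.v1");

        // Only consulted when the application has no committed offsets yet
        props.put("auto.offset.reset", "latest");

        // 0 bytes disables the record cache, so every count update is sent downstream
        // (assumed key; deprecated in newer Kafka in favor of "statestore.cache.max.bytes")
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        // The original reads these from Secrets.KAFKA_CLUSTER_KEY / Secrets.KAFKA_CLUSTER_SECRET
        Properties props = build("CLUSTER_KEY", "CLUSTER_SECRET");
        // Print only non-secret settings
        System.out.println("application.id=" + props.getProperty("application.id"));
        System.out.println("auto.offset.reset=" + props.getProperty("auto.offset.reset"));
    }
}
```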

The createTopology() method builds the Streams topology:

  1. Reads from the "rides" topic using String key serde and a custom JSON serde for Ride values (via CustomSerdes.getSerde(Ride.class)).
  2. Groups records by their existing key using groupByKey().
  3. Applies count() to compute a running count per key.
  4. Converts the KTable result back to a stream and writes to "rides-pulocation-count" with String keys and Long values.
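The four steps map onto the Kafka Streams DSL roughly as follows. This is a sketch, not the original createTopology(). It assumes the kafka-streams library is on the classpath. To stay self-contained, it reads values with Serdes.String() (raw JSON text) instead of the project's CustomSerdes.getSerde(Ride.class). The class name CountTopologySketch is hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class CountTopologySketch {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read "rides" with String keys; values kept as raw JSON text here
        //    (stand-in for CustomSerdes.getSerde(Ride.class))
        KStream<String, String> rides =
                builder.stream("rides", Consumed.with(Serdes.String(), Serdes.String()));

        // 2-3. Group by the existing key (pickup location ID) and keep a running count
        KTable<String, Long> counts = rides.groupByKey().count();

        // 4. Turn the table's updates back into a stream and write them out
        counts.toStream()
              .to("rides-pulocation-count", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the source, aggregation, and sink nodes of the topology
        System.out.println(build().describe());
    }
}
```

Because groupByKey() does not change the key, no repartition topic is needed here. The count is kept in a local state store backed by a changelog topic.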

The countPLocation() method starts the Kafka Streams instance and polls its state in a loop, printing the current state, until it becomes RUNNING. It then registers a shutdown hook that closes the instance cleanly when the JVM exits.
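The start-and-wait logic of countPLocation() can be sketched as a generic polling helper. The helper is written against a Supplier so it runs without a broker. With Kafka Streams, you would call it after streams.start() as awaitState(streams::state, KafkaStreams.State.RUNNING, ...) and then register Runtime.getRuntime().addShutdownHook(new Thread(streams::close)). The helper name, poll interval, and timeout are assumptions; the page only says the method loops until the state is RUNNING.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class AwaitState {

    /**
     * Polls current until it returns target or the timeout expires.
     * Returns true if the target state was reached, false on timeout or interruption.
     */
    static <S> boolean awaitState(Supplier<S> current, S target, Duration poll, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        S state = current.get();
        while (!target.equals(state)) {
            System.out.println("Current state: " + state);
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up waiting
            }
            try {
                Thread.sleep(poll.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
            state = current.get();
        }
        return true;
    }

    public static void main(String[] args) {
        // Fake state source that reports RUNNING from the third poll onward
        int[] polls = {0};
        Supplier<String> fakeState = () -> ++polls[0] < 3 ? "REBALANCING" : "RUNNING";
        boolean reached = awaitState(fakeState, "RUNNING", Duration.ofMillis(10), Duration.ofSeconds(1));
        System.out.println("Reached RUNNING: " + reached);
    }
}
```

An event-driven alternative to polling is a KafkaStreams.StateListener, registered with setStateListener(...) before start().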

Usage

Use this implementation when you need a real-time running count of Kafka messages grouped by key. Count-by-key is a fundamental Kafka Streams aggregation pattern. It applies directly to any stream whose records are already keyed by the attribute being counted. If they are not, groupBy(...) can re-key them first, at the cost of a repartition topic.
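The running-count semantics can be modeled in plain Java, which helps when reasoning about what lands in the output topic. This is an illustrative model, not Kafka Streams code. The HashMap stands in for the count state store, and each input record yields one output update because caching is disabled in JsonKStream. With caching enabled, Kafka Streams may collapse consecutive updates to the same key and emit only the latest count per cache flush or commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningCountModel {

    /** One output record: a key and its count after applying the update. */
    record Update(String key, long count) {}

    /** Models groupByKey().count().toStream() with record caching disabled. */
    static List<Update> runningCounts(List<String> inputKeys) {
        Map<String, Long> store = new HashMap<>(); // stands in for the count state store
        List<Update> updates = new ArrayList<>();
        for (String key : inputKeys) {
            if (key == null) {
                continue; // Kafka Streams aggregations skip records with null keys
            }
            long updated = store.merge(key, 1L, Long::sum);
            updates.add(new Update(key, updated)); // one downstream update per input record
        }
        return updates;
    }

    public static void main(String[] args) {
        // Keys are pickup location IDs, as in the "rides" topic (values omitted)
        List<Update> updates = runningCounts(Arrays.asList("132", "138", null, "132"));
        // prints "132 -> 1", "138 -> 1", "132 -> 2" on separate lines
        updates.forEach(u -> System.out.println(u.key() + " -> " + u.count()));
    }
}
```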

Code Reference

Source Location

Signature

public class JsonKStream {
    private Properties props;

    public JsonKStream()
    public Topology createTopology()
    public void countPLocation() throws InterruptedException
    public static void main(String[] args) throws InterruptedException
}

Import

import org.example.JsonKStream;

I/O Contract

Inputs

Name | Type | Required | Description
rides | Kafka topic | Yes | Input topic containing JSON-serialized Ride objects keyed by pickup location ID (String).
Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication.
Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication.

Outputs

Name | Type | Description
rides-pulocation-count | Kafka topic | Running count (Long) per pickup location key (String).
Console output | stdout | Kafka Streams application state, printed during startup.
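The output values are serialized with a Long serde, so a consumer of rides-pulocation-count must read values with LongDeserializer; reading them as strings yields unreadable bytes. Below is a minimal sketch. The group ID is a hypothetical caller-supplied value, and the bootstrap and SASL settings are omitted; they would mirror the Streams configuration. decodeCount shows the wire format that Serdes.Long() produces: 8 bytes, big-endian.

```java
import java.nio.ByteBuffer;
import java.util.Properties;

public class CountTopicReader {

    /** Consumer settings for the output topic (bootstrap and SASL settings omitted). */
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("group.id", groupId); // hypothetical group ID supplied by the caller
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values were written with a Long serde, so they must be read back as Long
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        return props;
    }

    /** Decodes one output value: Serdes.Long() writes 8 bytes in big-endian order. */
    static long decodeCount(byte[] value) {
        if (value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + value.length);
        }
        return ByteBuffer.wrap(value).getLong(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        byte[] raw = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        System.out.println(decodeCount(raw)); // prints 42
        System.out.println(consumerProps("pulocation-count-reader").getProperty("value.deserializer"));
    }
}
```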

Usage Examples

Basic Usage

// Instantiate the application and start counting rides per pickup location;
// countPLocation() waits until the Streams instance reaches RUNNING
var app = new JsonKStream();
app.countPLocation();

Testing the Topology

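// Build the topology for unit testing; no running Kafka cluster is needed
var jsonKStream = new JsonKStream();
Topology topology = jsonKStream.createTopology();

// JsonKStream's props field is private, so supply a minimal test configuration
Properties testProps = new Properties();
testProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "jsonkstream-test");
testProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, testProps)) {
    // Create TestInputTopic/TestOutputTopic here, pipe Ride records, assert the counts
}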
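The test setup above can be expanded into a complete, self-contained check. This sketch assumes the kafka-streams and kafka-streams-test-utils artifacts are on the classpath. Instead of calling JsonKStream.createTopology(), it rebuilds an equivalent topology with String values, so the Ride class, CustomSerdes, and Secrets are not needed. The class name and the JSON payload are hypothetical.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class CountTopologyTestSketch {

    // Same shape as createTopology(), with String values standing in for the Ride serde
    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("rides", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .count()
               .toStream()
               .to("rides-pulocation-count", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Pipes one record per key and returns every count update written to the output topic
    static List<KeyValue<String, Long>> run(List<String> keys) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "rides", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                    "rides-pulocation-count", new StringDeserializer(), new LongDeserializer());
            for (String key : keys) {
                in.pipeInput(key, "{\"dummy\":true}"); // hypothetical JSON payload
            }
            return out.readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("132", "138", "132")));
    }
}
```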
