
Implementation:DataTalksClub Data engineering zoomcamp Java JsonKStreamWindow

From Leeroopedia


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

JsonKStreamWindow is a Kafka Streams application that performs windowed aggregation on ride events, counting rides by pickup location within 10-second tumbling windows and producing the windowed counts to an output topic.

Description

The JsonKStreamWindow class demonstrates the windowed aggregation pattern in Kafka Streams using tumbling time windows. On construction, it configures:

  • SASL/SSL authentication properties for Confluent Cloud.
  • Application ID "kafka_tutorial.kstream.count.plocation.v1".
  • Auto offset reset set to "latest", and the record cache size set to 0 bytes, which disables caching so every count update is forwarded downstream immediately.
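The constructor settings listed above can be sketched with a plain java.util.Properties object. This is a hedged illustration, not the class's actual constructor: StreamsPropsSketch and buildProps are hypothetical names, the bootstrap server value is a placeholder, and the PLAIN SASL mechanism and JAAS string format are assumptions based on common Confluent Cloud client setups. Literal config names are used instead of the StreamsConfig constants so the sketch compiles without Kafka on the classpath.

```java
import java.util.Properties;

public class StreamsPropsSketch {
    // Hypothetical helper mirroring the constructor settings described above.
    // Literal config names are used (the StreamsConfig constants resolve to these
    // strings) so the sketch compiles without Kafka on the classpath.
    static Properties buildProps(String bootstrapServers, String clusterKey, String clusterSecret) {
        Properties props = new Properties();
        // ASSUMPTION: the bootstrap address is supplied by the caller (placeholder value).
        props.put("bootstrap.servers", bootstrapServers);
        // SASL/SSL authentication for Confluent Cloud.
        // ASSUMPTION: PLAIN mechanism and this JAAS format, as commonly used with API keys.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + clusterKey + "' password='" + clusterSecret + "';");
        props.put("application.id", "kafka_tutorial.kstream.count.plocation.v1");
        // Start from the newest offsets when the application has no committed offsets.
        props.put("auto.offset.reset", "latest");
        // 0 bytes disables the record cache, so each count update is forwarded downstream.
        props.put("cache.max.bytes.buffering", 0);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials standing in for Secrets.KAFKA_CLUSTER_KEY / KAFKA_CLUSTER_SECRET.
        Properties props = buildProps("broker.example:9092", "API_KEY", "API_SECRET");
        System.out.println(props.getProperty("application.id"));
    }
}
```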

The createTopology() method builds a topology that:

  1. Reads Ride records from the "rides" topic using String key serde and a custom JSON serde for Ride values.
  2. Groups records by key using groupByKey().
  3. Applies windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(5))) to create 10-second tumbling windows with a 5-second grace period for late arrivals.
  4. Applies count() within each window to compute the per-key, per-window count.
  5. Converts the windowed KTable result back to a stream.
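  6. Creates a windowed serde with WindowedSerdes.timeWindowedSerdeFrom(String.class, 10*1000) to serialize the Windowed<String> keys; the second argument is the window size in milliseconds and matches the 10-second window.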
  7. Writes results to the "rides-pulocation-window-count" topic with the windowed serde for keys and Long serde for count values.
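What steps 2 to 4 compute can be illustrated without a broker. The plain-Java sketch below assigns each event to its epoch-aligned 10-second tumbling window ([0s, 10s), [10s, 20s), ...) and counts events per (key, window) pair. RideEvent, countByWindow and the "key@windowStart" string are hypothetical stand-ins for Ride and Windowed<String>; the sketch models the result of the aggregation, not how Kafka Streams stores state.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowCountSketch {
    // Hypothetical simplified event: a pickup-location key plus an event timestamp in ms.
    record RideEvent(String puLocationId, long timestampMs) {}

    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the tumbling window containing the timestamp. Windows are aligned
    // to the epoch and do not overlap, so each timestamp maps to exactly one window.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Models groupByKey().windowedBy(10s tumbling).count(): one counter per (key, windowStart).
    // The "key@windowStart" string is illustrative, not the Windowed<String> wire format.
    static Map<String, Long> countByWindow(List<RideEvent> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (RideEvent e : events) {
            String windowedKey = e.puLocationId() + "@" + windowStart(e.timestampMs());
            counts.merge(windowedKey, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        var events = List.of(
                new RideEvent("132", 1_000L),
                new RideEvent("132", 9_999L),
                new RideEvent("132", 10_000L),
                new RideEvent("161", 4_500L));
        // prints {132@0=2, 132@10000=1, 161@0=1}
        System.out.println(countByWindow(events));
    }
}
```

Because the record cache is set to 0 bytes, the real topology does not emit only these final totals: it forwards an updated count for every input record, so the output topic would receive a count of 1 and then 2 for key 132 in window [0s, 10s).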

The countPLocationWindowed() method starts the Kafka Streams instance and registers a shutdown hook for clean termination.
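The start-then-register-a-shutdown-hook lifecycle follows a standard Java pattern. The sketch below uses a hypothetical StreamsStub in place of KafkaStreams so that it runs without Kafka; with the real library, the hook would call close() on the KafkaStreams instance. ShutdownHookSketch and startWithShutdownHook are likewise illustrative names, not part of the original class.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookSketch {
    // Hypothetical stand-in for KafkaStreams: only start() and close() are modeled.
    static class StreamsStub implements AutoCloseable {
        final AtomicBoolean running = new AtomicBoolean(false);

        void start() { running.set(true); }

        @Override
        public void close() { running.set(false); }
    }

    // Start the instance, then register a JVM shutdown hook that closes it
    // when the process is terminated (e.g. SIGTERM or Ctrl+C).
    static Thread startWithShutdownHook(StreamsStub streams) {
        streams.start();
        Thread hook = new Thread(streams::close, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        var streams = new StreamsStub();
        Thread hook = startWithShutdownHook(streams);
        System.out.println("running=" + streams.running.get());
        // In the real application the JVM runs the hook on exit; here the hook is
        // removed and close() is called directly so the demo ends deterministically.
        Runtime.getRuntime().removeShutdownHook(hook);
        streams.close();
        System.out.println("running=" + streams.running.get());
    }
}
```

Registering the hook means a termination signal triggers close() instead of abandoning the instance mid-processing.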

Usage

Use this implementation when you need time-bounded aggregation of streaming events. Tumbling windows are non-overlapping, fixed-size time intervals that are useful for computing metrics like "number of events per 10-second interval." This pattern is applicable to real-time dashboards, alerting systems, and any time-series aggregation use case.

Code Reference

Source Location

Signature

public class JsonKStreamWindow {
    private Properties props;

    public JsonKStreamWindow()
    public Topology createTopology()
    public void countPLocationWindowed()
    public static void main(String[] args)
}

Import

import org.example.JsonKStreamWindow;

I/O Contract

Inputs

Name | Type | Required | Description
rides topic | Kafka topic | Yes | Input Kafka topic named "rides" containing JSON-serialized Ride objects keyed by pickup location ID (String).
Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication.
Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication.

Outputs

Name | Type | Description
rides-pulocation-window-count topic | Kafka topic | Output topic containing windowed counts. Keys are Windowed<String> (pickup location ID with time window metadata); values are Long counts.
Window size | 10 seconds | Each tumbling window covers a fixed, non-overlapping 10-second interval.
Grace period | 5 seconds | Late (out-of-order) records are still added to their window as long as stream time (the highest record timestamp observed so far) has not reached the window end plus 5 seconds.
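The grace-period rule can be made concrete with a small model. This is a simplified sketch, not the Kafka Streams implementation: it tracks stream time as the maximum timestamp seen on a single partition and accepts a record only while its window end is greater than stream time minus the 5-second grace. GracePeriodSketch and accept() are hypothetical names.

```java
public class GracePeriodSketch {
    static final long WINDOW_SIZE_MS = 10_000L;
    static final long GRACE_MS = 5_000L;

    // Simplified stream time: the largest record timestamp observed so far.
    private long observedStreamTime = Long.MIN_VALUE;

    // Returns true if the record would still be added to its window,
    // false if it arrives after the window end plus the grace period.
    boolean accept(long timestampMs) {
        observedStreamTime = Math.max(observedStreamTime, timestampMs);
        long windowEnd = timestampMs - (timestampMs % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
        return windowEnd > observedStreamTime - GRACE_MS;
    }

    public static void main(String[] args) {
        var g = new GracePeriodSketch();
        System.out.println(g.accept(12_000L)); // true: window [10s, 20s)
        System.out.println(g.accept(24_000L)); // true: stream time advances to 24s
        System.out.println(g.accept(8_000L));  // false: window [0s, 10s) closed at 10s + 5s = 15s
        System.out.println(g.accept(16_000L)); // true: window [10s, 20s) stays open until 25s
    }
}
```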

Usage Examples

Basic Usage

// Instantiate and start the windowed count application
var object = new JsonKStreamWindow();
object.countPLocationWindowed();

Windowed Aggregation Pattern

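import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// The core windowed aggregation pattern, applied to ridesStream
// (the KStream<String, Ride> read from the "rides" topic):
// 1. Group by key (pickup location ID)
// 2. Apply a 10-second tumbling window with a 5-second grace period
// 3. Count records within each window
var puLocationCount = ridesStream.groupByKey()
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(5)))
        .count()
        .toStream();

// Serialize Windowed<String> keys; the window size argument (ms) must match the window above
var windowSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10 * 1000);
puLocationCount.to("rides-pulocation-window-count", Produced.with(windowSerde, Serdes.Long()));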
