Implementation:DataTalksClub Data engineering zoomcamp Java JsonKStreamWindow
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
JsonKStreamWindow is a Kafka Streams application that performs windowed aggregation on ride events. It counts rides by pickup location within 10-second tumbling windows and writes the windowed counts to an output topic.
Description
The JsonKStreamWindow class demonstrates the windowed aggregation pattern in Kafka Streams using tumbling time windows. On construction, it configures:
- SASL/SSL authentication properties for Confluent Cloud.
- Application ID "kafka_tutorial.kstream.count.plocation.v1".
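- Auto offset reset set to "latest" (start from the newest records when no committed offset exists) and the record cache size set to 0 bytes, which disables caching so every count update is forwarded downstream immediately.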
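The constructor's settings can be sketched with a plain java.util.Properties object. This is a minimal sketch, not the original code: it uses the raw Kafka configuration key strings, assumes the PLAIN SASL mechanism commonly used with Confluent Cloud API keys, and takes the bootstrap server and credentials as parameters (the original reads the credentials from Secrets). The class name WindowAppConfigSketch is hypothetical.

```java
import java.util.Properties;

// Hypothetical sketch of the constructor's configuration (not the original source).
class WindowAppConfigSketch {

    static Properties build(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        // Cluster endpoint and SASL/SSL authentication; the PLAIN mechanism is an assumption
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + apiKey + "' password='" + apiSecret + "';");
        // Application ID: used as the consumer group ID and as a prefix for internal topics
        props.put("application.id", "kafka_tutorial.kstream.count.plocation.v1");
        // Without a committed offset, begin at the newest records of "rides"
        props.put("auto.offset.reset", "latest");
        // 0 bytes disables the record cache, so every count update is sent downstream
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder endpoint and credentials; the original reads credentials from Secrets
        Properties props = build("YOUR_BOOTSTRAP_SERVER:9092", "API_KEY", "API_SECRET");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

On newer Kafka releases, cache.max.bytes.buffering is deprecated in favor of statestore.cache.max.bytes; setting either to 0 disables the cache.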
The createTopology() method builds a topology that:
- Reads Ride records from the "rides" topic using String key serde and a custom JSON serde for Ride values.
- Groups records by key using groupByKey().
- Applies windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(5))) to create 10-second tumbling windows with a 5-second grace period for late arrivals.
- Applies count() within each window to compute the per-key, per-window count.
- Converts the windowed KTable result back to a stream.
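- Builds a serde for the windowed key with WindowedSerdes.timeWindowedSerdeFrom(String.class, 10*1000); the second argument is the window size in milliseconds (10 seconds) and must match the TimeWindows size.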
- Writes results to the "rides-pulocation-window-count" topic with the windowed serde for keys and Long serde for count values.
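What the topology computes can be illustrated without a broker. The following is a plain-Java simulation, not the Kafka Streams API: RideEvent, WindowKey, Update, and countPerWindow are hypothetical names, grace-period handling is left out, and each input record emits one updated count, mirroring the effect of the disabled record cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not the Kafka Streams API) of
// groupByKey() -> windowedBy(10 s tumbling) -> count() -> toStream().
class WindowedCountSimulation {

    static final long WINDOW_SIZE_MS = 10_000L;

    // Hypothetical input event: pickup location ID (the record key) and timestamp in ms
    record RideEvent(String puLocationId, long timestampMs) {}

    // State-store key: one count per (pickup location, window start)
    record WindowKey(String puLocationId, long windowStartMs) {}

    // One downstream record: windowed key [start, end) plus the updated count
    record Update(String puLocationId, long windowStartMs, long windowEndMs, long count) {}

    static List<Update> countPerWindow(List<RideEvent> rides) {
        Map<WindowKey, Long> counts = new HashMap<>();
        List<Update> updates = new ArrayList<>();
        for (RideEvent ride : rides) {
            // Epoch-aligned tumbling window: each timestamp belongs to exactly one window
            long start = ride.timestampMs() - Math.floorMod(ride.timestampMs(), WINDOW_SIZE_MS);
            long newCount = counts.merge(new WindowKey(ride.puLocationId(), start), 1L, Long::sum);
            // With caching disabled, each input record produces one updated count
            updates.add(new Update(ride.puLocationId(), start, start + WINDOW_SIZE_MS, newCount));
        }
        return updates;
    }

    public static void main(String[] args) {
        List<RideEvent> rides = List.of(
                new RideEvent("132", 1_000L),
                new RideEvent("132", 9_999L),
                new RideEvent("132", 10_000L), // falls into the next window
                new RideEvent("48", 4_000L));
        countPerWindow(rides).forEach(System.out::println);
    }
}
```

Location "132" receives counts 1 and 2 in window [0, 10000) and a fresh count of 1 in window [10000, 20000), which is why the output key must carry the window and not just the location ID.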
The countPLocationWindowed() method starts the Kafka Streams instance and registers a shutdown hook for clean termination.
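The start-then-close-on-shutdown idiom does not depend on Kafka specifics. A sketch, assuming the resource is any AutoCloseable (KafkaStreams implements AutoCloseable, so a started instance could be passed in); ShutdownHookSketch and closeOnShutdown are hypothetical names, not the original code.

```java
// Registers a JVM shutdown hook that closes the given resource (e.g. a started KafkaStreams).
class ShutdownHookSketch {

    static Thread closeOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                // For KafkaStreams, close() stops stream threads and releases state stores
                resource.close();
            } catch (Exception e) {
                System.err.println("Error during shutdown: " + e);
            }
        }, "close-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // Stand-in for a started application; prints when the JVM exits
        AutoCloseable fakeApp = () -> System.out.println("closed cleanly");
        closeOnShutdown(fakeApp);
        System.out.println("running; will close on JVM exit");
    }
}
```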
Usage
Use this implementation when you need time-bounded aggregation of streaming events. Tumbling windows are non-overlapping, fixed-size time intervals, which makes them suitable for metrics such as "number of events per 10-second interval." The pattern applies to real-time dashboards, alerting systems, and other time-series aggregation use cases.
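For dashboards, it helps to see how an event timestamp maps to its interval. A small sketch, assuming epoch-aligned windows as Kafka Streams uses for TimeWindows; TumblingWindowLabel and its methods are hypothetical helpers that render a window as an ISO-8601 interval label.

```java
import java.time.Duration;
import java.time.Instant;

// Maps an event timestamp to its 10-second tumbling window (aligned to the Unix epoch)
// and renders it as a "start/end" ISO-8601 label, e.g. for a dashboard axis.
class TumblingWindowLabel {

    static final Duration WINDOW_SIZE = Duration.ofSeconds(10);

    // Inclusive start of the window that contains eventTime
    static Instant windowStart(Instant eventTime) {
        long sizeMs = WINDOW_SIZE.toMillis();
        long ts = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    // The end is exclusive, so consecutive windows never overlap
    static String label(Instant eventTime) {
        Instant start = windowStart(eventTime);
        return start + "/" + start.plus(WINDOW_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(label(Instant.parse("2024-01-01T12:00:07Z"))); // 2024-01-01T12:00:00Z/2024-01-01T12:00:10Z
        System.out.println(label(Instant.parse("2024-01-01T12:00:10Z"))); // 2024-01-01T12:00:10Z/2024-01-01T12:00:20Z
    }
}
```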
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/java/kafka_examples/src/main/java/org/example/JsonKStreamWindow.java
- Lines: 1-60
Signature
public class JsonKStreamWindow {
private Properties props;
public JsonKStreamWindow()
public Topology createTopology()
public void countPLocationWindowed()
public static void main(String[] args)
}
Import
import org.example.JsonKStreamWindow;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| rides topic | Kafka topic | Yes | Input Kafka topic named "rides" containing JSON-serialized Ride objects keyed by pickup location ID (String). |
| Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication. |
Outputs
| Name | Type | Description |
|---|---|---|
| rides-pulocation-window-count topic | Kafka topic | Output topic containing windowed counts. Keys are Windowed<String> (pickup location ID with time window metadata), values are Long counts. |
| Window size | Duration (10 s) | Each tumbling window covers a non-overlapping 10-second interval [start, start + 10 s). |
| Grace period | Duration (5 s) | An out-of-order record is still counted if stream time has not yet passed its window's end by 5 seconds or more; later records for that window are dropped. |
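The grace-period rule can be modeled with stream time, the largest record timestamp observed so far. This sketch follows our understanding of Kafka Streams' behavior (in the real library, stream time is tracked per task, i.e. per input partition); GracePeriodSimulation is a hypothetical class that only decides whether a record would be counted or dropped.

```java
// Simulation of the 10 s window / 5 s grace acceptance rule (not Kafka Streams code).
class GracePeriodSimulation {

    static final long SIZE_MS = 10_000L;
    static final long GRACE_MS = 5_000L;

    // Stream time: the largest record timestamp seen so far; it never moves backwards
    private long streamTimeMs = Long.MIN_VALUE;

    // Returns true if the record is counted, false if it is dropped as too late
    boolean accept(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        long windowStart = recordTimestampMs - Math.floorMod(recordTimestampMs, SIZE_MS);
        long windowEnd = windowStart + SIZE_MS; // exclusive
        // The window closes once stream time reaches windowEnd + grace
        return streamTimeMs < windowEnd + GRACE_MS;
    }

    public static void main(String[] args) {
        GracePeriodSimulation sim = new GracePeriodSimulation();
        System.out.println(sim.accept(3_000L));  // true: in-order record for window [0, 10000)
        System.out.println(sim.accept(14_000L)); // true: stream time advances to 14000
        System.out.println(sim.accept(6_000L));  // true: late, but 14000 < 10000 + 5000
        System.out.println(sim.accept(15_000L)); // true: stream time is now 15000
        System.out.println(sim.accept(7_000L));  // false: window [0, 10000) closed at 15000
    }
}
```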
Usage Examples
Basic Usage
// Instantiate and start the windowed count application
var object = new JsonKStreamWindow();
object.countPLocationWindowed();
Windowed Aggregation Pattern
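// The core windowed aggregation pattern:
// 1. Group by key
// 2. Apply a tumbling time window (10 s size, 5 s grace)
// 3. Count within each window
// ridesStream is the KStream<String, Ride> read from the "rides" topic, e.g.
// streamsBuilder.stream("rides", Consumed.with(Serdes.String(), rideSerde)),
// where streamsBuilder and rideSerde (the custom JSON serde) are illustrative names
var puLocationCount = ridesStream.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(5)))
.count()
.toStream();
// Serialize windowed keys with WindowedSerdes; the second argument is the window size in ms
var windowSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10 * 1000);
puLocationCount.to("rides-pulocation-window-count", Produced.with(windowSerde, Serdes.Long()));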
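The window size passed to WindowedSerdes.timeWindowedSerdeFrom matters because, as we understand the serializer, only the window start is written into the key bytes. The sketch below reproduces that assumed layout (inner key bytes followed by an 8-byte big-endian start timestamp) with java.nio.ByteBuffer; WindowedKeyLayout is a hypothetical helper, not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Assumed windowed-key layout: [inner key bytes][8-byte big-endian window start].
// Only the start is stored, so the reader needs the window size to rebuild the end.
class WindowedKeyLayout {

    static byte[] serialize(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // same bytes as Serdes.String()
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    // Formats the key and the reconstructed window [start, start + windowSizeMs)
    static String describe(byte[] bytes, long windowSizeMs) {
        int keyLength = bytes.length - Long.BYTES;
        String key = new String(bytes, 0, keyLength, StandardCharsets.UTF_8);
        long start = ByteBuffer.wrap(bytes).getLong(keyLength);
        return key + " [" + start + ", " + (start + windowSizeMs) + ")";
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("132", 1_700_000_000_000L);
        System.out.println(bytes.length);               // 11: 3 key bytes + 8 timestamp bytes
        System.out.println(describe(bytes, 10 * 1000)); // 132 [1700000000000, 1700000010000)
    }
}
```

Under this assumption, consumers of rides-pulocation-window-count need the same 10-second size to reconstruct window ends; a mismatched size would silently produce wrong end timestamps.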