Implementation:DataTalksClub Data engineering zoomcamp Java JsonKStreamJoins
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
JsonKStreamJoins is a Kafka Streams application that joins ride events from one topic with pickup location events from another. Records with the same key are paired when their record timestamps are at most 20 minutes apart. The joiner then discards pairs whose dropoff and pickup times differ by more than 10 minutes and writes the remaining matches to an output topic as VendorInfo records.
Description
The JsonKStreamJoins class demonstrates the stream-to-stream join pattern in Kafka Streams. On construction, it configures:
- SASL/SSL authentication properties for Confluent Cloud.
- Application ID "kafka_tutorial.kstream.joined.rides.pickuplocation.v1".
- Auto offset reset to "latest" and cache buffering set to 0 bytes.
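The settings listed above might be assembled roughly as follows. This is a minimal sketch, not the class's actual constructor: it uses plain string keys rather than the Kafka config constants, the build helper and its parameters are hypothetical, and the PLAIN SASL mechanism is an assumption based on the usual Confluent Cloud setup.

```java
import java.util.Properties;

// Sketch only: plain string keys stand in for the Kafka config constants.
class StreamsPropsSketch {

    // bootstrapServers, apiKey and apiSecret are hypothetical placeholders supplied by the caller.
    static Properties build(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // SASL over TLS; the PLAIN mechanism is an assumption, not confirmed by the source.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        props.put("application.id", "kafka_tutorial.kstream.joined.rides.pickuplocation.v1");
        // Start from the end of each topic when no committed offset exists.
        props.put("auto.offset.reset", "latest");
        // Disable record caching so every join result is forwarded immediately.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "my-key", "my-secret");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Setting the cache size to 0 trades throughput for immediacy, which suits a tutorial where each joined record should appear on the output topic as soon as it is produced.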
The createTopology() method builds a topology that:
- Reads Ride records from the input rides topic (Topics.INPUT_RIDE_TOPIC) using custom JSON serde.
- Reads PickupLocation records from the input pickup location topic (Topics.INPUT_RIDE_LOCATION_TOPIC) using custom JSON serde.
- Re-keys the pickup location stream by PULocationID using selectKey().
- Joins the two streams by key using a ValueJoiner with JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(20), Duration.ofMinutes(5)), so records are paired only when their timestamps are at most 20 minutes apart, with a 5-minute grace period for out-of-order records:
  - The joiner computes the absolute time difference between tpep_dropoff_datetime (ride) and tpep_pickup_datetime (pickup location).
  - If the difference exceeds 10 minutes, it returns Optional.empty(); otherwise, it returns a VendorInfo object.
- Filters out empty results and maps the remaining Optional<VendorInfo> values to VendorInfo.
- Writes the joined results to the output topic (Topics.OUTPUT_TOPIC).
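The joiner and the filter/unwrap steps described above can be tried without a Kafka cluster. The sketch below is a plain-Java stand-in, not the topology itself: the class and method names are hypothetical, a String summary takes the place of VendorInfo, and the timestamps are assumed to be LocalDateTime values.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Plain-Java stand-in for the joiner and the filter/unwrap steps; no Kafka classes involved.
class JoinFilterSketch {

    // Mirrors the joiner: empty when dropoff and pickup are more than 10 whole minutes apart.
    // A String summary stands in for VendorInfo (assumption for illustration).
    static Optional<String> joinValue(LocalDateTime dropoff, LocalDateTime pickup) {
        Duration gap = Duration.between(dropoff, pickup).abs();
        if (gap.toMinutes() > 10) {
            return Optional.empty();
        }
        return Optional.of("matched, gap=" + gap.toMinutes() + "min");
    }

    // Mirrors the filter and mapValues steps: drop empty results, unwrap the rest.
    static List<String> keepMatches(List<Optional<String>> joined) {
        return joined.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        LocalDateTime dropoff = LocalDateTime.of(2023, 1, 1, 12, 0);
        List<Optional<String>> joined = List.of(
                joinValue(dropoff, dropoff.plusMinutes(4)),   // kept
                joinValue(dropoff, dropoff.plusMinutes(15)),  // dropped
                joinValue(dropoff, dropoff.minusMinutes(9))); // kept
        System.out.println(keepMatches(joined));
    }
}
```

Returning Optional from the joiner is a way to express "no match" inside an inner join, which must otherwise emit a value for every pair that falls inside the window.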
The joinRidesPickupLocation() method starts the Kafka Streams instance after installing an uncaught exception handler that shuts down the application on error. It then waits until the instance reaches the RUNNING state and registers a shutdown hook.
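The wait-then-register-hook sequence can be illustrated in isolation. The following is a sketch under stated assumptions: the State enum and the supplier are hypothetical stand-ins for a streams client and its state accessor, and the poll interval is arbitrary.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the start-up sequence; State and the supplier are hypothetical stand-ins
// for a streams client and its state accessor.
class StartupWaitSketch {

    enum State { CREATED, REBALANCING, RUNNING }

    // Polls until RUNNING is reported and returns how many polls that took.
    static int awaitRunning(Supplier<State> state, long pollMillis) {
        int polls = 0;
        while (true) {
            State current = state.get();
            polls++;
            if (current == State.RUNNING) {
                return polls;
            }
            System.out.println("waiting, state=" + current);
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for RUNNING", e);
            }
        }
    }

    public static void main(String[] args) {
        Iterator<State> states =
                List.of(State.CREATED, State.REBALANCING, State.RUNNING).iterator();
        int polls = awaitRunning(states::next, 10);
        System.out.println("RUNNING after " + polls + " polls");
        // In the real application the hook would presumably close the streams instance.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.out.println("shutdown hook: closing")));
    }
}
```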
Usage
Use this implementation when you need to correlate events from two separate Kafka topics based on a time window. This pattern is applicable to scenarios such as matching ride requests with pickup confirmations, joining order events with payment events, or any temporal event correlation.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/java/kafka_examples/src/main/java/org/example/JsonKStreamJoins.java
- Lines: 1-76
Signature
public class JsonKStreamJoins {
    private Properties props;
    public JsonKStreamJoins()
    public Topology createTopology()
    public void joinRidesPickupLocation() throws InterruptedException
    public static void main(String[] args) throws InterruptedException
}
Import
import org.example.JsonKStreamJoins;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| Topics.INPUT_RIDE_TOPIC | Kafka topic | Yes | Input topic containing JSON-serialized Ride objects keyed by String. |
| Topics.INPUT_RIDE_LOCATION_TOPIC | Kafka topic | Yes | Input topic containing JSON-serialized PickupLocation objects keyed by String. |
| Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication. |
Outputs
| Name | Type | Description |
|---|---|---|
| Topics.OUTPUT_TOPIC | Kafka topic | Output topic containing JSON-serialized VendorInfo objects representing successfully joined ride and pickup location events (within 10-minute proximity). |
| Console output | stdout | Prints the Kafka Streams application state and any uncaught exception messages. |
Usage Examples
Basic Usage
// Instantiate and start the stream join application
var object = new JsonKStreamJoins();
object.joinRidesPickupLocation();
Join Logic Detail
// rides and pickupLocationsKeyedOnPUId are the two KStreams built earlier in createTopology().
// Records with equal keys are paired when their timestamps are at most 20 minutes apart
// (5-minute grace period); the joiner then applies the 10-minute proximity check.
// The explicit ValueJoiner type pins the join result to Optional<VendorInfo>.
var joined = rides.join(
    pickupLocationsKeyedOnPUId,
    (ValueJoiner<Ride, PickupLocation, Optional<VendorInfo>>) (ride, pickupLocation) -> {
        var period = Duration.between(ride.tpep_dropoff_datetime, pickupLocation.tpep_pickup_datetime);
        if (period.abs().toMinutes() > 10) return Optional.empty();
        else return Optional.of(new VendorInfo(
            ride.VendorID,
            pickupLocation.PULocationID,
            pickupLocation.tpep_pickup_datetime,
            ride.tpep_dropoff_datetime));
    },
    JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(20), Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), CustomSerdes.getSerde(Ride.class), CustomSerdes.getSerde(PickupLocation.class))
);
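One detail of the proximity check is worth noting: Duration.toMinutes() truncates to whole minutes, so a gap of 10 minutes 59 seconds still passes the "> 10" comparison. The sketch below shows that behavior next to a stricter comparison. The class and method names are hypothetical, and the stricter variant is an alternative for illustration, not what the source does.

```java
import java.time.Duration;

// Compares the truncating check used in the joiner with an exact-duration check.
class ProximityThresholdSketch {

    // Same comparison as the joiner: true when the pair would be rejected.
    static boolean rejected(Duration gap) {
        return gap.abs().toMinutes() > 10;
    }

    // Stricter alternative (not the source's behavior): compares the full duration.
    static boolean rejectedExact(Duration gap) {
        return gap.abs().compareTo(Duration.ofMinutes(10)) > 0;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(10).plusSeconds(59);
        System.out.println(rejected(gap));      // false: toMinutes() yields 10
        System.out.println(rejectedExact(gap)); // true: 10m59s is longer than 10m
    }
}
```

With the truncating check, the effective cutoff is any gap shorter than 11 minutes. Whether that matters depends on how precisely the 10-minute rule needs to hold.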