

Implementation:DataTalksClub Data engineering zoomcamp Java JsonKStreamJoins

From Leeroopedia


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

JsonKStreamJoins is a Kafka Streams application that joins ride events from one topic with pickup location events from another topic within a 20-minute join window, discards matches whose drop-off and pickup times are more than 10 minutes apart, and writes the matched VendorInfo results to an output topic.

Description

The JsonKStreamJoins class demonstrates the stream-to-stream join pattern in Kafka Streams. On construction, it configures:

  • SASL/SSL authentication properties for Confluent Cloud.
  • Application ID "kafka_tutorial.kstream.joined.rides.pickuplocation.v1".
  • Auto offset reset set to "latest", and the record cache size set to 0 bytes (caching disabled).
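A minimal sketch of such a configuration, using plain java.util.Properties and the raw Kafka configuration keys (the class may instead use constants such as StreamsConfig.APPLICATION_ID_CONFIG, which resolves to "application.id"). The bootstrap server address is a placeholder, and the PLAIN SASL mechanism with a PlainLoginModule JAAS line is an assumption based on how Confluent Cloud API keys are usually supplied, not something stated above.

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Builds Properties along the lines described for the JsonKStreamJoins constructor.
    // Assumptions: bootstrapServers is a placeholder; sasl.mechanism=PLAIN is assumed.
    public static Properties buildProps(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // SASL authentication over TLS, as used by Confluent Cloud
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username='" + apiKey + "' password='" + apiSecret + "';");
        // Application ID from the description; also used as the consumer group ID
        // and as the prefix of internal (repartition/changelog) topic names
        props.put("application.id", "kafka_tutorial.kstream.joined.rides.pickuplocation.v1");
        // With no committed offset, start from the newest records
        props.put("auto.offset.reset", "latest");
        // 0 bytes disables the record cache
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("broker.example:9092", "KEY", "SECRET");
        System.out.println(props.getProperty("application.id"));
    }
}
```

Newer Kafka Streams releases deprecate cache.max.bytes.buffering in favour of statestore.cache.max.bytes; in either case a value of 0 turns record caching off.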

The createTopology() method builds a topology that:

  1. Reads Ride records from the input rides topic (Topics.INPUT_RIDE_TOPIC) using a custom JSON serde (CustomSerdes.getSerde(Ride.class)).
  2. Reads PickupLocation records from the input pickup location topic (Topics.INPUT_RIDE_LOCATION_TOPIC) using a custom JSON serde (CustomSerdes.getSerde(PickupLocation.class)).
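  3. Re-keys the pickup location stream by PULocationID using selectKey(), so that it can be joined by key with the rides stream. Because the key changes, Kafka Streams repartitions this stream before the join.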
  4. Performs an inner join of the two streams with a ValueJoiner and JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(20), Duration.ofMinutes(5)), i.e. a 20-minute window on record timestamps with a 5-minute grace period:
    • The joiner computes the absolute time difference between tpep_dropoff_datetime (ride) and tpep_pickup_datetime (pickup location).
    • If the difference exceeds 10 minutes, it returns Optional.empty(); otherwise, it returns a VendorInfo object.
  5. Filters out empty results and maps the remaining Optional<VendorInfo> values to VendorInfo.
  6. Writes the joined results to the output topic (Topics.OUTPUT_TOPIC).
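The steps above can be approximated in plain Java to make the join semantics concrete. This is a simplified in-memory model, not Kafka Streams: the Ride, PickupLocation and VendorInfo records are hypothetical stand-ins holding only the fields the joiner uses (plus an explicit key and record timestamp), grace periods and out-of-order handling are ignored, and the window test assumes the inclusive bound JoinWindows uses (|t1 - t2| <= 20 minutes).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class WindowedJoinModel {
    // Hypothetical stand-ins: key and timestampMs model the Kafka record key and record timestamp.
    public record Ride(String key, long timestampMs, int vendorId, LocalDateTime dropoffDatetime) {}
    public record PickupLocation(long timestampMs, long puLocationId, LocalDateTime pickupDatetime) {}
    public record VendorInfo(int vendorId, long puLocationId,
                             LocalDateTime pickupDatetime, LocalDateTime dropoffDatetime) {}

    // A 20-minute time difference: records join when |t1 - t2| <= 20 minutes.
    static final long WINDOW_MS = Duration.ofMinutes(20).toMillis();

    // Same logic as the ValueJoiner: empty if payload times are more than 10 (whole) minutes apart.
    static Optional<VendorInfo> joiner(Ride ride, PickupLocation loc) {
        var period = Duration.between(ride.dropoffDatetime(), loc.pickupDatetime());
        if (period.abs().toMinutes() > 10) return Optional.empty();
        return Optional.of(new VendorInfo(ride.vendorId(), loc.puLocationId(),
                loc.pickupDatetime(), ride.dropoffDatetime()));
    }

    public static List<VendorInfo> join(List<Ride> rides, List<PickupLocation> locations) {
        List<VendorInfo> out = new ArrayList<>();
        for (Ride ride : rides) {
            for (PickupLocation loc : locations) {
                // Step 3 (selectKey): the pickup location's key becomes its PULocationID
                String locKey = String.valueOf(loc.puLocationId());
                // Step 4: an inner join pairs records only when the keys are equal ...
                if (!ride.key().equals(locKey)) continue;
                // ... and the record timestamps fall inside the 20-minute window
                if (Math.abs(ride.timestampMs() - loc.timestampMs()) > WINDOW_MS) continue;
                // Step 5: keep only non-empty joiner results, unwrapped to VendorInfo
                joiner(ride, loc).ifPresent(out::add);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        var t0 = LocalDateTime.of(2020, 1, 1, 12, 0);
        var rides = List.of(new Ride("42", 0L, 2, t0));
        var locations = List.of(
                new PickupLocation(60_000L, 42L, t0.plusMinutes(5)),   // joined
                new PickupLocation(60_000L, 42L, t0.plusMinutes(30))); // fails the 10-minute check
        System.out.println(join(rides, locations).size()); // 1
    }
}
```

The model makes explicit that two different clocks are involved: the 20-minute window is evaluated on record timestamps, while the 10-minute check inside the joiner uses the tpep_* fields carried in the payloads.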

The joinRidesPickupLocation() method creates a KafkaStreams instance from this topology, installs an uncaught exception handler that shuts down the application on error, starts the instance, waits until it reaches the RUNNING state, and registers a shutdown hook.
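The start-up sequence can be sketched without a broker as a small generic helper. With Kafka Streams, the supplier would be kStreams::state and the target KafkaStreams.State.RUNNING, and error handling would come from kStreams.setUncaughtExceptionHandler(...) returning StreamThreadExceptionResponse.SHUTDOWN_APPLICATION. The class and method names (StartupWait, waitForState, registerCloseOnShutdown) are hypothetical, chosen for illustration.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class StartupWait {
    // Polls a state supplier until it reports the target state, printing each
    // observed state along the way. Generic so it can be exercised without Kafka.
    public static <S> S waitForState(Supplier<S> state, S target, Duration pollInterval)
            throws InterruptedException {
        S current = state.get();
        while (!target.equals(current)) {
            System.out.println(current);
            Thread.sleep(pollInterval.toMillis());
            current = state.get();
        }
        System.out.println(current);
        return current;
    }

    // Registers a JVM shutdown hook that closes the resource (KafkaStreams implements AutoCloseable).
    public static Thread registerCloseOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println(e.getMessage());
            }
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Under these assumptions, the method body would call waitForState(kStreams::state, KafkaStreams.State.RUNNING, Duration.ofSeconds(1)) after kStreams.start(), followed by registerCloseOnShutdown(kStreams); the one-second poll interval is likewise an illustrative choice.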

Usage

Use this implementation when you need to correlate events from two separate Kafka topics based on a time window. This pattern is applicable to scenarios such as matching ride requests with pickup confirmations, joining order events with payment events, or any temporal event correlation.

Code Reference

Source Location

Signature

public class JsonKStreamJoins {
    private Properties props;

    public JsonKStreamJoins()
    public Topology createTopology()
    public void joinRidesPickupLocation() throws InterruptedException
    public static void main(String[] args) throws InterruptedException
}

Import

import org.example.JsonKStreamJoins;

I/O Contract

Inputs

  • Topics.INPUT_RIDE_TOPIC (Kafka topic, required): Input topic containing JSON-serialized Ride objects keyed by String.
  • Topics.INPUT_RIDE_LOCATION_TOPIC (Kafka topic, required): Input topic containing JSON-serialized PickupLocation objects keyed by String.
  • Secrets.KAFKA_CLUSTER_KEY (String, required): SASL username credential for Confluent Cloud Kafka cluster authentication.
  • Secrets.KAFKA_CLUSTER_SECRET (String, required): SASL password credential for Confluent Cloud Kafka cluster authentication.

Outputs

  • Topics.OUTPUT_TOPIC (Kafka topic): Output topic containing JSON-serialized VendorInfo objects for ride and pickup location pairs that were joined and passed the 10-minute proximity check.
  • Console output (stdout): Prints the Kafka Streams application state and any uncaught exception messages.

Usage Examples

Basic Usage

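// Instantiate the application and start the stream join.
// joinRidesPickupLocation() throws InterruptedException, so call it from a method that declares or handles it.
var app = new JsonKStreamJoins();
app.joinRidesPickupLocation();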

Join Logic Detail

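// Library imports used by this fragment (project classes such as Ride and CustomSerdes are omitted)
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;

// The join window compares record timestamps: a ride and a pickup location with the same key
// are paired if their timestamps are at most 20 minutes apart, and late (out-of-order) records
// are still accepted for a 5-minute grace period. The joiner then applies a 10-minute proximity
// check on the payload timestamps and returns Optional.empty() for pairs that fail it.
var joined = rides.join(
    pickupLocationsKeyedOnPUId,
    // Explicit ValueJoiner type so that joined is a KStream<String, Optional<VendorInfo>>
    (ValueJoiner<Ride, PickupLocation, Optional<VendorInfo>>) (ride, pickupLocation) -> {
        var period = Duration.between(ride.tpep_dropoff_datetime, pickupLocation.tpep_pickup_datetime);
        if (period.abs().toMinutes() > 10) return Optional.empty();
        return Optional.of(new VendorInfo(
            ride.VendorID,
            pickupLocation.PULocationID,
            pickupLocation.tpep_pickup_datetime,
            ride.tpep_dropoff_datetime));
    },
    JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(20), Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), CustomSerdes.getSerde(Ride.class), CustomSerdes.getSerde(PickupLocation.class))
);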
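The proximity check relies on Duration.toMinutes(), which truncates to whole minutes. A short standalone check shows the effect: a gap of 10 minutes 59 seconds is still kept, and only gaps of 11 whole minutes or more are discarded. The class and method names below are illustrative, not part of the project.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class ProximityCheck {
    // Mirrors the joiner's test: keep the pair unless the absolute gap,
    // truncated to whole minutes by toMinutes(), is greater than 10.
    public static boolean withinProximity(LocalDateTime dropoff, LocalDateTime pickup) {
        return Duration.between(dropoff, pickup).abs().toMinutes() <= 10;
    }

    public static void main(String[] args) {
        var dropoff = LocalDateTime.of(2020, 1, 1, 12, 0);
        System.out.println(withinProximity(dropoff, dropoff.plusMinutes(10).plusSeconds(59))); // true
        System.out.println(withinProximity(dropoff, dropoff.plusMinutes(11)));                 // false
        System.out.println(withinProximity(dropoff, dropoff.minusMinutes(10)));                // true
    }
}
```

If a strict 10-minute cut-off were wanted, comparing period.abs().compareTo(Duration.ofMinutes(10)) > 0 would avoid the truncation; whether the extra minute matters depends on the use case.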
