Implementation:DataTalksClub Data engineering zoomcamp Java CustomSerdes

From Leeroopedia


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

CustomSerdes is a factory class that provides generic methods for creating JSON and Avro Serde instances for use with Kafka Streams, abstracting away the boilerplate configuration of Confluent's serializers and deserializers.

Description

The CustomSerdes class in the org.example.customserdes package provides two static factory methods for creating Kafka Streams Serde objects:

  • getSerde(Class<T> classOf) -- Creates a generic JSON Serde<T> for any Java type. It:
    1. Configures a KafkaJsonSerializer<T> and KafkaJsonDeserializer<T> with the "json.value.type" property set to the provided class.
    2. Wraps them using Serdes.serdeFrom(mySerializer, myDeserializer).
    3. Returns the composed Serde<T> that can be used in Kafka Streams Consumed, Produced, and StreamJoined configurations.
  • getAvroSerde(boolean isKey, String schemaRegistryUrl) -- Creates a SpecificAvroSerde<T> for Avro-generated types extending SpecificRecordBase. It:
    1. Creates a new SpecificAvroSerde<T> instance.
    2. Builds a property map in which SCHEMA_REGISTRY_URL_CONFIG ("schema.registry.url") is set to schemaRegistryUrl.
    3. Calls serde.configure(serdeProps, isKey) to specify whether the serde is for keys or values.
    4. Returns the configured SpecificAvroSerde.
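
The getSerde steps above can be sketched without any Kafka dependency. Everything in this block is an illustrative assumption: the Serializer, Deserializer, and Serde interfaces are simplified JDK-only stand-ins for Kafka's types, and TextSerializer and TextDeserializer are hypothetical toy codecs standing in for KafkaJsonSerializer and KafkaJsonDeserializer. The sketch also passes false for isKey when configuring both halves, which the description above does not confirm. What it does show is the shape of the pattern: one shared property map, two configured halves, one composed serde.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only stand-ins; these are NOT Kafka's real interfaces.
final class SerdeFactorySketch {

    interface Serializer<T> {
        void configure(Map<String, ?> props, boolean isKey);
        byte[] serialize(String topic, T data);
    }

    interface Deserializer<T> {
        void configure(Map<String, ?> props, boolean isKey);
        T deserialize(String topic, byte[] bytes);
    }

    interface Serde<T> {
        Serializer<T> serializer();
        Deserializer<T> deserializer();
    }

    // Toy text encoder standing in for KafkaJsonSerializer.
    static final class TextSerializer<T> implements Serializer<T> {
        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
            // Nothing to configure: writing only needs the object itself.
        }

        @Override
        public byte[] serialize(String topic, T data) {
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Toy text decoder standing in for KafkaJsonDeserializer.
    static final class TextDeserializer<T> implements Deserializer<T> {
        private Class<?> targetType;

        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
            // Generics are erased at runtime, so the target class arrives via config.
            targetType = (Class<?>) props.get("json.value.type");
        }

        @Override
        @SuppressWarnings("unchecked")
        public T deserialize(String topic, byte[] bytes) {
            String text = new String(bytes, StandardCharsets.UTF_8);
            try {
                // Toy decoding: call the target type's static valueOf(String).
                return (T) targetType.getMethod("valueOf", String.class).invoke(null, text);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot decode into " + targetType, e);
            }
        }
    }

    // Pairs two configured halves, in the spirit of Serdes.serdeFrom.
    static <T> Serde<T> serdeFrom(Serializer<T> s, Deserializer<T> d) {
        return new Serde<T>() {
            @Override public Serializer<T> serializer() { return s; }
            @Override public Deserializer<T> deserializer() { return d; }
        };
    }

    static <T> Serde<T> getSerde(Class<T> classOf) {
        // Step 1: configure both halves from one shared property map.
        Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put("json.value.type", classOf);

        Serializer<T> mySerializer = new TextSerializer<>();
        mySerializer.configure(serdeProps, false); // isKey = false is an assumption
        Deserializer<T> myDeserializer = new TextDeserializer<>();
        myDeserializer.configure(serdeProps, false);

        // Steps 2 and 3: wrap the pair and return the composed serde.
        return serdeFrom(mySerializer, myDeserializer);
    }

    public static void main(String[] args) {
        Serde<Integer> serde = getSerde(Integer.class);
        byte[] bytes = serde.serializer().serialize("rides", 42);
        System.out.println(serde.deserializer().deserialize("rides", bytes)); // prints 42
    }
}
```

The toy deserializer makes the role of "json.value.type" visible: because Java erases generic type parameters at runtime, the target class has to be handed over through configuration rather than inferred from Serde<T>.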

This class is used throughout the Kafka Streams examples (JsonKStream, JsonKStreamJoins, JsonKStreamWindow) to provide the custom serialization needed for Ride, PickupLocation, and VendorInfo data types.

Usage

Use this implementation whenever you need to create JSON or Avro Serde instances for Kafka Streams topologies. This factory pattern eliminates repetitive serializer/deserializer configuration and provides a single point of change for serialization settings.

Code Reference

Source Location

Signature

public class CustomSerdes {
    public static <T> Serde<T> getSerde(Class<T> classOf)
    public static <T extends SpecificRecordBase> SpecificAvroSerde getAvroSerde(boolean isKey, String schemaRegistryUrl)
}

Import

import org.example.customserdes.CustomSerdes;

I/O Contract

Inputs

Name | Type | Required | Description
classOf | Class<T> | Yes (for getSerde) | The target Java class type for JSON serialization/deserialization.
isKey | boolean | Yes (for getAvroSerde) | Whether the serde is being used for the key (true) or value (false).
schemaRegistryUrl | String | Yes (for getAvroSerde) | The URL of the Confluent Schema Registry instance.

Outputs

Name | Type | Description
Serde<T> | org.apache.kafka.common.serialization.Serde<T> | A fully configured JSON Serde capable of serializing and deserializing the specified type.
SpecificAvroSerde | io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde | A fully configured Avro Serde for Avro-generated types, connected to Schema Registry.

Usage Examples

JSON Serde for Ride Objects

// Create a JSON Serde for the Ride class
Serde<Ride> rideSerde = CustomSerdes.getSerde(Ride.class);

// Use it in a Kafka Streams topology (String keys, Ride values)
var ridesStream = streamsBuilder.stream("rides",
    Consumed.with(Serdes.String(), rideSerde));

Avro Serde with Schema Registry

// Create an Avro Serde for value serialization
SpecificAvroSerde avroSerde = CustomSerdes.getAvroSerde(
    false,  // isKey = false (this is a value serde)
    "https://psrc-kk5gg.europe-west3.gcp.confluent.cloud"
);
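
One practical consequence of the isKey flag can be sketched in a few lines. This assumes Confluent's default subject naming (TopicNameStrategy) is in effect, under which a schema is registered as "<topic>-key" or "<topic>-value". The subjectFor helper below is hypothetical and only mirrors that naming rule; it is not a Confluent API, and the "rides" topic name is borrowed from the JSON example.

```java
// Hypothetical helper mirroring the default TopicNameStrategy naming rule.
final class SubjectNameSketch {

    // Key serdes map to "<topic>-key", value serdes to "<topic>-value".
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("rides", false)); // prints rides-value
        System.out.println(subjectFor("rides", true));  // prints rides-key
    }
}
```

Under that assumption, a serde configured with the wrong isKey value would look for its schema under the wrong subject, so the flag should match where the serde is actually used.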

Multiple Serdes in a Join

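// Bundle the serdes for a stream-stream join: String keys,
// Ride values on one side, PickupLocation values on the other.
// The result is passed as the last argument of KStream#join.
StreamJoined<String, Ride, PickupLocation> joinSerdes = StreamJoined.with(
    Serdes.String(),
    CustomSerdes.getSerde(Ride.class),
    CustomSerdes.getSerde(PickupLocation.class)
);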
