Implementation:DataTalksClub Data engineering zoomcamp Java CustomSerdes
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
CustomSerdes is a factory class that provides generic methods for creating JSON and Avro Serde instances for use with Kafka Streams, abstracting away the boilerplate configuration of Confluent's serializers and deserializers.
Description
The CustomSerdes class in the org.example.customserdes package provides two static factory methods for creating Kafka Streams Serde objects:
- getSerde(Class<T> classOf) -- Creates a generic JSON Serde<T> for any Java type. It:
- Configures a KafkaJsonSerializer<T> and KafkaJsonDeserializer<T> with the "json.value.type" property set to the provided class.
- Wraps them using Serdes.serdeFrom(mySerializer, myDeserializer).
- Returns the composed Serde<T> that can be used in Kafka Streams Consumed, Produced, and StreamJoined configurations.
- getAvroSerde(boolean isKey, String schemaRegistryUrl) -- Creates a SpecificAvroSerde<T> for Avro-generated types extending SpecificRecordBase. It:
- Creates a new SpecificAvroSerde<T> instance.
- Configures it with the SCHEMA_REGISTRY_URL_CONFIG property.
- Calls serde.configure(serdeProps, isKey) to specify whether the serde is for keys or values.
- Returns the configured SpecificAvroSerde.
This class is used throughout the Kafka Streams examples (JsonKStream, JsonKStreamJoins, JsonKStreamWindow) to provide the custom serialization needed for Ride, PickupLocation, and VendorInfo data types.
Usage
Use this implementation whenever you need to create JSON or Avro Serde instances for Kafka Streams topologies. This factory pattern eliminates repetitive serializer/deserializer configuration and provides a single point of change for serialization settings.
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/java/kafka_examples/src/main/java/org/example/customserdes/CustomSerdes.java
- Lines: 1-42
Signature
public class CustomSerdes {
public static <T> Serde<T> getSerde(Class<T> classOf)
public static <T extends SpecificRecordBase> SpecificAvroSerde getAvroSerde(boolean isKey, String schemaRegistryUrl)
}
Import
import org.example.customserdes.CustomSerdes;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| classOf | Class<T> | Yes (for getSerde) | The target Java class type for JSON serialization/deserialization. |
| isKey | boolean | Yes (for getAvroSerde) | Whether the serde is being used for the key (true) or value (false). |
| schemaRegistryUrl | String | Yes (for getAvroSerde) | The URL of the Confluent Schema Registry instance. |
Outputs
| Name | Type | Description |
|---|---|---|
| Serde<T> | org.apache.kafka.common.serialization.Serde<T> | A fully configured JSON Serde capable of serializing and deserializing the specified type. |
| SpecificAvroSerde | io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde | A fully configured Avro Serde for Avro-generated types, connected to Schema Registry. |
Usage Examples
JSON Serde for Ride Objects
// Create a JSON Serde for the Ride class
Serde<Ride> rideSerde = CustomSerdes.getSerde(Ride.class);
// Use in a Kafka Streams topology
var ridesStream = streamsBuilder.stream("rides",
Consumed.with(Serdes.String(), CustomSerdes.getSerde(Ride.class)));
Avro Serde with Schema Registry
// Create an Avro Serde for value serialization
SpecificAvroSerde avroSerde = CustomSerdes.getAvroSerde(
false, // isKey = false (this is a value serde)
"https://psrc-kk5gg.europe-west3.gcp.confluent.cloud"
);
Multiple Serdes in a Join
// Use custom serdes for a stream-stream join
StreamJoined.with(
Serdes.String(),
CustomSerdes.getSerde(Ride.class),
CustomSerdes.getSerde(PickupLocation.class)
);