Implementation:DataTalksClub Data engineering zoomcamp Java AvroProducer
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Kafka |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
AvroProducer is a Java Kafka producer that reads taxi ride data from a CSV file, constructs Avro-serialized RideRecord objects, and publishes them to a Confluent Cloud Kafka topic using KafkaAvroSerializer with Schema Registry integration.
Description
The AvroProducer class demonstrates the Avro producer pattern for Apache Kafka using Confluent's Schema Registry. On construction, it configures a full set of producer properties including SASL/SSL authentication for Confluent Cloud, the KafkaAvroSerializer for value serialization, and Schema Registry URL and credentials for schema management.
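The property set described above can be sketched with plain string keys and `java.util.Properties`, so that it compiles without the Kafka client on the classpath. This is a minimal sketch, not the class's actual constructor: the class name `AvroProducerConfigSketch`, the `build` method, and its parameters are illustrative, and the choice of `StringSerializer` for keys is an assumption inferred from the `KafkaProducer<String, RideRecord>` type.

```java
import java.util.Properties;

class AvroProducerConfigSketch {

    /** Builds producer properties for a SASL/SSL cluster with Schema Registry basic auth. */
    static Properties build(String bootstrapServers, String clusterKey, String clusterSecret,
                            String registryUrl, String registryKey, String registrySecret) {
        Properties props = new Properties();

        // Cluster connection and SASL/SSL authentication
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='%s' password='%s';", clusterKey, clusterSecret));

        // Serializers: String keys (assumed), Avro values via Schema Registry
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

        // Schema Registry endpoint and credentials
        props.put("schema.registry.url", registryUrl);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", registryKey + ":" + registrySecret);
        return props;
    }
}
```

The resulting `Properties` object is the kind of value that would be passed to the `KafkaProducer` constructor.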
The class provides two main methods:
- getRides() -- Reads a bundled CSV resource file (rides.csv), skips the header row, and maps each row into an Avro-generated RideRecord object (setting VendorId, TripDistance, and PassengerCount fields). Returns a List<RideRecord>.
- publishRides(List<RideRecord>) -- Creates a KafkaProducer<String, RideRecord> and sends each ride record to the "rides_avro" topic, using the VendorId as the message key. The offset of each successfully sent record is printed to stdout, and a callback prints the message of any send error. The loop sleeps 500 ms after each send to throttle throughput.
The main method ties these together: it instantiates the producer, loads the rides, and publishes them.
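The two steps can be sketched without external dependencies as follows. Several names here are assumptions for illustration only: `Ride` stands in for the Avro-generated RideRecord, `RecordSender` is a hypothetical stand-in for `KafkaProducer.send` with a callback, and the column positions (VendorID at index 0, passenger_count at 3, trip_distance at 4) follow the column order listed for rides.csv. The naive comma split assumes no quoted fields, whereas the real class declares `CsvException`, which suggests it uses a CSV library.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

class RidePipelineSketch {

    /** Stand-in for the Avro-generated RideRecord; holds the same three fields. */
    static final class Ride {
        final String vendorId;
        final double tripDistance;
        final int passengerCount;

        Ride(String vendorId, double tripDistance, int passengerCount) {
            this.vendorId = vendorId;
            this.tripDistance = tripDistance;
            this.passengerCount = passengerCount;
        }
    }

    /** Hypothetical stand-in for a producer's send(record, callback) call. */
    interface RecordSender {
        void send(String topic, String key, Ride value, BiConsumer<Long, Exception> callback);
    }

    /** Skips the header line and maps every remaining line to a Ride. */
    static List<Ride> parseRides(List<String> csvLines) {
        return csvLines.stream()
                .skip(1)                              // header row
                .map(line -> line.split(","))         // naive split: assumes no quoted commas
                .map(row -> new Ride(row[0],
                        Double.parseDouble(row[4]),   // trip_distance
                        Integer.parseInt(row[3])))    // passenger_count
                .collect(Collectors.toList());
    }

    /** Sends each ride keyed by vendor id, reports the outcome, then pauses. */
    static void publish(List<Ride> rides, RecordSender sender, long pauseMillis) {
        for (Ride ride : rides) {
            sender.send("rides_avro", ride.vendorId, ride, (offset, error) -> {
                if (error != null) {
                    System.out.println(error.getMessage());
                } else {
                    System.out.println(offset);
                }
            });
            try {
                Thread.sleep(pauseMillis);            // throttle between sends
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt and stop
                return;
            }
        }
    }
}
```

Keying by VendorId means that, under Kafka's default partitioning, records with the same vendor land on the same partition and keep their relative order.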
Usage
Use this implementation when you need to produce Avro-serialized messages to a Kafka topic backed by Confluent Schema Registry. This pattern is appropriate when you require schema evolution guarantees and compact binary serialization for your Kafka messages.
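Part of the compactness comes from how Confluent's serializers frame each message: instead of embedding the full schema, the value carries a magic byte (0), a 4-byte big-endian schema ID issued by Schema Registry, and then the Avro binary payload. The sketch below only illustrates that framing; `WireFormatSketch` and its methods are illustrative names, and the payload bytes are assumed to have been produced by an Avro encoder elsewhere.

```java
import java.nio.ByteBuffer;

class WireFormatSketch {

    /** Prepends the magic byte and schema ID to an already Avro-encoded payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                .put((byte) 0)        // magic byte
                .putInt(schemaId)     // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema ID back from a framed message. */
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

A consumer uses the schema ID to fetch the writer's schema from the registry, which is what makes schema evolution checks possible without shipping the schema in every message.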
Code Reference
Source Location
- Repository: DataTalksClub_Data_engineering_zoomcamp
- File: 07-streaming/java/kafka_examples/src/main/java/org/example/AvroProducer.java
- Lines: 1-72
Signature
public class AvroProducer {
    public AvroProducer()
    public List<RideRecord> getRides() throws IOException, CsvException
    public void publishRides(List<RideRecord> rides) throws ExecutionException, InterruptedException
    public static void main(String[] args) throws IOException, CsvException, ExecutionException, InterruptedException
}
Import
import org.example.AvroProducer;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| rides.csv | CSV resource file | Yes | Bundled CSV file containing taxi ride records with columns: VendorID, pickup datetime, dropoff datetime, passenger_count, trip_distance, and additional fields. |
| Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication. |
| Secrets.SCHEMA_REGISTRY_KEY | String | Yes | API key for Confluent Schema Registry authentication. |
| Secrets.SCHEMA_REGISTRY_SECRET | String | Yes | API secret for Confluent Schema Registry authentication. |
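The four credentials are referenced as constants on a Secrets class. One way such a holder might be shaped, so that real keys stay out of version control, is to read them from environment variables. This is an assumption for illustration: the source only names the constants, and `SecretsSketch`, the environment variable names, and the `REPLACE_ME` fallback are hypothetical.

```java
class SecretsSketch {
    static final String KAFKA_CLUSTER_KEY = fromEnv("KAFKA_CLUSTER_KEY");
    static final String KAFKA_CLUSTER_SECRET = fromEnv("KAFKA_CLUSTER_SECRET");
    static final String SCHEMA_REGISTRY_KEY = fromEnv("SCHEMA_REGISTRY_KEY");
    static final String SCHEMA_REGISTRY_SECRET = fromEnv("SCHEMA_REGISTRY_SECRET");

    /** Returns the environment variable's value, or a placeholder when it is unset. */
    private static String fromEnv(String name) {
        String value = System.getenv(name);
        return value != null ? value : "REPLACE_ME";
    }
}
```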
Outputs
| Name | Type | Description |
|---|---|---|
| rides_avro topic | Kafka topic with Avro records | Avro-serialized RideRecord messages keyed by VendorId (String), containing VendorId, TripDistance, and PassengerCount. |
| Console output | stdout | Prints the offset of each successfully sent record, and the error message of any failed send. |
Usage Examples
Basic Usage
// Instantiate the Avro producer (configures Kafka and Schema Registry properties)
var producer = new AvroProducer();
// Read ride records from the bundled CSV resource
var rideRecords = producer.getRides();
// Publish all ride records to the "rides_avro" Kafka topic
producer.publishRides(rideRecords);
Building an Individual RideRecord
RideRecord record = RideRecord.newBuilder()
    .setVendorId("1")
    .setTripDistance(3.5)
    .setPassengerCount(2)
    .build();