

Implementation:DataTalksClub Data engineering zoomcamp Java AvroProducer

From Leeroopedia


Knowledge Sources
Domains: Streaming, Kafka
Last Updated: 2026-02-09 00:00 GMT

Overview

AvroProducer is a Java Kafka producer that reads taxi ride data from a CSV file, constructs Avro-serialized RideRecord objects, and publishes them to a Confluent Cloud Kafka topic using KafkaAvroSerializer with Schema Registry integration.

Description

The AvroProducer class demonstrates the Avro producer pattern for Apache Kafka using Confluent's Schema Registry. On construction, it configures a full set of producer properties including SASL/SSL authentication for Confluent Cloud, the KafkaAvroSerializer for value serialization, and Schema Registry URL and credentials for schema management.
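A minimal sketch of the kind of property set the constructor assembles is shown below, assuming placeholder values for the bootstrap server and the Schema Registry URL. It uses literal configuration keys (bootstrap.servers, sasl.jaas.config, schema.registry.url, basic.auth.user.info, and so on) instead of the ProducerConfig constants so it compiles without the Kafka and Confluent jars. ProducerPropsSketch and build() are hypothetical names, and the exact values AvroProducer sets may differ.

```java
import java.util.Properties;

// Hypothetical sketch of an Avro producer configuration for Confluent Cloud.
// Literal config keys are used so this compiles without kafka-clients on the classpath.
final class ProducerPropsSketch {

    static Properties build(String bootstrapServers, String clusterKey, String clusterSecret,
                            String schemaRegistryUrl, String registryKey, String registrySecret) {
        Properties props = new Properties();

        // Kafka cluster connection: SASL over TLS with the PLAIN mechanism
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + clusterKey + "' password='" + clusterSecret + "';");

        // String keys (the VendorId) and Avro values serialized via Schema Registry
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

        // Schema Registry endpoint and basic-auth credentials in "key:secret" form
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", registryKey + ":" + registrySecret);
        return props;
    }
}
```

In a real producer, a Properties object like this is what gets passed to new KafkaProducer<String, RideRecord>(props).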

The class provides two main methods:

  • getRides() -- Reads a bundled CSV resource file (rides.csv), skips the header row, and maps each row into an Avro-generated RideRecord object (setting VendorId, TripDistance, and PassengerCount fields). Returns a List<RideRecord>.
  • publishRides(List<RideRecord>) -- Creates a KafkaProducer<String, RideRecord> and iterates through the ride records, sending each to the "rides_avro" topic with the VendorId as the message key. Each send is followed by a 500 ms sleep to throttle throughput. Send errors are printed to stdout from the send callback, and the offset of each successfully sent record is printed as well.
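The row mapping performed by getRides() can be sketched with the standard library alone. This is an illustration under stated assumptions, not the class's own code: RideRow is a hypothetical stand-in for the Avro-generated RideRecord, a naive comma split replaces the CSV library (so quoted fields containing commas are not handled), and the column indices follow the column order listed for rides.csv under Inputs.

```java
import java.io.BufferedReader;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Avro-generated RideRecord (same three fields).
record RideRow(String vendorId, double tripDistance, int passengerCount) {}

final class RideCsvSketch {
    // Column positions assumed from the rides.csv layout:
    // VendorID, pickup datetime, dropoff datetime, passenger_count, trip_distance, ...
    static final int VENDOR_ID = 0;
    static final int PASSENGER_COUNT = 3;
    static final int TRIP_DISTANCE = 4;

    // Skip the header row, then map each data row to a RideRow.
    static List<RideRow> parse(BufferedReader reader) {
        return reader.lines()
                .skip(1)                              // header row
                .filter(line -> !line.isBlank())      // tolerate trailing blank lines
                .map(line -> line.split(",", -1))     // naive split: no quoted commas
                .map(row -> new RideRow(
                        row[VENDOR_ID],
                        Double.parseDouble(row[TRIP_DISTANCE]),
                        Integer.parseInt(row[PASSENGER_COUNT])))
                .collect(Collectors.toList());
    }
}
```

Keeping VendorId as a string mirrors its use as the String message key when publishing.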

The main method ties these together: it instantiates the producer, loads the rides, and publishes them.

Usage

Use this implementation when you need to produce Avro-serialized messages to a Kafka topic whose value schema is managed by Confluent Schema Registry. The pattern fits cases where producers and consumers must share a registered schema that can evolve under the registry's compatibility checks, and where a compact binary encoding is preferred over text formats.
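Part of the compactness comes from how KafkaAvroSerializer frames each value: Confluent's wire format prepends a magic byte (0) and the 4-byte schema ID assigned by Schema Registry to the Avro binary payload, so each message carries a small ID rather than the full schema. Avro itself encodes int and long fields as zig-zag varints. The sketch below illustrates both ideas with the standard library only; WireFormatSketch and its methods are hypothetical names, and the payload passed to frame() is assumed to be already Avro-encoded.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch of the Confluent wire format:
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0;

    // Prefix an already-encoded Avro payload with the magic byte and schema ID.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length); // big-endian by default
        buf.put(MAGIC_BYTE).putInt(schemaId).put(avroPayload);
        return buf.array();
    }

    // Read back the schema ID a consumer would look up in Schema Registry.
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    // Avro int/long encoding: zig-zag map to unsigned, then 7-bit varint groups.
    static byte[] zigZagVarint(long value) {
        long n = (value << 1) ^ (value >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80)); // low 7 bits with continuation bit
            n >>>= 7;
        }
        out.write((int) n);
        return out.toByteArray();
    }
}
```

For example, a passenger count of 2 zig-zag encodes to the single byte 0x04, so the framing header (5 bytes) is usually the largest fixed overhead per message.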

Code Reference

Source Location

Signature

public class AvroProducer {
    public AvroProducer()
    public List<RideRecord> getRides() throws IOException, CsvException
    public void publishRides(List<RideRecord> rides) throws ExecutionException, InterruptedException
    public static void main(String[] args) throws IOException, CsvException, ExecutionException, InterruptedException
}

Import

import org.example.AvroProducer;

I/O Contract

Inputs

Name | Type | Required | Description
rides.csv | CSV resource file | Yes | Bundled CSV file containing taxi ride records with columns: VendorID, pickup datetime, dropoff datetime, passenger_count, trip_distance, and additional fields.
Secrets.KAFKA_CLUSTER_KEY | String | Yes | SASL username credential for Confluent Cloud Kafka cluster authentication.
Secrets.KAFKA_CLUSTER_SECRET | String | Yes | SASL password credential for Confluent Cloud Kafka cluster authentication.
Secrets.SCHEMA_REGISTRY_KEY | String | Yes | API key for Confluent Schema Registry authentication.
Secrets.SCHEMA_REGISTRY_SECRET | String | Yes | API secret for Confluent Schema Registry authentication.
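The four Secrets fields are plain strings. One common alternative to hard-coding them, sketched here as an assumption rather than as how the project stores them, is to read them from environment variables at startup and fail fast if any is missing. EnvSecrets and the environment variable names are hypothetical.

```java
import java.util.Map;

// Hypothetical environment-backed replacement for a hard-coded Secrets class.
// The variable names mirror the Secrets fields but are assumptions.
final class EnvSecrets {
    final String kafkaClusterKey;
    final String kafkaClusterSecret;
    final String schemaRegistryKey;
    final String schemaRegistrySecret;

    private EnvSecrets(Map<String, String> env) {
        this.kafkaClusterKey = require(env, "KAFKA_CLUSTER_KEY");
        this.kafkaClusterSecret = require(env, "KAFKA_CLUSTER_SECRET");
        this.schemaRegistryKey = require(env, "SCHEMA_REGISTRY_KEY");
        this.schemaRegistrySecret = require(env, "SCHEMA_REGISTRY_SECRET");
    }

    // In real use pass System.getenv(); any Map works in tests.
    static EnvSecrets from(Map<String, String> env) {
        return new EnvSecrets(env);
    }

    // Fail fast with a clear message instead of connecting with empty credentials.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }
}
```

Reading credentials at startup keeps them out of source control while leaving the producer configuration code unchanged.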

Outputs

Name | Type | Description
rides_avro topic | Kafka topic with Avro records | Avro-serialized RideRecord messages keyed by VendorId (String), containing VendorId, TripDistance, and PassengerCount.
Console output | stdout | Prints the offset of each successfully sent record.
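The publish loop that produces these outputs can be simulated without a broker. In this sketch KeyedSender is a hypothetical stand-in for KafkaProducer.send(ProducerRecord, Callback), which returns a Future<RecordMetadata>; here the offset is modeled as a CompletableFuture<Long>. Each value is sent under its key, failures are printed from a completion callback, the loop blocks for the offset and prints it, and then sleeps for the configured throttle (500 ms in AvroProducer). ThrottledPublisherSketch and its parameters are likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for KafkaProducer.send(record, callback): the future
// completes with the record's offset, or exceptionally if the send fails.
interface KeyedSender<V> {
    CompletableFuture<Long> send(String topic, String key, V value);
}

final class ThrottledPublisherSketch {
    // Sends each value under keyOf(value), prints errors from a callback,
    // blocks for and prints each offset, then sleeps throttleMillis.
    static <V> List<Long> publish(KeyedSender<V> sender, String topic, List<V> values,
                                  Function<V, String> keyOf, long throttleMillis) {
        List<Long> offsets = new ArrayList<>();
        for (V value : values) {
            CompletableFuture<Long> future = sender.send(topic, keyOf.apply(value), value);
            // Callback-style error reporting: print the failure message to stdout.
            future.whenComplete((result, error) -> {
                if (error != null) {
                    System.out.println(error.getMessage());
                }
            });
            // Block until the send completes; a failure surfaces here as an
            // unchecked CompletionException and ends the loop.
            long offset = future.join();
            System.out.println(offset);
            offsets.add(offset);
            try {
                Thread.sleep(throttleMillis); // throttle between sends
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                break;
            }
        }
        return offsets;
    }
}
```

Blocking on every send makes the console output easy to follow but limits the producer to one in-flight record at a time, and the fixed sleep lowers throughput further; both choices suit a demo rather than bulk loading.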

Usage Examples

Basic Usage

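import org.example.AvroProducer;

public class Main {
    // getRides() and publishRides() throw checked exceptions (IOException, CsvException,
    // ExecutionException, InterruptedException), so main declares them broadly.
    public static void main(String[] args) throws Exception {
        // Instantiate the Avro producer (configures Kafka and Schema Registry properties)
        var producer = new AvroProducer();

        // Read ride records from the bundled CSV resource
        var rideRecords = producer.getRides();

        // Publish all ride records to the "rides_avro" Kafka topic
        producer.publishRides(rideRecords);
    }
}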

Building an Individual RideRecord

RideRecord record = RideRecord.newBuilder()
        .setVendorId("1")
        .setTripDistance(3.5)
        .setPassengerCount(2)
        .build();
