
Implementation:Heibaiying BigData Notes FlinkKafkaConsumer Configuration

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Big_Data
Last Updated 2026-02-10 10:00 GMT

Overview

A concrete tool for connecting Flink to Apache Kafka as a streaming data source, provided by the Flink Kafka Connector library.

Description

The FlinkKafkaConsumer class is Flink's built-in Kafka source connector. It wraps the standard Kafka consumer client and integrates it into Flink's source function framework, providing automatic offset management through Flink's checkpointing mechanism and exactly-once processing guarantees. The consumer is configured with a Kafka topic name, a deserialization schema to convert raw Kafka messages into typed objects, and a Properties object containing Kafka consumer configuration.

In the BigData-Notes repository, the KafkaStreamingJob class demonstrates configuring a FlinkKafkaConsumer with SimpleStringSchema deserialization to read string messages from a Kafka topic.
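Since offset management is tied to Flink's checkpointing, checkpointing must be enabled on the execution environment for the consumer to commit offsets transactionally. The sketch below shows the typical setup; the 5-second interval is an illustrative choice, not a value from the repository.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing every 5 seconds (interval chosen for illustration).
        // With checkpointing enabled, FlinkKafkaConsumer commits its Kafka offsets
        // as part of each completed checkpoint rather than relying on Kafka's
        // auto-commit, which is what underpins the exactly-once guarantee.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // ... add the FlinkKafkaConsumer source and the rest of the pipeline here ...
    }
}
```

Without checkpointing, the consumer falls back to periodic auto-commit of offsets to Kafka, which only gives at-least-once behavior on restart.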

Usage

Use FlinkKafkaConsumer whenever your Flink streaming application needs to ingest data from one or more Kafka topics. The consumer should be instantiated with the required configuration and then registered with the execution environment via env.addSource(). Ensure that bootstrap.servers and group.id are set in the properties, and select an appropriate deserialization schema for your message format.
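The two required properties can be assembled with plain `java.util.Properties`; the broker address and group id below are placeholder values for illustration.

```java
import java.util.Properties;

public class KafkaConsumerProps {

    // Build the minimal consumer configuration that FlinkKafkaConsumer
    // expects; both keys are required for the consumer to start.
    public static Properties buildProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // Kafka broker address list
        props.setProperty("group.id", groupId);                   // consumer group identifier
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("localhost:9092", "flink-kafka-group");
        System.out.println(props.getProperty("bootstrap.servers")); // localhost:9092
    }
}
```

The resulting `Properties` object is passed as the third constructor argument of `FlinkKafkaConsumer`.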

Code Reference

Source Location

  • Repository: BigData-Notes
  • File: code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java
  • Lines: 15-45

Signature

// Constructor
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props)

// Registration with environment
public <T> DataStreamSource<T> addSource(SourceFunction<T> function)

Import

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

I/O Contract

Inputs

Name Type Required Description
topic String Yes The name of the Kafka topic to consume from.
valueDeserializer DeserializationSchema<T> Yes Schema that defines how to convert raw Kafka message bytes into typed Java objects. SimpleStringSchema converts bytes to String.
props Properties Yes Kafka consumer configuration properties. Must include bootstrap.servers (Kafka broker addresses) and typically group.id (consumer group identifier).

Outputs

Name Type Description
stream DataStream<T> An unbounded stream of deserialized records from the configured Kafka topic. The element type matches the deserialization schema (String when using SimpleStringSchema).

Usage Examples

Basic Usage

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

public class KafkaStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-kafka-group");

        // Create and register the Kafka source
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            "flink-stream-in-topic",
            new SimpleStringSchema(),
            properties
        );

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process the stream...
        stream.print();

        env.execute("Kafka Streaming Job");
    }
}
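Beyond the single-topic constructor, FlinkKafkaConsumer also accepts a list of topics and exposes start-position setters inherited from its base class. The variant below is a sketch with placeholder topic names, assuming a Flink version where setStartFromEarliest() is available.

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MultiTopicConsumerSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-kafka-group");

        // The consumer can subscribe to several topics at once via the
        // List<String> constructor overload.
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            Arrays.asList("topic-a", "topic-b"),   // placeholder topic names
            new SimpleStringSchema(),
            properties
        );

        // Read each topic from its earliest offset instead of the group's
        // committed offsets (the default is setStartFromGroupOffsets()).
        consumer.setStartFromEarliest();

        // Register with env.addSource(consumer) as in the basic example above.
    }
}
```

Note that explicit start-position setters override whatever offsets are committed for the consumer group, so they are mainly useful for reprocessing or testing.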

Related Pages

Implements Principle

Requires Environment
