Implementation: Heibaiying BigData-Notes FlinkKafkaConsumer Configuration
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Big_Data |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
A concrete tool from the Flink Kafka Connector library for connecting Flink to Apache Kafka as a streaming data source.
Description
The FlinkKafkaConsumer class is Flink's built-in Kafka source connector. It wraps the standard Kafka consumer client and integrates it into Flink's source function framework, providing automatic offset management through Flink's checkpointing mechanism and, when checkpointing is enabled, exactly-once processing guarantees. The consumer is configured with a Kafka topic name, a deserialization schema that converts raw Kafka messages into typed objects, and a Properties object containing Kafka consumer configuration.
In the BigData-Notes repository, the KafkaStreamingJob class demonstrates configuring a FlinkKafkaConsumer with SimpleStringSchema deserialization to read string messages from a Kafka topic.
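The exactly-once guarantee takes effect only when checkpointing is enabled on the execution environment. A minimal sketch of enabling it before registering the source; the 5-second interval is an illustrative choice, not a value from the repository:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 seconds; the Kafka consumer's offsets are then
        // committed as part of each checkpoint, giving exactly-once semantics.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // ... register the FlinkKafkaConsumer source and job logic here
    }
}
```

Without this call, the consumer falls back to periodic offset auto-commit via the Kafka client, which only provides at-least-once behavior on failure.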
Usage
Use FlinkKafkaConsumer whenever your Flink streaming application needs to ingest data from one or more Kafka topics. The consumer should be instantiated with the required configuration and then registered with the execution environment via env.addSource(). Ensure that bootstrap.servers and group.id are set in the properties, and select an appropriate deserialization schema for your message format.
Code Reference
Source Location
- Repository: BigData-Notes
- File:
code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java - Lines: 15-45
Signature
// Constructor
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
// Registration with environment
public <T> DataStreamSource<T> addSource(SourceFunction<T> function)
Import
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| topic | String | Yes | The name of the Kafka topic to consume from. |
| valueDeserializer | DeserializationSchema<T> | Yes | Schema that defines how to convert raw Kafka message bytes into typed Java objects. SimpleStringSchema converts bytes to String. |
| props | Properties | Yes | Kafka consumer configuration properties. Must include bootstrap.servers (Kafka broker addresses) and typically group.id (consumer group identifier). |
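SimpleStringSchema covers plain UTF-8 text messages; for other formats, a custom DeserializationSchema<T> can be supplied as the valueDeserializer. A hypothetical sketch of the three methods an implementation must provide (the UpperCaseSchema class and its upper-casing behavior are illustrative, not from the repository):

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative schema: decodes each Kafka message as UTF-8 and upper-cases it.
public class UpperCaseSchema implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) throws IOException {
        // Called once per Kafka record with the raw value bytes.
        return new String(message, StandardCharsets.UTF_8).toUpperCase();
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Kafka topics are unbounded; never signal end of stream.
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Tells Flink the element type this schema produces.
        return Types.STRING;
    }
}
```

An instance of such a schema is passed to the FlinkKafkaConsumer constructor in place of SimpleStringSchema.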
Outputs
| Name | Type | Description |
|---|---|---|
| stream | DataStream<String> | An unbounded stream of deserialized records from the configured Kafka topic. The element type matches the deserialization schema (String when using SimpleStringSchema). |
Usage Examples
Basic Usage
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

public class KafkaStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-kafka-group");

        // Create and register the Kafka source
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "flink-stream-in-topic",
                new SimpleStringSchema(),
                properties
        );
        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process the stream...
        stream.print();

        env.execute("Kafka Streaming Job");
    }
}
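Beyond the constructor arguments, FlinkKafkaConsumer also inherits start-position setters from FlinkKafkaConsumerBase, which control where reading begins when the job starts without restored checkpoint state. A sketch reusing the topic and properties from the example above (the class name KafkaStartPositionExample is illustrative):

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

public class KafkaStartPositionExample {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-kafka-group");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "flink-stream-in-topic",
                new SimpleStringSchema(),
                properties
        );

        // Start-position options (each call overrides the previous one):
        kafkaConsumer.setStartFromGroupOffsets(); // default: resume from committed group offsets
        // kafkaConsumer.setStartFromEarliest();  // replay the topic from the beginning
        // kafkaConsumer.setStartFromLatest();    // read only records produced from now on
    }
}
```

When the job is restored from a checkpoint or savepoint, the saved offsets take precedence over these settings.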