Implementation:Apache Flink HadoopMapFunction
| Knowledge Sources | |
|---|---|
| Domains | Hadoop_Compatibility, DataSet_API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
HadoopMapFunction is a wrapper that adapts a Hadoop Mapper from the old mapred API to a Flink RichFlatMapFunction, enabling existing Hadoop map logic to be reused within Flink data processing pipelines.
Description
HadoopMapFunction is a public final class parameterized by four types: KEYIN, VALUEIN, KEYOUT, and VALUEOUT. It extends Flink's RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> and implements ResultTypeQueryable and Serializable.
The class wraps a Hadoop Mapper (from the org.apache.hadoop.mapred API) and executes it within Flink's processing framework. Key implementation details include:
- Constructors: Two constructors are provided. The first accepts only a Mapper instance and creates a default JobConf. The second accepts both a Mapper and a JobConf for custom Hadoop configuration. Both validate that neither argument is null.
- Lifecycle management: In the open method, the wrapped Hadoop Mapper is configured using the JobConf, a HadoopDummyReporter is created (since Flink does not use Hadoop's reporting mechanism), and a HadoopOutputCollector is initialized to bridge Hadoop's output collection with Flink's Collector.
- Map execution: The flatMap method receives a Tuple2<KEYIN, VALUEIN>, sets the Flink collector on the HadoopOutputCollector, and invokes the Hadoop Mapper's map method with the key (value.f0), value (value.f1), the output collector, and the reporter.
- Type resolution: The getProducedType method uses Flink's TypeExtractor to determine the output key and value types from the Mapper's generic type parameters at zero-based positions 2 and 3 (KEYOUT and VALUEOUT), and combines them into a TupleTypeInfo for the output type.
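- Custom serialization: The class implements writeObject and readObject to handle Java serialization, writing the Mapper class reference and the JobConf data. On deserialization, the Mapper is re-instantiated from the class reference and the JobConf is reconstructed. Because only the class reference is written, field values set on the original Mapper instance are not carried over; settings the Mapper needs at runtime should be placed in the JobConf, which is passed to the Mapper's configure method in open.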
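The custom serialization described in the last item can be illustrated with a minimal, JDK-only sketch. It is not the Flink source: PrefixMapper, Wrapper, and roundTrip are hypothetical stand-ins, and a plain Map<String, String> plays the role of the JobConf. The sketch only shows the general technique of writing a class reference plus configuration and re-instantiating the delegate on read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class ClassOnlySerializationSketch {

    /** Hypothetical stand-in for a Hadoop Mapper. Deliberately not Serializable. */
    public static class PrefixMapper {
        private String prefix = "";

        // Plays the role of Mapper.configure(JobConf): state is derived from configuration.
        public void configure(Map<String, String> conf) {
            prefix = conf.getOrDefault("prefix", "");
        }

        public void setPrefix(String prefix) { this.prefix = prefix; }

        public String map(String value) { return prefix + value; }
    }

    /** Hypothetical wrapper that ships only the delegate's class and the configuration. */
    public static class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;

        private transient PrefixMapper delegate;
        private transient Map<String, String> conf;

        public Wrapper(PrefixMapper delegate, Map<String, String> conf) {
            this.delegate = delegate;
            this.conf = new HashMap<>(conf);
        }

        // Analogous to open(): configure the delegate from the shipped configuration.
        public void open() { delegate.configure(conf); }

        public String apply(String value) { return delegate.map(value); }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.writeObject(delegate.getClass()); // class reference only, no instance fields
            out.writeObject(conf);
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            Class<? extends PrefixMapper> cls = (Class<? extends PrefixMapper>) in.readObject();
            try {
                // Requires an accessible no-argument constructor on the delegate class.
                delegate = cls.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IOException("Cannot instantiate " + cls.getName(), e);
            }
            conf = (Map<String, String>) in.readObject();
        }
    }

    /** Serializes and deserializes a wrapper in memory, as shipping it to a worker would. */
    static Wrapper roundTrip(Wrapper w) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(w);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Wrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch, a prefix set through setPrefix before the round trip is gone afterwards, because a fresh PrefixMapper is created on read. A prefix stored in the configuration map survives and takes effect once open is called.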
Usage
Use HadoopMapFunction when you want to reuse an existing Hadoop Mapper implementation within a Flink pipeline. This is useful for:
- Migrating Hadoop MapReduce jobs to Flink without rewriting the map logic.
- Leveraging existing Hadoop Mapper implementations for data transformation in Flink.
- Gradual migration scenarios where Hadoop components are incrementally replaced with native Flink operators.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
- Lines: 1-141
Signature
@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable
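Implementing ResultTypeQueryable means the wrapper has to report its output type, which depends on the KEYOUT and VALUEOUT arguments of the wrapped Mapper. The following JDK-only sketch shows the underlying idea of reading a type argument at a given zero-based position via reflection. It is an illustration under assumptions, not Flink's TypeExtractor: the Mapper interface here is a local stand-in, WordLengthMapper and typeArgument are hypothetical names, and only directly implemented interfaces with concrete type arguments are handled.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class MapperTypeArgsSketch {

    /** Local stand-in for the four-parameter Hadoop Mapper interface. */
    interface Mapper<K1, V1, K2, V2> {}

    /** Hypothetical mapper with concrete type arguments. */
    static class WordLengthMapper implements Mapper<Long, String, String, Integer> {}

    /**
     * Returns the concrete class bound to the type parameter at the given
     * zero-based position of genericInterface, as declared directly by impl.
     */
    static Class<?> typeArgument(Class<?> impl, Class<?> genericInterface, int position) {
        for (Type t : impl.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == genericInterface) {
                    Type arg = p.getActualTypeArguments()[position];
                    if (arg instanceof Class) {
                        return (Class<?>) arg;
                    }
                    throw new IllegalArgumentException(
                            "Type argument " + position + " is not a concrete class: " + arg);
                }
            }
        }
        throw new IllegalArgumentException(
                impl.getName() + " does not directly implement " + genericInterface.getName());
    }

    public static void main(String[] args) {
        // Positions 2 and 3 correspond to the output key and output value types.
        System.out.println(typeArgument(WordLengthMapper.class, Mapper.class, 2));
        System.out.println(typeArgument(WordLengthMapper.class, Mapper.class, 3));
    }
}
```

A mapper declared with a raw type or with unresolved type variables at those positions has no concrete class to report, which is why the sketch throws in that case.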
Import
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| hadoopMapper | org.apache.hadoop.mapred.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> | Yes | The Hadoop Mapper instance to wrap |
| conf | org.apache.hadoop.mapred.JobConf | No | The Hadoop JobConf for configuring the Mapper; defaults to a new JobConf if not provided |
| input record | Tuple2<KEYIN, VALUEIN> | Yes (at runtime) | Input key-value pair received during flatMap execution |
Outputs
| Name | Type | Description |
|---|---|---|
| output records | Tuple2<KEYOUT, VALUEOUT> | Zero or more output key-value pairs produced by the Hadoop Mapper for each input record |
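The contract above (one input pair in, zero or more output pairs out) follows from routing the Mapper's output collector to Flink's Collector. Below is a simplified, JDK-only sketch of that bridging pattern. All types in it are local stand-ins written for this illustration (OutputCollector, Mapper, Collector, BridgingCollector, MapAdapter), Map.Entry takes the place of Tuple2, and the Reporter argument is omitted. It is not the Flink or Hadoop API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MapperBridgeSketch {

    /** Stand-in for the old-API Hadoop OutputCollector. */
    interface OutputCollector<K, V> { void collect(K key, V value); }

    /** Stand-in for the old-API Hadoop Mapper, without the Reporter argument. */
    interface Mapper<K1, V1, K2, V2> {
        void map(K1 key, V1 value, OutputCollector<K2, V2> out);
    }

    /** Stand-in for Flink's Collector. */
    interface Collector<T> { void collect(T record); }

    /** Forwards each (key, value) pair to whichever downstream collector is currently set. */
    static final class BridgingCollector<K, V> implements OutputCollector<K, V> {
        private Collector<Map.Entry<K, V>> target;

        void setTarget(Collector<Map.Entry<K, V>> target) { this.target = target; }

        @Override
        public void collect(K key, V value) { target.collect(new SimpleEntry<>(key, value)); }
    }

    /** Adapter that exposes a Mapper through a flatMap-style method. */
    static final class MapAdapter<K1, V1, K2, V2> {
        private final Mapper<K1, V1, K2, V2> mapper;
        private final BridgingCollector<K2, V2> bridge = new BridgingCollector<>();

        MapAdapter(Mapper<K1, V1, K2, V2> mapper) { this.mapper = mapper; }

        void flatMap(Map.Entry<K1, V1> input, Collector<Map.Entry<K2, V2>> out) {
            bridge.setTarget(out); // point the bridge at the collector for this call
            mapper.map(input.getKey(), input.getValue(), bridge);
        }
    }

    /** Runs a tokenizing mapper over one line and returns whatever it emitted. */
    static List<Map.Entry<String, Integer>> tokenize(String line) {
        Mapper<Long, String, String, Integer> tokenizer = (offset, text, out) -> {
            for (String word : text.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(word, 1); // one output pair per token
                }
            }
        };
        MapAdapter<Long, String, String, Integer> adapter = new MapAdapter<>(tokenizer);
        List<Map.Entry<String, Integer>> results = new ArrayList<>();
        adapter.flatMap(new SimpleEntry<>(0L, line), results::add);
        return results;
    }
}
```

With this sketch, a line containing three words yields three output pairs, while a blank line yields none, which is why the wrapper is modeled as a flat map rather than a one-to-one map.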
Usage Examples
// Wrap an existing Hadoop Mapper for use in Flink.
// TokenCountMapper (org.apache.hadoop.mapred.lib) emits (Text, LongWritable) pairs,
// so the output value type is LongWritable.
Mapper<LongWritable, Text, Text, LongWritable> hadoopMapper = new TokenCountMapper<>();
HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
    new HadoopMapFunction<>(hadoopMapper);
// Use with a Flink DataStream
DataStream<Tuple2<LongWritable, Text>> input = ...;
DataStream<Tuple2<Text, LongWritable>> mapped = input.flatMap(mapFunction);
// With custom JobConf; the property is handed to the Mapper's configure method in open
JobConf conf = new JobConf();
conf.set("mapred.mapper.custom.property", "value");
HadoopMapFunction<LongWritable, Text, Text, LongWritable> configuredMapFunction =
    new HadoopMapFunction<>(hadoopMapper, conf);