Implementation:Apache Flink HadoopReducerWrappedFunction
| Knowledge Sources | |
|---|---|
| Domains | Hadoop_Compatibility, DataSet_API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
HadoopReducerWrappedFunction is a wrapper that adapts a Hadoop Reducer from the old mapred API to a Flink window function, enabling existing Hadoop reduce logic to be executed within both keyed and non-keyed Flink streaming pipelines.
Description
HadoopReducerWrappedFunction is a public final class parameterized by four types: KEYIN, VALUEIN, KEYOUT, and VALUEOUT. It extends Flink's RichWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, KEYIN, GlobalWindow> and additionally implements AllWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, GlobalWindow>, ResultTypeQueryable, and Serializable.
By implementing both RichWindowFunction (for keyed streams) and AllWindowFunction (for non-keyed streams), the same wrapper can be passed to apply on a keyed WindowedStream or on a non-keyed AllWindowedStream, provided the window type is GlobalWindow.
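A dependency-free sketch of the same adapter pattern helps make the dual role concrete. It is not the Flink or Hadoop API. SimpleReducer, KeyedWindowFn, AllWindowFn, UnwrappingIterator, and DualModeAdapter are hypothetical stand-ins, and a List plays the part of both Flink's Collector and Hadoop's OutputCollector. One object implements both window-function shapes. The keyed variant receives the key as a parameter. The non-keyed variant reads the key of the window's first element from an iterator that turns key-value pairs into a sequence of values.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Hadoop-style Reducer: one call per key group.
interface SimpleReducer<KI, VI, KO, VO> {
    void reduce(KI key, Iterator<VI> values, List<Map.Entry<KO, VO>> out);
}

// Hypothetical stand-ins for the keyed and non-keyed window function shapes.
interface KeyedWindowFn<IN, OUT, K> {
    void apply(K key, Iterable<IN> elements, List<OUT> out);
}

interface AllWindowFn<IN, OUT> {
    void apply(Iterable<IN> elements, List<OUT> out);
}

// Turns an iterator of (key, value) pairs into an iterator of values and
// remembers the key of the first pair, which it reads eagerly in set().
class UnwrappingIterator<K, V> implements Iterator<V> {
    private Iterator<Map.Entry<K, V>> source;
    private Map.Entry<K, V> first;
    private K currentKey;

    void set(Iterator<Map.Entry<K, V>> source) {
        this.source = source;
        this.first = source.hasNext() ? source.next() : null;
        this.currentKey = (first == null) ? null : first.getKey();
    }

    K getCurrentKey() {
        return currentKey;
    }

    @Override
    public boolean hasNext() {
        return first != null || source.hasNext();
    }

    @Override
    public V next() {
        if (first != null) { // hand out the eagerly read first value
            V value = first.getValue();
            first = null;
            return value;
        }
        return source.next().getValue();
    }
}

// One adapter usable in both keyed and non-keyed positions.
class DualModeAdapter<KI, VI, KO, VO>
        implements KeyedWindowFn<Map.Entry<KI, VI>, Map.Entry<KO, VO>, KI>,
                AllWindowFn<Map.Entry<KI, VI>, Map.Entry<KO, VO>> {
    private final SimpleReducer<KI, VI, KO, VO> reducer;
    private final UnwrappingIterator<KI, VI> values = new UnwrappingIterator<>();

    DualModeAdapter(SimpleReducer<KI, VI, KO, VO> reducer) {
        this.reducer = reducer;
    }

    @Override // keyed: the caller supplies the key
    public void apply(KI key, Iterable<Map.Entry<KI, VI>> elements, List<Map.Entry<KO, VO>> out) {
        values.set(elements.iterator());
        reducer.reduce(key, values, out);
    }

    @Override // non-keyed: the first element's key is used for the whole window
    public void apply(Iterable<Map.Entry<KI, VI>> elements, List<Map.Entry<KO, VO>> out) {
        values.set(elements.iterator());
        reducer.reduce(values.getCurrentKey(), values, out);
    }
}
```

In the non-keyed path, a window holding pairs with keys "x" and "y" produces a single reduce call under key "x". This is why non-keyed use only makes sense when the window's contents belong together.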
Key implementation details include:
- Constructors: Two constructors are provided. The first accepts only a Reducer and uses a new, default JobConf. The second accepts a Reducer and a JobConf for custom Hadoop configuration. Null arguments are rejected.
- Lifecycle management: In the open method, the wrapped Reducer is configured with the JobConf, a HadoopDummyReporter is created, a HadoopOutputCollector is initialized, and a HadoopTupleUnwrappingIterator is created. The iterator receives a serializer for the Reducer's input key type (KEYIN), which is created using configuration obtained from the Flink runtime context.
- Window apply (keyed): The apply(KEYIN, GlobalWindow, Iterable<Tuple2<KEYIN, VALUEIN>>, Collector) method connects the HadoopOutputCollector to the Flink Collector and points the unwrapping iterator at the window's elements. It then calls the Reducer's reduce method with the current key, the unwrapping iterator (which yields only the values), the output collector, and the reporter.
- Window apply (non-keyed): The apply(GlobalWindow, Iterable<Tuple2<KEYIN, VALUEIN>>, Collector) method does the same for non-keyed (all-window) streams. Because no key is passed in, it uses the key that the unwrapping iterator read from the window's first element. All elements in the window are therefore handed to a single reduce call under that key, regardless of their own keys.
- Type resolution: The getProducedType method uses Flink's TypeExtractor to determine the output key and value types from the Reducer's generic type parameters at indices 2 and 3 (KEYOUT and VALUEOUT). The Reducer's class should therefore bind these parameters to concrete types.
- Custom serialization: The class implements writeObject and readObject for Java serialization. It writes the Reducer's class and the JobConf contents. On deserialization it instantiates a new Reducer from that class and restores the JobConf. State held in fields of the original Reducer instance is therefore not carried over, so settings should be supplied through the JobConf.
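Type resolution works because a concrete class's implemented generic interfaces, including their type arguments, are available through reflection. The sketch below reads them with plain java.lang.reflect. It is a simplified stand-in for Flink's TypeExtractor, not its implementation. TypedReducer, WordSumReducer, GenericReducer, and TypeArgs are hypothetical names, and the lookup only inspects interfaces that the class implements directly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical four-parameter interface mirroring Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
interface TypedReducer<KI, VI, KO, VO> {}

// Binds all four type arguments to concrete classes.
class WordSumReducer implements TypedReducer<String, Integer, String, Long> {}

// Leaves the key types open as the type variable K.
class GenericReducer<K> implements TypedReducer<K, Integer, K, Integer> {}

final class TypeArgs {
    private TypeArgs() {}

    // Returns the index-th type argument that `impl` supplies for `iface`,
    // looking only at interfaces that `impl` implements directly.
    static Type resolve(Class<?> impl, Class<?> iface, int index) {
        for (Type t : impl.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                if (pt.getRawType() == iface) {
                    return pt.getActualTypeArguments()[index];
                }
            }
        }
        throw new IllegalArgumentException(
                impl.getName() + " does not directly implement " + iface.getName());
    }
}
```

For WordSumReducer, indices 2 and 3 resolve to String.class and Long.class. For GenericReducer, index 2 resolves to the type variable K rather than a Class. This illustrates why a wrapped Reducer should declare concrete output types.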
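The serialization scheme persists a class plus configuration instead of the object itself. This is a general Java pattern for wrapping a delegate that is not Serializable. Below is a minimal sketch using only java.io. Configurable, Greeter, DelegateWrapper, and SerializationDemo are hypothetical, and a Map<String, String> plays the role of the JobConf. Following the lifecycle described earlier, configuration is applied in a separate open() step rather than during deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical delegate contract, standing in for Reducer.configure(JobConf).
interface Configurable {
    void configure(Map<String, String> conf);
}

// Hypothetical non-Serializable delegate with an implicit no-arg constructor.
class Greeter implements Configurable {
    private String prefix = "";
    String nickname; // instance state that the wrapper does NOT persist

    @Override
    public void configure(Map<String, String> conf) {
        prefix = conf.getOrDefault("greeting.prefix", "Hello");
    }

    String greet(String name) {
        return prefix + ", " + name;
    }
}

final class DelegateWrapper<T extends Configurable> implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient T delegate;
    private transient Map<String, String> conf;

    DelegateWrapper(T delegate, Map<String, String> conf) {
        this.delegate = delegate;
        this.conf = new HashMap<>(conf);
    }

    T delegate() { return delegate; }

    // Lifecycle hook: configure the (possibly freshly created) delegate.
    void open() { delegate.configure(conf); }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeObject(delegate.getClass()); // the class, not the instance
        out.writeInt(conf.size());
        for (Map.Entry<String, String> e : conf.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        Class<T> cls = (Class<T>) in.readObject();
        int size = in.readInt();
        conf = new HashMap<>();
        for (int i = 0; i < size; i++) {
            conf.put(in.readUTF(), in.readUTF());
        }
        try {
            delegate = cls.getDeclaredConstructor().newInstance(); // fresh instance
        } catch (ReflectiveOperationException e) {
            throw new IOException("Cannot instantiate " + cls.getName(), e);
        }
    }
}

final class SerializationDemo {
    // Serializes and deserializes an object in memory.
    @SuppressWarnings("unchecked")
    static <W extends Serializable> W roundTrip(W obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (W) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After a round trip, the copy holds a new Greeter whose nickname is null. Calling open() then restores the greeting prefix from the persisted configuration.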
Usage
Use HadoopReducerWrappedFunction when you want to reuse an existing Hadoop Reducer implementation within a Flink streaming pipeline using window operations. This is useful for:
- Migrating Hadoop MapReduce jobs to Flink's streaming API without rewriting the reduce logic.
- Applying Hadoop Reducer logic within Flink's windowing framework on both keyed and non-keyed streams.
- Gradual migration scenarios where Hadoop reduce components are incrementally replaced with native Flink operators.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReducerWrappedFunction.java
- Lines: 1-178
Signature
```java
@Public
public final class HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichWindowFunction<
                Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, KEYIN, GlobalWindow>
        implements AllWindowFunction<
                        Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, GlobalWindow>,
                ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>,
                Serializable
```
Import
```java
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| hadoopReducer | org.apache.hadoop.mapred.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> | Yes | The Hadoop Reducer instance to wrap |
| conf | org.apache.hadoop.mapred.JobConf | No | The Hadoop JobConf used to configure the Reducer. The single-argument constructor uses a new default JobConf. If passed explicitly, it must not be null |
| window elements | Iterable<Tuple2<KEYIN, VALUEIN>> | Yes (at runtime) | The collection of input key-value pairs within the window, provided during apply execution |
Outputs
| Name | Type | Description |
|---|---|---|
| output records | Tuple2<KEYOUT, VALUEOUT> | Zero or more output key-value pairs produced by the Hadoop Reducer each time a window fires |
Usage Examples
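```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;

// Wrap an existing Hadoop Reducer. SumReducer stands for a user-defined
// Reducer<Text, IntWritable, Text, IntWritable> implementation.
Reducer<Text, IntWritable, Text, IntWritable> hadoopReducer = new SumReducer();
HadoopReducerWrappedFunction<Text, IntWritable, Text, IntWritable> reduceFunction =
        new HadoopReducerWrappedFunction<>(hadoopReducer);

// Keyed stream with a global window. PurgingTrigger clears the window after each
// firing, so every reduce call sees a fresh batch of 10 elements per key.
DataStream<Tuple2<Text, IntWritable>> input = ...;
DataStream<Tuple2<Text, IntWritable>> reduced = input
        .keyBy(tuple -> tuple.f0)
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(10)))
        .apply(reduceFunction);

// With a custom JobConf (the property name is a placeholder)
JobConf conf = new JobConf();
conf.set("mapred.reducer.custom.property", "value");
HadoopReducerWrappedFunction<Text, IntWritable, Text, IntWritable> configuredReduceFunction =
        new HadoopReducerWrappedFunction<>(hadoopReducer, conf);
```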