Implementation:Apache Flink HadoopReducerWrappedFunction

From Leeroopedia


Knowledge Sources
Domains Hadoop_Compatibility, DataSet_API
Last Updated 2026-02-09 00:00 GMT

Overview

HadoopReducerWrappedFunction is a wrapper that adapts a Hadoop Reducer from the old mapred API to a Flink window function, enabling existing Hadoop reduce logic to be executed within both keyed and non-keyed Flink streaming pipelines.

Description

HadoopReducerWrappedFunction is a public final class parameterized by four types: KEYIN, VALUEIN, KEYOUT, and VALUEOUT. It extends Flink's RichWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, KEYIN, GlobalWindow> and additionally implements AllWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, GlobalWindow>, ResultTypeQueryable, and Serializable.

By implementing both RichWindowFunction (for keyed streams) and AllWindowFunction (for non-keyed streams), this wrapper can be used in both keyed and non-keyed window operations with GlobalWindow.
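The dual-interface design can be illustrated with a minimal, JDK-only Java sketch. Everything in it is hypothetical: MiniReducer, KeyedWindowFn, AllWindowFn, and Wrapper are simplified stand-ins for Hadoop's Reducer and Flink's window-function interfaces, not the real types, and the sketch leaves out the Reporter and the JobConf. It assumes that the keyed entry point receives its key from the window operator, while the non-keyed entry point takes the key from the first element of the window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class DualWindowFunctionSketch {

    // Hypothetical stand-in for the old-API Hadoop Reducer contract:
    // one call per key group, values delivered through an Iterator.
    interface MiniReducer<K, V, KO, VO> {
        void reduce(K key, Iterator<V> values, Consumer<Map.Entry<KO, VO>> out);
    }

    // Hypothetical stand-ins for Flink's keyed and non-keyed window-function contracts.
    interface KeyedWindowFn<IN, OUT, KEY> {
        void apply(KEY key, Iterable<IN> input, Consumer<OUT> out);
    }
    interface AllWindowFn<IN, OUT> {
        void apply(Iterable<IN> input, Consumer<OUT> out);
    }

    // One wrapper implementing both contracts, so it fits keyed and non-keyed windows.
    static final class Wrapper<K, V, KO, VO>
            implements KeyedWindowFn<Map.Entry<K, V>, Map.Entry<KO, VO>, K>,
                    AllWindowFn<Map.Entry<K, V>, Map.Entry<KO, VO>> {

        private final MiniReducer<K, V, KO, VO> reducer;

        Wrapper(MiniReducer<K, V, KO, VO> reducer) {
            this.reducer = Objects.requireNonNull(reducer, "reducer must not be null");
        }

        // Keyed path: the key is supplied by the window operator.
        @Override
        public void apply(K key, Iterable<Map.Entry<K, V>> input, Consumer<Map.Entry<KO, VO>> out) {
            reducer.reduce(key, valuesOf(null, input.iterator()), out);
        }

        // Non-keyed path: the key is taken from the first element of the window.
        @Override
        public void apply(Iterable<Map.Entry<K, V>> input, Consumer<Map.Entry<KO, VO>> out) {
            Iterator<Map.Entry<K, V>> it = input.iterator();
            if (!it.hasNext()) {
                return; // sketch choice: an empty window produces no reduce call
            }
            Map.Entry<K, V> first = it.next();
            reducer.reduce(first.getKey(), valuesOf(first, it), out);
        }

        // Strips the keys from (key, value) pairs; an already-consumed head pair,
        // if present, is replayed before the remaining pairs.
        private static <A, B> Iterator<B> valuesOf(Map.Entry<A, B> head, Iterator<Map.Entry<A, B>> rest) {
            return new Iterator<B>() {
                private Map.Entry<A, B> pending = head;

                @Override
                public boolean hasNext() {
                    return pending != null || rest.hasNext();
                }

                @Override
                public B next() {
                    if (pending != null) {
                        B value = pending.getValue();
                        pending = null;
                        return value;
                    }
                    return rest.next().getValue();
                }
            };
        }
    }

    // Runs a summing reducer through both entry points on the same window contents.
    public static List<String> runDemo() {
        MiniReducer<String, Integer, String, Integer> sum = (key, values, out) -> {
            int total = 0;
            while (values.hasNext()) {
                total += values.next();
            }
            out.accept(Map.entry(key, total));
        };
        Wrapper<String, Integer, String, Integer> fn = new Wrapper<>(sum);
        List<Map.Entry<String, Integer>> window =
                List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("a", 3));

        List<String> results = new ArrayList<>();
        fn.apply("a", window, e -> results.add("keyed " + e.getKey() + "=" + e.getValue()));
        fn.apply(window, e -> results.add("all " + e.getKey() + "=" + e.getValue()));
        return results;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

Running main prints "keyed a=6" and then "all a=6": both entry points hand the same three values to one reduce call and differ only in where the key comes from.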

Key implementation details include:

  • Constructors: Two constructors are provided. The first accepts only a Reducer instance and creates a default JobConf. The second accepts both a Reducer and a JobConf for custom Hadoop configuration. Both reject null arguments.
  • Lifecycle management: In the open method, the Hadoop Reducer is configured with the JobConf, a HadoopDummyReporter is created, a HadoopOutputCollector is initialized, and a HadoopTupleUnwrappingIterator is created. The iterator's key serializer is created for the Reducer's input key type using the Flink runtime context.
  • Window apply (keyed): The apply(KEYIN, GlobalWindow, Iterable<Tuple2<KEYIN, VALUEIN>>, Collector) method connects the HadoopOutputCollector to the Flink Collector and loads the window's elements into the unwrapping iterator. It then delegates to the Hadoop Reducer's reduce method with the current key, the unwrapping iterator, the output collector, and the reporter.
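  • Window apply (non-keyed): The apply(GlobalWindow, Iterable<Tuple2<KEYIN, VALUEIN>>, Collector) method provides the same behavior for non-keyed (all-window) scenarios. Because the window operator supplies no key, the key passed to the Reducer is taken from the unwrapping iterator, which reads it from the first element of the window. All elements of the window are therefore handed to a single reduce call.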
  • Type resolution: The getProducedType method uses Flink's TypeExtractor to determine the output key and value types from the Reducer's generic type parameters at zero-based positions 2 and 3 (KEYOUT and VALUEOUT).
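  • Custom serialization: The class implements writeObject and readObject for Java serialization. It persists the Reducer's class (not the Reducer instance) together with the JobConf data; on deserialization it creates a new Reducer instance from that class and restores the JobConf. Settings the Reducer needs at runtime should therefore be placed in the JobConf, which is passed to the Reducer in open.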
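Because only the Reducer's class travels with the function, state set on a Reducer instance before submission does not survive serialization, while anything placed in the configuration does. The JDK-only sketch below reproduces that class-plus-configuration pattern under stated assumptions: ScalingReducer, Wrapper, and the scale.factor key are hypothetical, java.util.Properties stands in for JobConf, and the Reducer is assumed to be re-created through a public no-argument constructor. It illustrates the pattern and is not Flink's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class ClassPlusConfSerializationSketch {

    // Hypothetical reducer: not Serializable itself, configured after construction.
    public static class ScalingReducer {
        int factor = 1;          // will be set from the configuration
        String note = "default"; // ad-hoc instance state, not persisted

        public ScalingReducer() {}

        void configure(Properties conf) {
            factor = Integer.parseInt(conf.getProperty("scale.factor", "1"));
        }
    }

    // Hypothetical wrapper; Properties plays the role of JobConf.
    static final class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        transient ScalingReducer reducer;
        transient Properties conf;

        Wrapper(ScalingReducer reducer, Properties conf) {
            this.reducer = reducer;
            this.conf = conf;
        }

        // Persist only the reducer's class and the configuration.
        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(reducer.getClass());
            out.writeObject(conf);
        }

        // Re-create the reducer reflectively through its no-arg constructor.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            Class<?> cls = (Class<?>) in.readObject();
            try {
                reducer = (ScalingReducer) cls.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IOException("cannot instantiate " + cls.getName(), e);
            }
            conf = (Properties) in.readObject();
        }

        // Mirrors the open() step: hand the configuration to the fresh reducer.
        void open() {
            reducer.configure(conf);
        }
    }

    static Wrapper roundTrip(Wrapper w) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(w);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Wrapper) in.readObject();
        }
    }

    public static String describeRoundTrip() {
        ScalingReducer original = new ScalingReducer();
        original.note = "changed before serialization";
        Properties conf = new Properties();
        conf.setProperty("scale.factor", "3");
        try {
            Wrapper copy = roundTrip(new Wrapper(original, conf));
            copy.open();
            return "factor=" + copy.reducer.factor + " note=" + copy.reducer.note;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describeRoundTrip());
    }
}
```

describeRoundTrip returns "factor=3 note=default": the factor reaches the new instance through the configuration, while the note set on the original instance is not carried over.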

Usage

Use HadoopReducerWrappedFunction when you want to reuse an existing Hadoop Reducer implementation within a Flink streaming pipeline using window operations. This is useful for:

  • Migrating Hadoop MapReduce jobs to Flink's streaming API without rewriting the reduce logic.
  • Applying Hadoop Reducer logic within Flink's windowing framework on both keyed and non-keyed streams.
  • Gradual migration scenarios where Hadoop reduce components are incrementally replaced with native Flink operators.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReducerWrappedFunction.java
  • Lines: 1-178

Signature

@Public
public final class HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichWindowFunction<
                Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, KEYIN, GlobalWindow>
        implements AllWindowFunction<
                        Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, GlobalWindow>,
                ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>,
                Serializable

Import

import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;

I/O Contract

Inputs

  • hadoopReducer (org.apache.hadoop.mapred.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>, required): The Hadoop Reducer instance to wrap.
  • conf (org.apache.hadoop.mapred.JobConf, optional): The Hadoop JobConf used to configure the Reducer; defaults to a new JobConf if not provided.
  • window elements (Iterable<Tuple2<KEYIN, VALUEIN>>, required at runtime): The input key-value pairs within the window, provided when apply is executed.

Outputs

  • output records (Tuple2<KEYOUT, VALUEOUT>): Zero or more output key-value pairs emitted by the Hadoop Reducer each time a window fires.

Usage Examples

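import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;

// Wrap an existing Hadoop Reducer (SumReducer is a user-defined old-API Reducer)
Reducer<Text, IntWritable, Text, IntWritable> hadoopReducer = new SumReducer();
HadoopReducerWrappedFunction<Text, IntWritable, Text, IntWritable> reduceFunction =
    new HadoopReducerWrappedFunction<>(hadoopReducer);

// Keyed stream with a global window; PurgingTrigger clears the window after each
// firing, so each batch of 10 records per key is reduced exactly once
DataStream<Tuple2<Text, IntWritable>> input = ...;
DataStream<Tuple2<Text, IntWritable>> reduced = input
    .keyBy(tuple -> tuple.f0)
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(10)))
    .apply(reduceFunction);

// Non-keyed stream (AllWindowFunction path): all records of a firing go to one reduce call
DataStream<Tuple2<Text, IntWritable>> reducedAll = input
    .windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(10)))
    .apply(reduceFunction);

// With a custom JobConf (the property key below is a placeholder)
JobConf conf = new JobConf();
conf.set("mapred.reducer.custom.property", "value");
HadoopReducerWrappedFunction<Text, IntWritable, Text, IntWritable> configuredReduceFunction =
    new HadoopReducerWrappedFunction<>(hadoopReducer, conf);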
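Whether the GlobalWindow is purged after each firing matters for a Reducer such as a sum. Flink's CountTrigger fires without purging the window, and a GlobalWindow never expires, so with CountTrigger alone each firing would hand every element received so far to the Reducer; wrapping it in PurgingTrigger turns each firing into a fire-and-purge. The JDK-only simulation below models a single key's window with a summing reduce step. Its names are hypothetical and it uses no Flink classes; it only mimics the two trigger behaviors for twenty values of 1 and a count of 10.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GlobalWindowFiringSketch {

    // Simulates one key's GlobalWindow whose trigger fires every `every` elements.
    // purge=false keeps all elements after a firing (like CountTrigger on its own);
    // purge=true clears them after each firing (like PurgingTrigger wrapping CountTrigger).
    // Each firing "reduces" the current window contents with a sum.
    public static List<Integer> sumsPerFiring(int[] input, int every, boolean purge) {
        List<Integer> window = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        int sinceLastFiring = 0;
        for (int x : input) {
            window.add(x);
            sinceLastFiring++;
            if (sinceLastFiring == every) {
                int sum = 0;
                for (int v : window) {
                    sum += v;
                }
                emitted.add(sum);
                sinceLastFiring = 0; // the count restarts after each firing
                if (purge) {
                    window.clear();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] ones = new int[20];
        Arrays.fill(ones, 1);
        System.out.println(sumsPerFiring(ones, 10, false));
        System.out.println(sumsPerFiring(ones, 10, true));
    }
}
```

main prints [10, 20] and then [10, 10]: without purging, the second firing re-counts the first ten records.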
