Implementation:Apache Flink HadoopMapFunction

From Leeroopedia
Revision as of 14:17, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Flink_HadoopMapFunction.md)


Knowledge Sources
Domains: Hadoop_Compatibility, DataSet_API
Last Updated: 2026-02-09 00:00 GMT

Overview

HadoopMapFunction is a wrapper that adapts a Hadoop Mapper from the old mapred API to a Flink RichFlatMapFunction, enabling existing Hadoop map logic to be reused within Flink data processing pipelines.

Description

HadoopMapFunction is a public final class parameterized by four types: KEYIN, VALUEIN, KEYOUT, and VALUEOUT. It extends Flink's RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> and implements ResultTypeQueryable and Serializable.

The class wraps a Hadoop Mapper (from the org.apache.hadoop.mapred API) and executes it within Flink's processing framework. Key implementation details include:

  • Constructors: Two constructors are provided. The first accepts only a Mapper instance and uses a default JobConf. The second accepts both a Mapper and a JobConf for custom Hadoop configuration. A null Mapper or a null JobConf is rejected at construction time.
  • Lifecycle management: In the open method, the wrapped Hadoop Mapper is configured using the JobConf, a HadoopDummyReporter is created (since Flink does not use Hadoop's reporting mechanism), and a HadoopOutputCollector is initialized to bridge Hadoop's output collection with Flink's Collector.
  • Map execution: The flatMap method receives a Tuple2<KEYIN, VALUEIN>, sets the Flink collector on the HadoopOutputCollector, and invokes the Hadoop Mapper's map method with the key (value.f0), value (value.f1), the output collector, and the reporter.
  • Type resolution: The getProducedType method uses Flink's TypeExtractor to determine the output key and value types from the Mapper's generic type parameters (zero-based positions 2 and 3, that is KEYOUT and VALUEOUT), constructing a TupleTypeInfo for the output type.
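  • Custom serialization: The class implements writeObject and readObject to handle Java serialization, writing the Mapper class reference and the JobConf data. On deserialization, the Mapper is re-instantiated from the class reference and the JobConf is reconstructed. Because only the class reference is written, values held in the Mapper's instance fields are not carried over; settings the Mapper needs at runtime should be passed through the JobConf.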
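
The serialization behavior in the last bullet can be illustrated with a minimal, dependency-free sketch. It is not the Flink source: WordMapper, UpperCaseMapper, Wrapper, and roundTrip are hypothetical names, and a plain Map stands in for JobConf. It only shows the general technique of writing a class reference plus configuration and creating a fresh instance on read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: none of these types belong to Flink or Hadoop.
class WrapperSerializationSketch {

    /** Hypothetical stand-in for a Hadoop Mapper. */
    interface WordMapper {
        String apply(String word);
    }

    /** Example mapper with a no-argument constructor and one mutable field. */
    public static class UpperCaseMapper implements WordMapper {
        public int calls = 0; // instance state; not written by the wrapper

        @Override
        public String apply(String word) {
            calls++;
            return word.toUpperCase();
        }
    }

    /** Wrapper that serializes only the mapper's class and the configuration. */
    public static class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient WordMapper mapper;
        private transient Map<String, String> conf; // stand-in for JobConf

        public Wrapper(WordMapper mapper, Map<String, String> conf) {
            this.mapper = mapper;
            this.conf = conf;
        }

        public WordMapper mapper() { return mapper; }

        public Map<String, String> conf() { return conf; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(mapper.getClass());   // class reference, not the instance
            out.writeObject(new HashMap<>(conf)); // configuration entries
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            Class<? extends WordMapper> cls = (Class<? extends WordMapper>) in.readObject();
            try {
                mapper = cls.getDeclaredConstructor().newInstance(); // fresh instance
            } catch (ReflectiveOperationException e) {
                throw new IOException("Cannot instantiate " + cls.getName(), e);
            }
            conf = (Map<String, String>) in.readObject();
        }
    }

    /** Serializes and deserializes a wrapper in memory. */
    static Wrapper roundTrip(Wrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Wrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch the mapper itself does not need to be Serializable, but its class must be instantiable without arguments, and the calls counter starts again at zero after a round trip while the configuration entries survive.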

Usage

Use HadoopMapFunction when you want to reuse an existing Hadoop Mapper implementation within a Flink pipeline. This is useful for:

  • Migrating Hadoop MapReduce jobs to Flink without rewriting the map logic.
  • Leveraging existing Hadoop Mapper implementations for data transformation in Flink.
  • Gradual migration scenarios where Hadoop components are incrementally replaced with native Flink operators.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
  • Lines: 1-141

Signature

@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable
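
The ResultTypeQueryable part of the signature exists so the output type can be derived from the wrapped Mapper's type arguments. As a rough analogy only, the sketch below reads the type arguments at zero-based positions 2 and 3 using plain Java reflection. Mapper4, LineLengthMapper, and typeArgument are hypothetical names; Flink's TypeExtractor is more general, and this version handles only classes that implement the interface directly with concrete types.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Illustrative only: a reflection-based analogy, not Flink's TypeExtractor.
class MapperTypeArgsSketch {

    /** Hypothetical four-parameter interface standing in for Hadoop's Mapper. */
    interface Mapper4<KI, VI, KO, VO> {}

    /** Concrete mapper that binds all four type parameters. */
    static class LineLengthMapper implements Mapper4<Long, String, String, Integer> {}

    /** Returns the type argument of Mapper4 at the given zero-based position. */
    static Class<?> typeArgument(Class<?> cls, int position) {
        for (Type type : cls.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == Mapper4.class) {
                    Type arg = parameterized.getActualTypeArguments()[position];
                    if (arg instanceof Class) {
                        return (Class<?>) arg;
                    }
                    throw new IllegalArgumentException("Not a concrete class: " + arg);
                }
            }
        }
        throw new IllegalArgumentException(cls + " does not directly implement Mapper4");
    }
}
```

For LineLengthMapper, position 2 corresponds to the output key type and position 3 to the output value type.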

Import

import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;

I/O Contract

Inputs

Name | Type | Required | Description
hadoopMapper | org.apache.hadoop.mapred.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> | Yes | The Hadoop Mapper instance to wrap
conf | org.apache.hadoop.mapred.JobConf | No | The Hadoop JobConf for configuring the Mapper; defaults to a new JobConf if not provided
input record | Tuple2<KEYIN, VALUEIN> | Yes (at runtime) | Input key-value pair received during flatMap execution

Outputs

Name | Type | Description
output records | Tuple2<KEYOUT, VALUEOUT> | Zero or more output key-value pairs produced by the Hadoop Mapper for each input record
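
The contract above (one key-value pair in, zero or more pairs out) follows from the adapter structure: configure the mapper once, then forward each record to it with a collector that relays every emitted pair. The following is a minimal sketch of that pattern without Flink or Hadoop on the classpath. Output, OldMapper, Adapter, Tokenizer, and run are hypothetical names, Map.Entry stands in for Tuple2, and the reporter argument is omitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative only: stand-in types, not the Flink or Hadoop API.
class MapperAdapterSketch {

    /** Hypothetical stand-in for Hadoop's OutputCollector. */
    interface Output<K, V> {
        void collect(K key, V value);
    }

    /** Hypothetical stand-in for an old-API mapper (reporter omitted). */
    interface OldMapper<KI, VI, KO, VO> {
        void configure(Map<String, String> conf);

        void map(KI key, VI value, Output<KO, VO> out);
    }

    /** Exposes an OldMapper as a flatMap-style function over key-value pairs. */
    static class Adapter<KI, VI, KO, VO> {
        private final OldMapper<KI, VI, KO, VO> mapper;
        private final Map<String, String> conf;
        private Consumer<Map.Entry<KO, VO>> sink; // set on every flatMap call
        private Output<KO, VO> bridge;            // created once in open()

        Adapter(OldMapper<KI, VI, KO, VO> mapper, Map<String, String> conf) {
            this.mapper = mapper;
            this.conf = conf;
        }

        /** Configures the mapper and builds the collector bridge. */
        void open() {
            mapper.configure(conf);
            bridge = (k, v) -> sink.accept(new SimpleEntry<>(k, v));
        }

        /** Points the bridge at the caller's collector, then delegates to map. */
        void flatMap(Map.Entry<KI, VI> record, Consumer<Map.Entry<KO, VO>> collector) {
            sink = collector;
            mapper.map(record.getKey(), record.getValue(), bridge);
        }
    }

    /** Example mapper: emits one (token, 1) pair per whitespace-separated token. */
    static class Tokenizer implements OldMapper<Long, String, String, Integer> {
        @Override
        public void configure(Map<String, String> conf) {}

        @Override
        public void map(Long offset, String line, Output<String, Integer> out) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    out.collect(token, 1);
                }
            }
        }
    }

    /** Runs a single line through the adapter and returns the emitted pairs. */
    static List<Map.Entry<String, Integer>> run(String line) {
        Adapter<Long, String, String, Integer> adapter =
                new Adapter<>(new Tokenizer(), new HashMap<>());
        adapter.open();
        List<Map.Entry<String, Integer>> results = new ArrayList<>();
        adapter.flatMap(new SimpleEntry<>(0L, line), results::add);
        return results;
    }
}
```

A line with three tokens yields three pairs, and a blank line yields none, which is why the wrapper is modeled as a flat map rather than a one-to-one map.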

Usage Examples

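import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

// Wrap an existing Hadoop Mapper for use in Flink.
// Assumes TokenCountMapper is Hadoop's org.apache.hadoop.mapred.lib.TokenCountMapper<K>,
// which emits (Text, LongWritable) pairs, so the output value type is LongWritable.
Mapper<LongWritable, Text, Text, LongWritable> hadoopMapper = new TokenCountMapper<>();
HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
    new HadoopMapFunction<>(hadoopMapper);

// Use with a Flink DataStream (the input source is elided here)
DataStream<Tuple2<LongWritable, Text>> input = ...;
DataStream<Tuple2<Text, LongWritable>> mapped = input.flatMap(mapFunction);

// With custom JobConf
JobConf conf = new JobConf();
conf.set("mapred.mapper.custom.property", "value");
HadoopMapFunction<LongWritable, Text, Text, LongWritable> configuredMapFunction =
    new HadoopMapFunction<>(hadoopMapper, conf);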
