Implementation:Apache Flink Mapreduce HadoopInputFormat
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Hadoop_Compatibility |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A Flink wrapper that adapts Hadoop's new mapreduce InputFormat API so that it can be used as a Flink input format, producing key-value pairs as Tuple2<K, V> records.
Description
HadoopInputFormat is a concrete implementation in the org.apache.flink.api.java.hadoop.mapreduce package that extends HadoopInputFormatBase<K, V, Tuple2<K, V>> and implements ResultTypeQueryable<Tuple2<K, V>>. It serves as the primary user-facing adapter within Apache Flink for InputFormat implementations written against the new-style Hadoop API (org.apache.hadoop.mapreduce).
The class provides two constructors. The first accepts a Hadoop InputFormat<K, V>, the key class, the value class, and a Job instance. The second omits the Job and creates a default one via Job.getInstance(), so it declares IOException. The nextRecord method reads the current key and value from the Hadoop RecordReader (via getCurrentKey() and getCurrentValue()) and stores them in the f0 and f1 fields of the provided Tuple2 reuse object. The getProducedType method returns the TypeInformation for the output tuple, Tuple2<K, V>.
Unlike the mapred variant, which accesses key/value fields directly, this mapreduce version calls recordReader.getCurrentKey() and recordReader.getCurrentValue(). Because these new-API methods declare the checked InterruptedException, nextRecord wraps it in an IOException.
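The read-and-wrap behavior described above can be sketched without Flink or Hadoop on the classpath. This is a minimal illustration, not the class's actual source: PairSketch, ReaderSketch, and NextRecordSketch are hypothetical stand-ins for Tuple2, RecordReader, and HadoopInputFormat, and the sketch folds the advance step (nextKeyValue) into nextRecord for brevity, which is an assumption about structure rather than a description of the real code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Flink's Tuple2: mutable fields f0 and f1.
final class PairSketch<K, V> {
    K f0;
    V f1;
}

// Hypothetical stand-in for the parts of Hadoop's mapreduce RecordReader used here.
interface ReaderSketch<K, V> {
    boolean nextKeyValue() throws IOException, InterruptedException;
    K getCurrentKey() throws IOException, InterruptedException;
    V getCurrentValue() throws IOException, InterruptedException;
}

final class NextRecordSketch<K, V> {
    private final ReaderSketch<K, V> recordReader;

    NextRecordSketch(ReaderSketch<K, V> recordReader) {
        this.recordReader = recordReader;
    }

    // Fills the caller-supplied reuse object; returns null once the input is exhausted.
    PairSketch<K, V> nextRecord(PairSketch<K, V> reuse) throws IOException {
        try {
            if (!recordReader.nextKeyValue()) {
                return null;
            }
            reuse.f0 = recordReader.getCurrentKey();
            reuse.f1 = recordReader.getCurrentValue();
            return reuse;
        } catch (InterruptedException e) {
            // The reader methods declare InterruptedException; callers only see IOException.
            throw new IOException("Could not get key/value pair.", e);
        }
    }

    // Demo helper: drains an in-memory reader and formats each record as "key=value".
    static List<String> demo() {
        final Iterator<String> lines = Arrays.asList("alpha", "beta").iterator();
        ReaderSketch<Long, String> reader = new ReaderSketch<Long, String>() {
            private long key = -1;
            private String value;
            public boolean nextKeyValue() {
                if (!lines.hasNext()) {
                    return false;
                }
                key++;
                value = lines.next();
                return true;
            }
            public Long getCurrentKey() { return key; }
            public String getCurrentValue() { return value; }
        };
        NextRecordSketch<Long, String> format = new NextRecordSketch<Long, String>(reader);
        PairSketch<Long, String> reuse = new PairSketch<Long, String>();
        List<String> out = new ArrayList<String>();
        try {
            while (format.nextRecord(reuse) != null) {
                out.add(reuse.f0 + "=" + reuse.f1);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    // Demo helper: returns the cause of the IOException raised by an interrupted read.
    static Throwable demoInterrupted() {
        ReaderSketch<Long, String> reader = new ReaderSketch<Long, String>() {
            public boolean nextKeyValue() { return true; }
            public Long getCurrentKey() throws InterruptedException {
                throw new InterruptedException("interrupted");
            }
            public String getCurrentValue() { return null; }
        };
        try {
            new NextRecordSketch<Long, String>(reader).nextRecord(new PairSketch<Long, String>());
            return null;
        } catch (IOException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0=alpha, 1=beta]
    }
}
```

Keeping the original InterruptedException as the cause lets callers that care about interruption still inspect it, while the method signature stays limited to IOException.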
This class is annotated with @Public, indicating it is part of Flink's stable public API.
Usage
Use this class when you need to read data from Hadoop-compatible data sources using the new org.apache.hadoop.mapreduce.InputFormat API within a Flink batch program. This is the preferred adapter when working with newer Hadoop InputFormat implementations that use the mapreduce package rather than the legacy mapred package.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
- Lines: 1-85
Signature
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
implements ResultTypeQueryable<Tuple2<K, V>>
Import
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| mapreduceInputFormat | org.apache.hadoop.mapreduce.InputFormat<K, V> | Yes | The Hadoop mapreduce InputFormat instance to wrap |
| key | Class<K> | Yes | The class of the key type |
| value | Class<V> | Yes | The class of the value type |
| job | org.apache.hadoop.mapreduce.Job | No | The Hadoop Job for configuration; a default Job is created via Job.getInstance() if not provided |
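The optional job parameter amounts to a pair of overloaded constructors, where the shorter one supplies a default and therefore inherits the factory's checked exception. The following is a rough sketch of that pattern only: JobSketch and WrapperSketch are hypothetical stand-ins for Job and HadoopInputFormat, and the wrapped InputFormat argument is left out for brevity.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Job.
final class JobSketch {
    final String name;

    JobSketch(String name) {
        this.name = name;
    }

    // Same shape as Job.getInstance(): a factory that may throw IOException.
    static JobSketch getInstance() throws IOException {
        return new JobSketch("default");
    }
}

// Hypothetical stand-in for the wrapper; the wrapped InputFormat is omitted.
final class WrapperSketch<K, V> {
    final Class<K> keyClass;
    final Class<V> valueClass;
    final JobSketch job;

    // Full form: the caller passes a job it has already configured.
    WrapperSketch(Class<K> key, Class<V> value, JobSketch job) {
        this.keyClass = key;
        this.valueClass = value;
        this.job = job;
    }

    // Short form: supplies a default job, so it must declare IOException.
    WrapperSketch(Class<K> key, Class<V> value) throws IOException {
        this(key, value, JobSketch.getInstance());
    }

    // Demo helper: returns the job names chosen by the two constructors.
    static String demo() {
        try {
            WrapperSketch<Long, String> explicit =
                new WrapperSketch<Long, String>(Long.class, String.class, new JobSketch("configured"));
            WrapperSketch<Long, String> defaulted =
                new WrapperSketch<Long, String>(Long.class, String.class);
            return explicit.job.name + "," + defaulted.job.name;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch, passing a job explicitly is the form to use when settings such as input paths have to be in place before the format is constructed.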
Outputs
| Name | Type | Description |
|---|---|---|
| record | Tuple2<K, V> | A tuple containing the key (f0) and value (f1) read from the Hadoop InputFormat via RecordReader |
Usage Examples
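// Wrap Hadoop's mapreduce TextInputFormat and read it in a Flink batch job (class name is illustrative).
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class HadoopInputFormatExample {
    public static void main(String[] args) throws Exception {
        // The Job carries the Hadoop configuration; addInputPath is inherited from FileInputFormat.
        Job job = Job.getInstance();
        TextInputFormat.addInputPath(job, new Path("/path/to/input"));
        HadoopInputFormat<LongWritable, Text> hadoopIF =
            new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
        // Use hadoopIF as a Flink InputFormat in an ExecutionEnvironment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<LongWritable, Text>> data = env.createInput(hadoopIF);
        data.print(); // triggers execution and prints each (offset, line) tuple
    }
}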