Implementation:Apache Flink Mapred HadoopInputFormat
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Hadoop_Compatibility |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A Flink wrapper that adapts Hadoop's legacy mapred InputFormat API so that it can be used as a Flink input format, producing key-value pairs as Tuple2<K, V> records.
Description
HadoopInputFormat is a concrete implementation in the org.apache.flink.api.java.hadoop.mapred package that extends HadoopInputFormatBase<K, V, Tuple2<K, V>> and implements ResultTypeQueryable<Tuple2<K, V>>. It serves as the primary user-facing adapter within Apache Flink for the old-style Hadoop InputFormat API in org.apache.hadoop.mapred, as opposed to the newer org.apache.hadoop.mapreduce API.
The class provides two constructors: one accepting a Hadoop InputFormat<K, V>, the key class, the value class, and a JobConf; and another that takes only the first three arguments and creates a default JobConf. The nextRecord method reads the next key-value pair from the underlying Hadoop RecordReader and populates the Tuple2 reuse object passed in by the caller. The getProducedType method returns the TypeInformation for the output tuple, which lets Flink's type system select the correct serializers for the key and value types.
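The object-reuse contract of nextRecord can be illustrated with a simplified, self-contained model. This is only a sketch under stated assumptions: it uses no Flink or Hadoop classes, the names ReuseModel, Pair, and countRecords are hypothetical stand-ins (Pair for Tuple2, a plain Iterator for the Hadoop RecordReader), and the real implementation may differ in detail.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Simplified model of an object-reusing record read; every name here is hypothetical. */
public class ReuseModel {

    /** Stand-in for Flink's Tuple2: a mutable pair with public fields f0 and f1. */
    public static final class Pair<K, V> {
        public K f0;
        public V f1;
    }

    /** Fills the caller-supplied pair with the next entry, or returns null at end of input. */
    public static <K, V> Pair<K, V> nextRecord(Iterator<Map.Entry<K, V>> reader, Pair<K, V> reuse) {
        if (!reader.hasNext()) {
            return null; // no more records
        }
        Map.Entry<K, V> entry = reader.next();
        reuse.f0 = entry.getKey();
        reuse.f1 = entry.getValue();
        return reuse; // the same instance that was passed in
    }

    /** Reads all entries through one shared pair and returns how many records were read. */
    public static int countRecords(List<Map.Entry<Long, String>> input) {
        Iterator<Map.Entry<Long, String>> reader = input.iterator();
        Pair<Long, String> reuse = new Pair<Long, String>();
        int count = 0;
        while (nextRecord(reader, reuse) != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = new ArrayList<Map.Entry<Long, String>>();
        input.add(new SimpleEntry<Long, String>(0L, "first line"));
        input.add(new SimpleEntry<Long, String>(11L, "second line"));
        System.out.println(countRecords(input));
    }
}
```

Because one Pair instance is overwritten on every call, a caller that needs to keep a record beyond the next read has to copy its fields first.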
This class is annotated with @Public, indicating it is part of Flink's stable public API.
Usage
Use this class when you need to read data from Hadoop-compatible data sources using the legacy org.apache.hadoop.mapred.InputFormat API within a Flink batch program. This is particularly useful for reusing existing Hadoop InputFormat implementations (such as those for reading from HDFS, HBase, or other Hadoop-ecosystem storage systems) without rewriting them for Flink's native API.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
- Lines: 1-81
Signature
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>>
Import
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| mapredInputFormat | org.apache.hadoop.mapred.InputFormat<K, V> | Yes | The Hadoop mapred InputFormat instance to wrap |
| key | Class<K> | Yes | The class of the key type |
| value | Class<V> | Yes | The class of the value type |
| job | org.apache.hadoop.mapred.JobConf | No | The Hadoop JobConf for configuration; a default JobConf is created if not provided |
Outputs
| Name | Type | Description |
|---|---|---|
| record | Tuple2<K, V> | A tuple containing the key (f0) and value (f1) read from the Hadoop InputFormat |
Usage Examples
// Create a HadoopInputFormat wrapping a Hadoop TextInputFormat
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

// Configure the input path on the Hadoop JobConf
JobConf jobConf = new JobConf();
FileInputFormat.setInputPaths(jobConf, new Path("/path/to/input"));

// Wrap the Hadoop format, declaring its key and value classes
HadoopInputFormat<LongWritable, Text> hadoopIF =
    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);

// Use hadoopIF as a Flink InputFormat in an ExecutionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> data = env.createInput(hadoopIF);
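
With TextInputFormat, each record's key (f0) is the byte offset at which a line starts in the file, and its value (f1) is the line's text without the terminator. The sketch below models that pairing in plain Java without Flink or Hadoop. OffsetLineModel and toRecords are hypothetical names, and the model assumes UTF-8 text with single-byte '\n' line endings and ignores input splits.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java model of (byte offset, line) records; every name here is hypothetical. */
public class OffsetLineModel {

    /** Pairs each '\n'-terminated line with the byte offset at which it starts. */
    public static List<Map.Entry<Long, String>> toRecords(String content) {
        List<Map.Entry<Long, String>> records = new ArrayList<Map.Entry<Long, String>>();
        long offset = 0L; // byte position of the current line's first byte
        int start = 0;    // char index of the current line's first char
        while (start < content.length()) {
            int end = content.indexOf('\n', start);
            if (end < 0) {
                end = content.length(); // last line has no terminator
            }
            String line = content.substring(start, end);
            records.add(new SimpleEntry<Long, String>(offset, line));
            // Advance past the line's UTF-8 bytes plus the one-byte terminator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
            start = end + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> record : toRecords("alpha\nbeta\ngamma\n")) {
            System.out.println(record.getKey() + " -> " + record.getValue());
        }
    }
}
```

For the three-line input in main, the keys are 0, 6, and 11. In the Flink program the same information arrives as LongWritable and Text objects in f0 and f1.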