Implementation:Apache Flink Mapreduce HadoopInputFormatBase
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Hadoop_Compatibility |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
HadoopInputFormatBase (mapreduce) is an abstract base class that wraps a Hadoop new-API mapreduce.InputFormat as a Flink input format, bridging Hadoop's thread-unsafe InputFormat API with Flink's multi-threaded execution model.
Description
This class resides in the org.apache.flink.api.java.hadoop.mapreduce package and provides the core internal base for the Hadoop mapreduce (new API) input format compatibility layer. It extends HadoopInputFormatCommonBase and is shared between the Java and Scala Flink APIs.
The key design considerations include:
- Thread Safety via Static Mutexes: Like its mapred counterpart, this class uses three static mutex objects (OPEN_MUTEX, CONFIGURE_MUTEX, CLOSE_MUTEX) to serialize concurrent open, configure, and close operations. This is necessary because Hadoop assumes each task runs in its own JVM, whereas Flink runs parallel tasks as threads within a single JVM.
- Configuration: The configure() method checks whether the underlying Hadoop InputFormat implements Configurable and, if so, passes it the Hadoop Configuration inside a synchronized block.
- Split Creation: The createInputSplits() method writes minNumSplits to mapreduce.input.fileinputformat.split.minsize in the Hadoop configuration, creates a JobContext whose credentials combine the stored credentials with those of the current UserGroupInformation, and delegates to the Hadoop InputFormat's getSplits() method. The resulting splits are wrapped in Flink HadoopInputSplit objects.
- Record Reading: The open() method creates a TaskAttemptContext, obtains a RecordReader from the InputFormat via createRecordReader(), and initializes it with the split. The reachedEnd() and fetchNext() methods fetch lazily through recordReader.nextKeyValue(). The InterruptedException declared by the new Hadoop API is caught and re-thrown as an IOException.
- Custom Serialization: The class implements custom Java serialization via writeObject() and readObject(). It serializes the InputFormat class name, the key and value class names, and the Hadoop Configuration. During deserialization, it reconstructs the InputFormat by reflection. Unlike the mapred version, credentials are merged during split creation rather than during deserialization.
- Statistics: For FileInputFormat instances, the class computes file-based statistics by enumerating the input files, checking their modification times, and summing their sizes. Cached results are supported via FileBaseStatistics.
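
Two of the considerations above, the static mutexes and the class-name-based serialization, can be illustrated without Hadoop or Flink on the classpath. The following is a minimal sketch under stated assumptions, not the actual implementation: WrapperSketch, FakeFormat, setConf(String), roundTrip(), and configureFromThreads() are hypothetical names introduced here, and the real class also serializes the key and value class names and a full Hadoop Configuration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical wrapper; it only mirrors the pattern described above.
final class WrapperSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for a thread-unsafe, non-serializable delegate. */
    public static class FakeFormat {
        static int configureCalls = 0; // deliberately unsynchronized shared state

        void setConf(String conf) {
            configureCalls++;
        }
    }

    // Static, so every wrapper instance in the JVM shares the same lock.
    private static final Object CONFIGURE_MUTEX = new Object();

    // Transient: the delegate is rebuilt by reflection, not serialized directly.
    private transient FakeFormat delegate;
    private transient String conf;

    WrapperSketch(FakeFormat delegate, String conf) {
        this.delegate = delegate;
        this.conf = conf;
    }

    FakeFormat delegate() { return delegate; }

    String conf() { return conf; }

    void configure() {
        // Only one thread in the JVM may configure a delegate at a time.
        synchronized (CONFIGURE_MUTEX) {
            delegate.setConf(conf);
        }
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(delegate.getClass().getName()); // class name only
        out.writeUTF(conf);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String className = in.readUTF();
        conf = in.readUTF();
        try {
            // Re-create the delegate from its class name via its no-arg constructor.
            delegate = (FakeFormat) Class.forName(
                    className, true, WrapperSketch.class.getClassLoader())
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("Unable to instantiate " + className, e);
        }
    }

    /** Serializes and deserializes a wrapper in memory. */
    static WrapperSketch roundTrip(WrapperSketch w) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(w);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (WrapperSketch) in.readObject();
        }
    }

    /** Calls configure() from several threads and returns the total call count. */
    static int configureFromThreads(int threads, int callsPerThread) throws InterruptedException {
        FakeFormat.configureCalls = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            WrapperSketch w = new WrapperSketch(new FakeFormat(), "conf-" + i);
            workers[i] = new Thread(() -> {
                for (int c = 0; c < callsPerThread; c++) {
                    w.configure();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return FakeFormat.configureCalls;
    }

    public static void main(String[] args) throws Exception {
        WrapperSketch copy = roundTrip(new WrapperSketch(new FakeFormat(), "demo"));
        System.out.println(copy.conf() + " " + copy.delegate().getClass().getSimpleName());
        System.out.println(configureFromThreads(4, 1000));
    }
}
```

Because the lock is static rather than per instance, separate wrapper instances running in different threads still take turns, which is the property needed when the wrapped library keeps JVM-wide state.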
Usage
This is an internal abstract class annotated with @Internal and is not intended for direct use by end users. It serves as the base class for concrete Hadoop mapreduce (new API) input format wrappers in both the Java and Scala Flink APIs. Subclasses must implement the nextRecord() method to extract the current key and value from the RecordReader and convert them into Flink records of type T.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
- Lines: 1-346
Signature
@Internal
public abstract class HadoopInputFormatBase<K, V, T>
        extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {

    public HadoopInputFormatBase(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key,
            Class<V> value,
            Job job)

    public org.apache.hadoop.conf.Configuration getConfiguration()
    public void configure(Configuration parameters)
    public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException
    public HadoopInputSplit[] createInputSplits(int minNumSplits) throws IOException
    public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits)
    public void open(HadoopInputSplit split) throws IOException
    public boolean reachedEnd() throws IOException
    protected void fetchNext() throws IOException
    public void close() throws IOException
}
Import
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| mapreduceInputFormat | org.apache.hadoop.mapreduce.InputFormat<K, V> | Yes | The Hadoop new-API InputFormat to wrap |
| key | Class<K> | Yes | The class of the Hadoop key type |
| value | Class<V> | Yes | The class of the Hadoop value type |
| job | org.apache.hadoop.mapreduce.Job | Yes | The Hadoop Job containing configuration for the InputFormat; must not be null |
| split | HadoopInputSplit | Yes (for open) | The input split to read records from |
| minNumSplits | int | Yes (for createInputSplits) | Minimum number of input splits; sets mapreduce.input.fileinputformat.split.minsize |
Outputs
| Name | Type | Description |
|---|---|---|
| createInputSplits() result | HadoopInputSplit[] | Array of Flink-wrapped Hadoop input splits |
| getStatistics() result | BaseStatistics | File statistics including total size and modification time (for FileInputFormat only) |
| recordReader | RecordReader<K, V> | The Hadoop RecordReader accessible by subclasses for extracting current key and value |
Usage Examples
Basic Usage
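// Subclass example: wrapping a new-API Hadoop TextInputFormat
import java.io.IOException;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class MyNewApiHadoopInputFormat
        extends HadoopInputFormatBase<LongWritable, Text, Tuple2<LongWritable, Text>> {

    public MyNewApiHadoopInputFormat(
            org.apache.hadoop.mapreduce.InputFormat<LongWritable, Text> inputFormat,
            Class<LongWritable> keyClass,
            Class<Text> valueClass,
            Job job) {
        super(inputFormat, keyClass, valueClass, job);
    }

    @Override
    public Tuple2<LongWritable, Text> nextRecord(
            Tuple2<LongWritable, Text> reuse) throws IOException {
        // Fetch lazily: reachedEnd() may already have advanced the reader.
        if (!this.fetched) {
            fetchNext();
        }
        if (!this.hasNext) {
            return null;
        }
        try {
            // Fill the reuse object instead of allocating a new tuple per record.
            reuse.f0 = recordReader.getCurrentKey();
            reuse.f1 = recordReader.getCurrentValue();
        } catch (InterruptedException e) {
            // The new Hadoop API declares InterruptedException; rethrow it as IOException.
            throw new IOException("Could not get KeyValue pair.", e);
        }
        // Mark the current pair as consumed so the next call fetches again.
        this.fetched = false;
        return reuse;
    }
}

// Usage (Job.getInstance() throws IOException, so call it from a method that declares it)
Job job = Job.getInstance();
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(
        job, new org.apache.hadoop.fs.Path("/input/path"));
MyNewApiHadoopInputFormat format = new MyNewApiHadoopInputFormat(
        new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(),
        LongWritable.class, Text.class, job);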
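
The fetched/hasNext handshake that nextRecord() relies on can be tried out in isolation. The sketch below is an illustration under stated assumptions rather than the class's actual code: LazyFetchSketch and drain() are hypothetical names, and a plain java.util.Iterator stands in for the Hadoop RecordReader, so the InterruptedException handling is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in that mimics the reachedEnd()/fetchNext()/nextRecord() protocol.
final class LazyFetchSketch {
    private final Iterator<String> reader; // stands in for the RecordReader
    private String current;                // the pair the reader is positioned on
    private boolean fetched = false;       // true once the reader has been advanced
    private boolean hasNext = false;       // result of the most recent advance

    LazyFetchSketch(Iterator<String> reader) {
        this.reader = reader;
    }

    boolean reachedEnd() {
        // Advance only if the previous record has already been consumed.
        if (!fetched) {
            fetchNext();
        }
        return !hasNext;
    }

    private void fetchNext() {
        hasNext = reader.hasNext();
        if (hasNext) {
            current = reader.next();
        }
        fetched = true;
    }

    String nextRecord() {
        if (!fetched) {
            fetchNext();
        }
        if (!hasNext) {
            return null;
        }
        // Consume the record so that the next call advances the reader again.
        fetched = false;
        return current;
    }

    /** Reads every record using the same loop shape a caller of an input format would use. */
    static List<String> drain(LazyFetchSketch format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            out.add(format.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        LazyFetchSketch format = new LazyFetchSketch(Arrays.asList("a", "b", "c").iterator());
        System.out.println(drain(format)); // prints [a, b, c]
    }
}
```

Calling reachedEnd() several times in a row does not skip records, because the reader only advances again after nextRecord() has reset the fetched flag.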