Implementation:Apache Flink Mapreduce HadoopInputFormatBase

From Leeroopedia


Knowledge Sources
Domains: Connectors, Hadoop_Compatibility
Last Updated: 2026-02-09 00:00 GMT

Overview

HadoopInputFormatBase (mapreduce) is an abstract base class that wraps a Hadoop new-API mapreduce.InputFormat as a Flink input format, bridging Hadoop's thread-unsafe InputFormat API with Flink's multi-threaded execution model.

Description

This class resides in the org.apache.flink.api.java.hadoop.mapreduce package and provides the core internal base for the Hadoop mapreduce (new API) input format compatibility layer. It extends HadoopInputFormatCommonBase and is shared between the Java and Scala Flink APIs.

The key design considerations include:

Thread Safety via Static Mutexes: Like its mapred counterpart, this class uses three static mutex objects (OPEN_MUTEX, CONFIGURE_MUTEX, CLOSE_MUTEX) to serialize concurrent operations. This is necessary because Hadoop assumes JVM-level isolation while Flink parallelizes using threads within a single JVM.
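The static-mutex idea can be illustrated in plain Java without any Hadoop or Flink dependency. The following is a minimal sketch, not the Flink source: only the OPEN_MUTEX name comes from the description above, while StaticMutexSketch, its counters, and maxConcurrentOpens() are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; apart from the OPEN_MUTEX name, nothing here is taken from Flink.
class StaticMutexSketch {
    // Static, so a single lock object is shared by every instance in the JVM.
    private static final Object OPEN_MUTEX = new Object();

    // Stand-ins for shared state that is not safe to touch concurrently.
    private static int activeOpens = 0;
    private static int maxActiveOpens = 0;

    // Hypothetical open(): the guarded section runs in one thread at a time.
    void open() {
        synchronized (OPEN_MUTEX) {
            activeOpens++;
            maxActiveOpens = Math.max(maxActiveOpens, activeOpens);
            try {
                Thread.sleep(2); // simulate slow initialization work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                activeOpens--;
            }
        }
    }

    // Calls open() on a separate instance in each thread and reports the
    // largest number of threads observed inside the guarded section.
    static int maxConcurrentOpens(int threads) {
        synchronized (OPEN_MUTEX) {
            maxActiveOpens = 0;
        }
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            StaticMutexSketch instance = new StaticMutexSketch();
            Thread worker = new Thread(instance::open);
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        synchronized (OPEN_MUTEX) {
            return maxActiveOpens;
        }
    }

    public static void main(String[] args) {
        System.out.println("max concurrent opens: " + maxConcurrentOpens(8));
    }
}
```

Because the lock is static rather than per-instance, separate instances running in separate threads still take turns, which is the property the class relies on when several parallel subtasks share one JVM.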

Configuration: The configure() method checks whether the underlying Hadoop InputFormat implements Configurable and sets the Hadoop Configuration accordingly, within a synchronized block.
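The conditional hand-off of configuration can be sketched as follows. This is a simplified illustration under stated assumptions: SketchConfigurable is a hypothetical stand-in for Hadoop's Configurable interface, a plain Map stands in for the Hadoop Configuration, and ConfigureSketch is not the Flink class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Hadoop's Configurable interface.
interface SketchConfigurable {
    void setConf(Map<String, String> conf);
}

// A wrapped format that accepts configuration.
class ConfigurableSketchFormat implements SketchConfigurable {
    Map<String, String> conf; // stays null until setConf() is called

    @Override
    public void setConf(Map<String, String> conf) {
        this.conf = conf;
    }
}

// Illustrative wrapper; only the CONFIGURE_MUTEX name comes from the text.
class ConfigureSketch {
    private static final Object CONFIGURE_MUTEX = new Object();

    private final Object wrappedFormat;
    private final Map<String, String> configuration;

    ConfigureSketch(Object wrappedFormat, Map<String, String> configuration) {
        this.wrappedFormat = wrappedFormat;
        this.configuration = configuration;
    }

    // Passes the configuration on only if the wrapped format can accept it.
    // Returns true when the hand-off happened (the return value is an
    // addition of this sketch, used to make the behavior observable).
    boolean configure() {
        synchronized (CONFIGURE_MUTEX) {
            if (wrappedFormat instanceof SketchConfigurable) {
                ((SketchConfigurable) wrappedFormat).setConf(configuration);
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("example.key", "example.value");
        System.out.println(new ConfigureSketch(new ConfigurableSketchFormat(), conf).configure());
        System.out.println(new ConfigureSketch(new Object(), conf).configure());
    }
}
```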

Split Creation: The createInputSplits() method writes its minNumSplits argument into the Hadoop configuration as the minimum split size (mapreduce.input.fileinputformat.split.minsize), creates a JobContext, adds both the stored credentials and the credentials of the current UserGroupInformation user to it, and delegates to the Hadoop InputFormat's getSplits() method. The resulting splits are wrapped into Flink HadoopInputSplit objects.

Record Reading: The open() method creates a TaskAttemptContext, obtains a RecordReader from the InputFormat via createRecordReader(), and initializes it with the split. The reachedEnd() and fetchNext() methods implement lazy fetching using recordReader.nextKeyValue(). The InterruptedException from the new Hadoop API is caught and re-thrown as IOException.
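The lazy-fetch protocol between reachedEnd(), fetchNext(), and a subclass's nextRecord() can be sketched with a small stand-in reader. This is an approximation for illustration: SketchRecordReader is a hypothetical interface that mimics only the two RecordReader methods needed here, and LazyFetchSketch, over(), and drain() are invented names. The fetched and hasNext flags follow the field names used in the usage example on this page.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the part of Hadoop's RecordReader used here.
interface SketchRecordReader<V> {
    boolean nextKeyValue() throws IOException, InterruptedException;
    V getCurrentValue() throws IOException, InterruptedException;
}

class LazyFetchSketch<V> {
    private final SketchRecordReader<V> recordReader;
    private boolean fetched = false; // true once the reader has been advanced for the pending record
    private boolean hasNext = false; // result of the most recent advance

    LazyFetchSketch(SketchRecordReader<V> recordReader) {
        this.recordReader = recordReader;
    }

    boolean reachedEnd() throws IOException {
        if (!fetched) {
            fetchNext();
        }
        return !hasNext;
    }

    void fetchNext() throws IOException {
        try {
            hasNext = recordReader.nextKeyValue();
        } catch (InterruptedException e) {
            // The checked InterruptedException is translated into an IOException.
            throw new IOException("Could not fetch next key/value pair.", e);
        } finally {
            fetched = true;
        }
    }

    V nextRecord() throws IOException {
        if (!fetched) {
            fetchNext();
        }
        if (!hasNext) {
            return null;
        }
        try {
            V value = recordReader.getCurrentValue();
            fetched = false; // consumed: the next call must advance the reader again
            return value;
        } catch (InterruptedException e) {
            throw new IOException("Could not get key/value pair.", e);
        }
    }

    // Builds a reader over an in-memory list (illustrative helper).
    static <V> SketchRecordReader<V> over(List<V> values) {
        Iterator<V> iterator = values.iterator();
        return new SketchRecordReader<V>() {
            private V current;

            @Override
            public boolean nextKeyValue() {
                if (!iterator.hasNext()) {
                    return false;
                }
                current = iterator.next();
                return true;
            }

            @Override
            public V getCurrentValue() {
                return current;
            }
        };
    }

    // Reads everything the way a caller would: check reachedEnd(), then take a record.
    static <V> List<V> drain(List<V> values) {
        LazyFetchSketch<V> format = new LazyFetchSketch<>(over(values));
        List<V> out = new ArrayList<>();
        try {
            while (!format.reachedEnd()) {
                out.add(format.nextRecord());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

The fetched flag is what keeps a reachedEnd() call followed by nextRecord() from advancing the reader twice, so no record is skipped.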

Custom Serialization: The class implements custom Java serialization via writeObject() and readObject(). It serializes the InputFormat class name, key/value class names, and the Hadoop Configuration. During deserialization, it reconstructs the InputFormat by reflection. Unlike the mapred version, credential merging is done during split creation rather than deserialization.
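The class-name-plus-reflection approach can be sketched with standard Java serialization hooks. This is a reduced illustration, not the Flink implementation: SketchFormat is a hypothetical non-serializable stand-in for a Hadoop InputFormat, only one key class is carried, and the Hadoop Configuration and credentials are left out entirely. The choice of class loader is also an assumption of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a Hadoop InputFormat; note that it is not Serializable.
class SketchFormat {
    public SketchFormat() {}
}

class SerializationSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Transient: these are never written by default serialization.
    private transient Object wrappedFormat;
    private transient Class<?> keyClass;

    SerializationSketch(Object wrappedFormat, Class<?> keyClass) {
        this.wrappedFormat = wrappedFormat;
        this.keyClass = keyClass;
    }

    Object getWrappedFormat() {
        return wrappedFormat;
    }

    Class<?> getKeyClass() {
        return keyClass;
    }

    // Writes class names instead of the objects themselves.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(wrappedFormat.getClass().getName());
        out.writeUTF(keyClass.getName());
    }

    // Rebuilds the wrapped format from its class name via reflection.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String formatClassName = in.readUTF();
        String keyClassName = in.readUTF();
        ClassLoader loader = SerializationSketch.class.getClassLoader(); // assumption of this sketch
        try {
            wrappedFormat = Class.forName(formatClassName, true, loader)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("Unable to instantiate the wrapped format", e);
        }
        keyClass = Class.forName(keyClassName, true, loader);
    }

    // Serializes to a byte array and back (illustrative helper).
    static SerializationSketch roundTrip(SerializationSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializationSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

A consequence of this design is that the wrapped format needs an accessible no-argument constructor, and any state it held before serialization is lost unless it is restored from configuration afterwards.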

Statistics: For FileInputFormat instances, the class computes file-based statistics by enumerating files, checking modification times, and calculating total file sizes, with caching support via FileBaseStatistics.

Usage

This is an internal abstract class annotated with @Internal and is not intended for direct use by end users. It serves as the base class for concrete Hadoop mapreduce (new API) input format wrappers in both the Java and Scala Flink APIs. Subclasses must implement the nextRecord() method to extract the current key and value from the RecordReader and convert them into Flink records of type T.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
  • Lines: 1-346

Signature

@Internal
public abstract class HadoopInputFormatBase<K, V, T>
        extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {

    public HadoopInputFormatBase(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key,
            Class<V> value,
            Job job)

    public org.apache.hadoop.conf.Configuration getConfiguration()
    public void configure(Configuration parameters)
    public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException
    public HadoopInputSplit[] createInputSplits(int minNumSplits) throws IOException
    public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits)
    public void open(HadoopInputSplit split) throws IOException
    public boolean reachedEnd() throws IOException
    protected void fetchNext() throws IOException
    public void close() throws IOException
}

Import

import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase;

I/O Contract

Inputs

Name | Type | Required | Description
mapreduceInputFormat | org.apache.hadoop.mapreduce.InputFormat<K, V> | Yes | The Hadoop new-API InputFormat to wrap
key | Class<K> | Yes | The class of the Hadoop key type
value | Class<V> | Yes | The class of the Hadoop value type
job | org.apache.hadoop.mapreduce.Job | Yes | The Hadoop Job containing configuration for the InputFormat; must not be null
split | HadoopInputSplit | Yes (for open) | The input split to read records from
minNumSplits | int | Yes (for createInputSplits) | Minimum number of input splits; sets mapreduce.input.fileinputformat.split.minsize

Outputs

Name | Type | Description
HadoopInputSplit[] | HadoopInputSplit[] | Array of Flink-wrapped Hadoop input splits
BaseStatistics | BaseStatistics | File statistics including total size and modification time (for FileInputFormat only)
recordReader | RecordReader<K, V> | The Hadoop RecordReader accessible by subclasses for extracting current key and value

Usage Examples

Basic Usage

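// Imports needed by the subclass and the usage snippet
import java.io.IOException;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Subclass example: wrapping a new-API Hadoop TextInputFormat
public class MyNewApiHadoopInputFormat
        extends HadoopInputFormatBase<LongWritable, Text, Tuple2<LongWritable, Text>> {

    public MyNewApiHadoopInputFormat(
            org.apache.hadoop.mapreduce.InputFormat<LongWritable, Text> inputFormat,
            Class<LongWritable> keyClass,
            Class<Text> valueClass,
            Job job) {
        super(inputFormat, keyClass, valueClass, job);
    }

    @Override
    public Tuple2<LongWritable, Text> nextRecord(
            Tuple2<LongWritable, Text> reuse) throws IOException {
        // Advance the RecordReader only if reachedEnd() has not already done so
        if (!this.fetched) {
            fetchNext();
        }
        if (!this.hasNext) {
            return null;
        }
        try {
            // getCurrentKey()/getCurrentValue() declare InterruptedException in the new Hadoop API
            Tuple2<LongWritable, Text> record = new Tuple2<>(
                    recordReader.getCurrentKey(),
                    recordReader.getCurrentValue());
            // Mark the record as consumed so the next call fetches again
            this.fetched = false;
            return record;
        } catch (InterruptedException e) {
            throw new IOException("Could not get key/value pair.", e);
        }
    }
}

// Usage (inside a method that declares or handles IOException, which Job.getInstance() can throw)
Job job = Job.getInstance();
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(
        job, new org.apache.hadoop.fs.Path("/input/path"));
MyNewApiHadoopInputFormat format = new MyNewApiHadoopInputFormat(
        new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(),
        LongWritable.class, Text.class, job);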
