Implementation:Apache Flink Mapred HadoopInputFormatBase

From Leeroopedia


Knowledge Sources
Domains Connectors, Hadoop_Compatibility
Last Updated 2026-02-09 00:00 GMT

Overview

HadoopInputFormatBase (mapred) is an abstract base class that wraps a Hadoop old-API mapred.InputFormat as a Flink input format. It lets input formats written under Hadoop's assumption of JVM-level task isolation run under Flink's multi-threaded execution model.

Description

This class resides in the org.apache.flink.api.java.hadoop.mapred package and provides the core internal base for the Hadoop mapred (old API) input format compatibility layer. It extends HadoopInputFormatCommonBase and is shared between the Java and Scala Flink APIs.

The key design considerations include:

Thread Safety via Static Mutexes: Hadoop parallelizes tasks across JVMs, so Hadoop InputFormat implementations may rely on JVM-level isolation. Flink parallelizes using threads, so multiple Hadoop InputFormat instances may be active within the same JVM. To address this, the class declares three static mutex objects (OPEN_MUTEX, CONFIGURE_MUTEX, CLOSE_MUTEX). Because they are static, they are shared by every instance in the JVM, so open, configure, and close operations on Hadoop InputFormats each run one at a time.
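The static-mutex idea can be illustrated without any Flink or Hadoop dependency. The following is a minimal sketch, not the class's actual code: LegacyFormat, MutexGuardedWrapper, and MutexDemo are hypothetical names, and LegacyFormat only stands in for a Hadoop InputFormat whose open path assumes it is alone in the JVM.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Hadoop InputFormat that assumes it is alone in the JVM. */
final class LegacyFormat {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    void open() {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max); // record the highest overlap seen
        Thread.yield();                               // widen the window for an overlap
        inFlight.decrementAndGet();
    }
}

/** Wrapper that serializes open() across all wrapper instances in the JVM. */
final class MutexGuardedWrapper {
    // Static, so the same lock object is shared by every instance.
    private static final Object OPEN_MUTEX = new Object();
    private final LegacyFormat delegate = new LegacyFormat();

    void open() {
        synchronized (OPEN_MUTEX) {
            delegate.open();
        }
    }
}

final class MutexDemo {
    /** Runs one wrapper per thread and returns the highest observed concurrency. */
    static int run(int threads) {
        LegacyFormat.maxInFlight.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            MutexGuardedWrapper wrapper = new MutexGuardedWrapper();
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    wrapper.open();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return LegacyFormat.maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent opens: " + run(8)); // prints 1
    }
}
```

An instance-level lock would not help here, because each thread holds its own wrapper; only a lock shared across instances keeps the delegate calls from overlapping.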

Configuration: The configure() method checks whether the underlying Hadoop InputFormat implements Configurable or JobConfigurable and passes it the JobConf accordingly. The check and the call both run inside a block synchronized on CONFIGURE_MUTEX.

Split Creation: The createInputSplits() method delegates to the Hadoop InputFormat's getSplits() method and wraps the resulting Hadoop splits into Flink HadoopInputSplit objects.

Record Reading: The open() method obtains a Hadoop RecordReader from the InputFormat, creates initial key and value instances, and sets up reading state. The reachedEnd() and fetchNext() methods implement lazy fetching of the next record.
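The lazy-fetch protocol between reachedEnd(), fetchNext(), and a subclass's nextRecord() can be sketched in plain Java. This is an illustration under assumptions: LazyFetchBase, UpperCaseFormat, and LazyFetchDemo are hypothetical names, and a java.util.Iterator plays the role of the Hadoop RecordReader. Only the fetched and hasNext flags mirror the names used on this page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

/** Hypothetical, simplified counterpart of the base class's reading state. */
abstract class LazyFetchBase<V, T> {
    private final Iterator<V> reader; // stands in for the Hadoop RecordReader
    protected V value;
    protected boolean fetched = false; // true while a record is buffered but not yet consumed
    protected boolean hasNext;

    LazyFetchBase(Iterator<V> reader) {
        this.reader = reader;
    }

    /** Reads ahead at most once, so repeated calls never skip a record. */
    public boolean reachedEnd() {
        if (!fetched) {
            fetchNext();
        }
        return !hasNext;
    }

    protected void fetchNext() {
        hasNext = reader.hasNext();
        if (hasNext) {
            value = reader.next();
        }
        fetched = true;
    }

    public abstract T nextRecord();
}

/** Hypothetical subclass that converts each buffered value into an output record. */
final class UpperCaseFormat extends LazyFetchBase<String, String> {
    UpperCaseFormat(Iterator<String> reader) {
        super(reader);
    }

    @Override
    public String nextRecord() {
        if (!fetched) {
            fetchNext();
        }
        if (!hasNext) {
            return null;
        }
        fetched = false; // consume the buffered record
        return value.toUpperCase(Locale.ROOT);
    }
}

final class LazyFetchDemo {
    static List<String> readAll(List<String> input) {
        UpperCaseFormat format = new UpperCaseFormat(input.iterator());
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            out.add(format.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(Arrays.asList("a", "b", "c"))); // prints [A, B, C]
    }
}
```

The fetched flag is what makes reachedEnd() idempotent: the record read while checking for the end stays buffered until nextRecord() consumes it.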

Custom Serialization: The class implements custom Java serialization via writeObject() and readObject() to serialize the Hadoop InputFormat class name, key/value class names, and the JobConf. During deserialization, it reconstructs the InputFormat by reflection, merges credentials from the current UserGroupInformation, and reconfigures the InputFormat.
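The serialize-the-class-name, rebuild-by-reflection pattern can be shown in isolation. The sketch below is not the class's code: PlainReader, ReaderHolder, and SerializationDemo are hypothetical names, a plain string replaces the JobConf, and the credential merging and reconfiguration steps described above are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical non-serializable component, standing in for a Hadoop InputFormat. */
class PlainReader {
    String describe(String path) {
        return "reading " + path;
    }
}

/** Holder that ships only the component's class name and its settings. */
final class ReaderHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient PlainReader reader; // never written directly
    private transient String path;        // stands in for the JobConf contents

    ReaderHolder(PlainReader reader, String path) {
        this.reader = reader;
        this.path = path;
    }

    String describe() {
        return reader.describe(path);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeUTF(reader.getClass().getName());
        out.writeUTF(path);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        String className = in.readUTF();
        path = in.readUTF();
        try {
            // Rebuild the component through its no-argument constructor.
            reader = (PlainReader) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("Unable to instantiate " + className, e);
        }
    }
}

final class SerializationDemo {
    /** Serializes the holder to bytes and reads it back. */
    static ReaderHolder roundTrip(ReaderHolder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ReaderHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ReaderHolder copy = roundTrip(new ReaderHolder(new PlainReader(), "/input/path"));
        System.out.println(copy.describe()); // prints "reading /input/path"
    }
}
```

This approach requires the wrapped class to have an accessible no-argument constructor and to be loadable on the receiving side, which is why any state it needs has to travel separately (here, the path string).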

Statistics: For FileInputFormat instances, the class computes file-based statistics by enumerating files, checking modification times, and calculating total file sizes with caching support.
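One plausible reading of "caching support" is that previously computed statistics are reused when no file has been modified since they were taken. The sketch below illustrates that reading only. It is an assumption, not the class's implementation: SimpleFileStats, SimpleStatsCalculator, and StatsDemo are hypothetical names, and java.nio.file replaces the Hadoop FileSystem API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified file statistics: newest modification time and total size. */
final class SimpleFileStats {
    final long lastModified;
    final long totalBytes;

    SimpleFileStats(long lastModified, long totalBytes) {
        this.lastModified = lastModified;
        this.totalBytes = totalBytes;
    }
}

final class SimpleStatsCalculator {
    /** Returns the cached statistics if no file is newer than them, else recomputes. */
    static SimpleFileStats compute(SimpleFileStats cached, List<Path> files) {
        try {
            long latest = 0L;
            for (Path file : files) {
                latest = Math.max(latest, Files.getLastModifiedTime(file).toMillis());
            }
            if (cached != null && latest <= cached.lastModified) {
                return cached; // nothing changed since the cached statistics were taken
            }
            long total = 0L;
            for (Path file : files) {
                total += Files.size(file);
            }
            return new SimpleFileStats(latest, total);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class StatsDemo {
    /** Returns {total bytes, 1 if the second call reused the cached object, else 0}. */
    static long[] run() {
        try {
            Path file = Files.createTempFile("stats-demo", ".txt");
            try {
                Files.write(file, new byte[] {1, 2, 3, 4, 5});
                List<Path> files = Collections.singletonList(file);
                SimpleFileStats first = SimpleStatsCalculator.compute(null, files);
                SimpleFileStats second = SimpleStatsCalculator.compute(first, files);
                return new long[] {first.totalBytes, second == first ? 1L : 0L};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = run();
        System.out.println("bytes=" + result[0] + ", cacheReused=" + (result[1] == 1L));
    }
}
```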

Usage

This is an internal abstract class annotated with @Internal and is not intended for direct use by end users. It serves as the base class for concrete Hadoop mapred input format wrappers in both the Java and Scala Flink APIs. Subclasses must implement the nextRecord() method to convert Hadoop key-value pairs into Flink records of type T.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
  • Lines: 1-329

Signature

@Internal
public abstract class HadoopInputFormatBase<K, V, T>
        extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {

    public HadoopInputFormatBase(
            org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
            Class<K> key,
            Class<V> value,
            JobConf job)

    public JobConf getJobConf()
    public void configure(Configuration parameters)
    public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException
    public HadoopInputSplit[] createInputSplits(int minNumSplits) throws IOException
    public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits)
    public void open(HadoopInputSplit split) throws IOException
    public boolean reachedEnd() throws IOException
    protected void fetchNext() throws IOException
    public void close() throws IOException
}

Import

import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase;

I/O Contract

Inputs

Name | Type | Required | Description
mapredInputFormat | org.apache.hadoop.mapred.InputFormat<K, V> | Yes | The Hadoop old-API InputFormat to wrap
key | Class<K> | Yes | The class of the Hadoop key type
value | Class<V> | Yes | The class of the Hadoop value type
job | JobConf | Yes | The Hadoop JobConf containing configuration for the InputFormat
split | HadoopInputSplit | Yes (for open) | The input split to read records from
minNumSplits | int | Yes (for createInputSplits) | Minimum number of input splits to create

Outputs

Name | Type | Description
HadoopInputSplit[] | HadoopInputSplit[] | Array of Flink-wrapped Hadoop input splits
BaseStatistics | BaseStatistics | File statistics including total size and modification time (for FileInputFormat only)
key | K | The current Hadoop key (accessible by subclasses via protected field)
value | V | The current Hadoop value (accessible by subclasses via protected field)

Usage Examples

Basic Usage

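// Subclass example: wrapping a Hadoop TextInputFormat
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import java.io.IOException;

public class MyHadoopInputFormat
        extends HadoopInputFormatBase<LongWritable, Text, Tuple2<LongWritable, Text>> {

    public MyHadoopInputFormat(
            org.apache.hadoop.mapred.InputFormat<LongWritable, Text> inputFormat,
            Class<LongWritable> keyClass,
            Class<Text> valueClass,
            JobConf job) {
        super(inputFormat, keyClass, valueClass, job);
    }

    @Override
    public Tuple2<LongWritable, Text> nextRecord(
            Tuple2<LongWritable, Text> reuse) throws IOException {
        // Fetch lazily: reachedEnd() may already have read the next record.
        if (!this.fetched) {
            fetchNext();
        }
        // No more records in this split.
        if (!this.hasNext) {
            return null;
        }
        // Mark the buffered record as consumed so the next call fetches again.
        this.fetched = false;
        // key and value are the protected fields of the base class.
        return new Tuple2<>(key, value);
    }
}

// Usage (statements shown outside a method for brevity)
JobConf jobConf = new JobConf();
// FileInputFormat and TextInputFormat are the old-API classes from org.apache.hadoop.mapred.
FileInputFormat.addInputPath(jobConf, new Path("/input/path"));
MyHadoopInputFormat format = new MyHadoopInputFormat(
        new TextInputFormat(), LongWritable.class, Text.class, jobConf);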
