
Implementation:Apache Flink HadoopInputs

From Leeroopedia


Knowledge Sources
Domains: Hadoop_Compatibility, Connectors
Last Updated: 2026-02-09 00:00 GMT

Overview

HadoopInputs is a utility class that provides static factory methods for creating Flink InputFormat wrappers around Hadoop InputFormats from both the old (mapred) and new (mapreduce) Hadoop APIs.

Description

HadoopInputs is a final utility class in the org.apache.flink.hadoopcompatibility package that enables Apache Flink to read data from Hadoop-compatible data sources. It provides a collection of static factory methods that wrap Hadoop InputFormats into Flink InputFormat instances.

The class supports both Hadoop APIs:

Old API (mapred):

  • readHadoopFile(FileInputFormat, Class, Class, String, JobConf): Wraps a Hadoop mapred FileInputFormat with a specified JobConf, setting the input path on the configuration.
  • readHadoopFile(FileInputFormat, Class, Class, String): Convenience overload that creates a default JobConf.
  • readSequenceFile(Class, Class, String): Specialized convenience method that reads Hadoop SequenceFiles using SequenceFileInputFormat.
  • createHadoopInput(InputFormat, Class, Class, JobConf): Wraps any Hadoop mapred InputFormat (not just file-based) with a given JobConf.

New API (mapreduce):

  • readHadoopFile(FileInputFormat, Class, Class, String, Job): Wraps a Hadoop mapreduce FileInputFormat with a specified Job, setting the input path.
  • readHadoopFile(FileInputFormat, Class, Class, String): Convenience overload that creates a default Job instance.
  • createHadoopInput(InputFormat, Class, Class, Job): Wraps any Hadoop mapreduce InputFormat with a given Job.
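
The layering described in the two lists above (a general factory, a file-based variant that records the input path on the configuration, and a convenience overload that creates a default configuration) can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not the Flink source: WrappedInput, createInput, readFile, and the sketch.input.dir key are hypothetical stand-ins, a Map plays the role of JobConf or Job, and joining paths with a comma is an assumption about how the path is recorded.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone model of the overload layering; none of these types are Flink or Hadoop classes. */
final class FactoryLayeringSketch {

    /** Hypothetical configuration key, used only in this sketch. */
    static final String INPUT_DIR = "sketch.input.dir";

    /** Hypothetical stand-in for a wrapped input format: a format name plus its configuration. */
    static final class WrappedInput {
        final String formatName;
        final Map<String, String> conf;

        WrappedInput(String formatName, Map<String, String> conf) {
            this.formatName = formatName;
            this.conf = conf;
        }
    }

    /** General factory: wraps any format together with a caller-supplied configuration. */
    static WrappedInput createInput(String formatName, Map<String, String> conf) {
        return new WrappedInput(formatName, conf);
    }

    /** File-based factory: records the input path on the configuration, then delegates. */
    static WrappedInput readFile(String formatName, String inputPath, Map<String, String> conf) {
        // Assumption: paths are appended, comma-separated, rather than overwritten.
        conf.merge(INPUT_DIR, inputPath, (existing, added) -> existing + "," + added);
        return createInput(formatName, conf);
    }

    /** Convenience overload: creates a default configuration and delegates. */
    static WrappedInput readFile(String formatName, String inputPath) {
        return readFile(formatName, inputPath, new LinkedHashMap<>());
    }

    public static void main(String[] args) {
        Map<String, String> shared = new LinkedHashMap<>();
        readFile("TextInputFormat", "/data/a", shared);
        readFile("TextInputFormat", "/data/b", shared);
        // The caller's configuration was modified by both calls.
        System.out.println(shared.get(INPUT_DIR)); // prints /data/a,/data/b
    }
}
```

In this model the caller's configuration object is modified by the file-based factory. If the real factories behave the same way, reusing one JobConf or Job for several file-based reads accumulates input paths, so a fresh configuration per source is the safer choice.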

Every method returns a Flink InputFormat that emits each Hadoop key-value pair as a Flink Tuple2<K, V>, where Tuple2.f0 holds the key and Tuple2.f1 holds the value.
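
To make the key-value convention concrete, here is a minimal stand-alone model of the adaptation. It does not use Flink: Pair is a hypothetical stand-in for Tuple2 that only mirrors the f0/f1 field names, and readAll is an illustrative helper, not part of any library.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone model of key-value to tuple adaptation; Pair stands in for Flink's Tuple2. */
final class TupleAdapterSketch {

    /** Minimal two-field tuple mirroring the f0 (key) / f1 (value) naming. */
    static final class Pair<K, V> {
        final K f0;
        final V f1;

        Pair(K f0, V f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Turns each (key, value) entry from the source into one Pair record. */
    static <K, V> List<Pair<K, V>> readAll(Iterator<Map.Entry<K, V>> source) {
        List<Pair<K, V>> out = new ArrayList<>();
        while (source.hasNext()) {
            Map.Entry<K, V> entry = source.next();
            out.add(new Pair<>(entry.getKey(), entry.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Byte offset -> line text, the record shape a text input format produces.
        Map<Long, String> lines = new LinkedHashMap<>();
        lines.put(0L, "first line");
        lines.put(11L, "second line");
        for (Pair<Long, String> p : readAll(lines.entrySet().iterator())) {
            System.out.println(p.f0 + " -> " + p.f1);
        }
    }
}
```

Downstream code that only needs the payload reads f1 and ignores f0, which is the usual pattern when the key is a byte offset.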

Usage

Use HadoopInputs when you need to read data from Hadoop-compatible data sources in a Flink program. Common scenarios include:

  • Reading from HDFS files using existing Hadoop FileInputFormats.
  • Reading Hadoop SequenceFiles containing serialized key-value pairs.
  • Migrating existing Hadoop MapReduce jobs to Flink while reusing InputFormat implementations.
  • Reading from any Hadoop-compatible storage system (HBase, Cassandra via Hadoop connectors, etc.) using the corresponding Hadoop InputFormat.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
  • Lines: 1-161

Signature

public final class HadoopInputs

Import

import org.apache.flink.hadoopcompatibility.HadoopInputs;

I/O Contract

Inputs

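| Name | Type | Required | Description |
| --- | --- | --- | --- |
| mapredInputFormat | org.apache.hadoop.mapred.InputFormat<K, V> or org.apache.hadoop.mapreduce.InputFormat<K, V> | Yes, except for readSequenceFile | The Hadoop InputFormat to wrap; readSequenceFile always uses SequenceFileInputFormat |
| key | Class<K> | Yes | The class of the key type produced by the Hadoop InputFormat |
| value | Class<V> | Yes | The class of the value type produced by the Hadoop InputFormat |
| inputPath | String | Conditional | The file path to read from; required by the file-based readHadoopFile and readSequenceFile methods, not accepted by createHadoopInput |
| job / jobConf | Job or JobConf | Conditional | Hadoop configuration; required by createHadoopInput, optional for readHadoopFile (a default is created when omitted), not accepted by readSequenceFile |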

Outputs

| Name | Type | Description |
| --- | --- | --- |
| HadoopInputFormat<K, V> | org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat<K, V> | Flink wrapper for old mapred API InputFormats, producing Tuple2<K, V> |
| HadoopInputFormat<K, V> | org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> | Flink wrapper for new mapreduce API InputFormats, producing Tuple2<K, V> |

Usage Examples

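import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;

// Place the statements below in a method that declares IOException:
// readSequenceFile, Job.getInstance, and the new-API readHadoopFile can throw it.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read a Hadoop SequenceFile
DataSource<Tuple2<LongWritable, Text>> sequenceData =
    env.createInput(HadoopInputs.readSequenceFile(LongWritable.class, Text.class, "/path/to/sequencefile"));

// Read from a Hadoop FileInputFormat (old API) with a custom JobConf.
// The old-API FileInputFormat honors split.minsize; split.maxsize is only read by the new API.
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.fileinputformat.split.minsize", "67108864"); // 64 MiB, in bytes
DataSource<Tuple2<LongWritable, Text>> fileData =
    env.createInput(HadoopInputs.readHadoopFile(
        new TextInputFormat(), LongWritable.class, Text.class, "/path/to/input", jobConf));

// Wrap a general Hadoop InputFormat (old API).
// createHadoopInput sets no path itself; this jobConf already carries
// the input path that readHadoopFile recorded on it above.
DataSource<Tuple2<LongWritable, Text>> generalData =
    env.createInput(HadoopInputs.createHadoopInput(
        new TextInputFormat(), LongWritable.class, Text.class, jobConf));

// Read from a Hadoop FileInputFormat (new API).
// The new-API TextInputFormat is fully qualified to avoid clashing with the mapred import.
Job job = Job.getInstance();
DataSource<Tuple2<LongWritable, Text>> newApiData =
    env.createInput(HadoopInputs.readHadoopFile(
        new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(),
        LongWritable.class, Text.class, "/path/to/input", job));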
