Implementation:Apache Flink HadoopInputs
| Knowledge Sources | |
|---|---|
| Domains | Hadoop_Compatibility, Connectors |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
HadoopInputs is a utility class that provides static factory methods for creating Flink InputFormat wrappers around Hadoop InputFormats from both the old (mapred) and new (mapreduce) Hadoop APIs.
Description
HadoopInputs is a final utility class in the org.apache.flink.hadoopcompatibility package that enables Apache Flink to read data from Hadoop-compatible data sources. It provides a collection of static factory methods that wrap Hadoop InputFormats into Flink InputFormat instances.
The class supports both Hadoop APIs:
Old API (mapred):
- readHadoopFile(FileInputFormat, Class, Class, String, JobConf): Wraps a Hadoop mapred FileInputFormat with a specified JobConf, setting the input path on the configuration.
- readHadoopFile(FileInputFormat, Class, Class, String): Convenience overload that creates a default JobConf.
- readSequenceFile(Class, Class, String): Specialized convenience method that reads Hadoop SequenceFiles using SequenceFileInputFormat.
- createHadoopInput(InputFormat, Class, Class, JobConf): Wraps any Hadoop mapred InputFormat (not just file-based) with a given JobConf.
New API (mapreduce):
- readHadoopFile(FileInputFormat, Class, Class, String, Job): Wraps a Hadoop mapreduce FileInputFormat with a specified Job, setting the input path.
- readHadoopFile(FileInputFormat, Class, Class, String): Convenience overload that creates a default Job instance.
- createHadoopInput(InputFormat, Class, Class, Job): Wraps any Hadoop mapreduce InputFormat with a given Job.
Every method returns an InputFormat that emits each Hadoop key-value pair as a Flink Tuple2<K, V>, where Tuple2.f0 is the key and Tuple2.f1 is the value.
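The key/value-to-tuple mapping can be illustrated without Flink or Hadoop on the classpath. The following is a minimal sketch, not Flink's implementation: `Pair` is a hypothetical stand-in for `Tuple2`, and `toPairs` is a hypothetical helper that copies each key into `f0` and each value into `f1`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink's Tuple2: public fields f0 (key) and f1 (value).
final class Pair<K, V> {
    public K f0;
    public V f1;

    Pair(K f0, V f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

final class KeyValueToTupleSketch {
    // Converts each key-value record into a Pair: key first, value second.
    static <K, V> List<Pair<K, V>> toPairs(Map<K, V> records) {
        List<Pair<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> e : records.entrySet()) {
            out.add(new Pair<>(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Byte offset -> line text, the record shape a text input format typically yields.
        Map<Long, String> records = new LinkedHashMap<>();
        records.put(0L, "first line");
        records.put(11L, "second line");

        for (Pair<Long, String> p : toPairs(records)) {
            System.out.println(p.f0 + " -> " + p.f1);
        }
    }
}
```

In a real job the fields would hold Hadoop types such as LongWritable and Text rather than Long and String.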
Usage
Use HadoopInputs when you need to read data from Hadoop-compatible data sources in a Flink program. Common scenarios include:
- Reading from HDFS files using existing Hadoop FileInputFormats.
- Reading Hadoop SequenceFiles containing serialized key-value pairs.
- Migrating existing Hadoop MapReduce jobs to Flink while reusing InputFormat implementations.
- Reading from any Hadoop-compatible storage system (HBase, Cassandra via Hadoop connectors, etc.) using the corresponding Hadoop InputFormat.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
- Lines: 1-161
Signature
public final class HadoopInputs
Import
import org.apache.flink.hadoopcompatibility.HadoopInputs;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
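| mapredInputFormat / mapreduceInputFormat | org.apache.hadoop.mapred.InputFormat<K, V> or org.apache.hadoop.mapreduce.InputFormat<K, V> | Conditional | The Hadoop InputFormat to wrap (a FileInputFormat for readHadoopFile); not a parameter of readSequenceFile, which uses SequenceFileInputFormat |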
| key | Class<K> | Yes | The class of the key type produced by the Hadoop InputFormat |
| value | Class<V> | Yes | The class of the value type produced by the Hadoop InputFormat |
| inputPath | String | Conditional | The file path to read from (required for file-based readHadoopFile and readSequenceFile methods) |
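| job / jobConf | Job or JobConf | Conditional | Hadoop configuration; required by createHadoopInput and the five-argument readHadoopFile overloads, while the four-argument readHadoopFile overloads and readSequenceFile create a default |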
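The relationship between the overloads with and without a configuration argument can be sketched without Hadoop on the classpath. This is a simplified model, not the library's code: a `Map<String, String>` stands in for JobConf or Job, `withInputPath` is a hypothetical helper, and the sketch assumes the path is appended under Hadoop's `mapreduce.input.fileinputformat.inputdir` key, the way Hadoop's `FileInputFormat.addInputPath` behaves. Real Hadoop also qualifies and escapes the path, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;

final class InputPathConfigSketch {
    // Hadoop's configuration key for file input directories.
    static final String INPUT_DIR = "mapreduce.input.fileinputformat.inputdir";

    // Full form: the caller supplies the configuration and the path is appended to it.
    static Map<String, String> withInputPath(String inputPath, Map<String, String> conf) {
        conf.merge(INPUT_DIR, inputPath, (existing, added) -> existing + "," + added);
        return conf;
    }

    // Convenience form: creates a default (empty) configuration, then delegates.
    static Map<String, String> withInputPath(String inputPath) {
        return withInputPath(inputPath, new HashMap<>());
    }

    public static void main(String[] args) {
        System.out.println(withInputPath("/data/a").get(INPUT_DIR));

        // Reusing one configuration for two reads accumulates both paths.
        Map<String, String> shared = new HashMap<>();
        withInputPath("/data/a", shared);
        withInputPath("/data/b", shared);
        System.out.println(shared.get(INPUT_DIR));
    }
}
```

Under that assumption, passing a fresh JobConf or Job per source avoids one source inheriting another's input paths.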
Outputs
| Name | Type | Description |
|---|---|---|
| HadoopInputFormat<K, V> | org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat<K, V> | Flink wrapper for old mapred API InputFormats, producing Tuple2<K, V> |
| HadoopInputFormat<K, V> | org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> | Flink wrapper for new mapreduce API InputFormats, producing Tuple2<K, V> |
Usage Examples
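// Imports assumed by the snippets below
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read a Hadoop SequenceFile
DataSource<Tuple2<LongWritable, Text>> sequenceData =
    env.createInput(HadoopInputs.readSequenceFile(LongWritable.class, Text.class, "/path/to/sequencefile"));

// Read from a Hadoop FileInputFormat (old API) with custom JobConf
JobConf jobConf = new JobConf();
// The old-API FileInputFormat reads split.minsize; split.maxsize only affects the new API
jobConf.set("mapreduce.input.fileinputformat.split.minsize", "67108864");
DataSource<Tuple2<LongWritable, Text>> fileData =
    env.createInput(HadoopInputs.readHadoopFile(
        new TextInputFormat(), LongWritable.class, Text.class, "/path/to/input", jobConf));

// Wrap a general Hadoop InputFormat (old API); createHadoopInput takes no path argument,
// so any input location must already be set on jobConf
DataSource<Tuple2<LongWritable, Text>> generalData =
    env.createInput(HadoopInputs.createHadoopInput(
        new TextInputFormat(), LongWritable.class, Text.class, jobConf));

// Read from a Hadoop FileInputFormat (new API); Job.getInstance() can throw IOException
Job job = Job.getInstance();
DataSource<Tuple2<LongWritable, Text>> newApiData =
    env.createInput(HadoopInputs.readHadoopFile(
        new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(),
        LongWritable.class, Text.class, "/path/to/input", job));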