Implementation:Apache Flink Mapreduce HadoopInputSplit
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Hadoop_Compatibility |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A wrapper class that adapts a Hadoop mapreduce InputSplit to Flink's LocatableInputSplit, so that Flink can assign new-API (mapreduce) Hadoop input splits to parallel tasks while keeping each split's data locality information.
Description
HadoopInputSplit is a class in the org.apache.flink.api.java.hadoop.mapreduce.wrapper package that extends Flink's LocatableInputSplit. It wraps a Hadoop org.apache.hadoop.mapreduce.InputSplit so that the Flink runtime can use it as a native Flink InputSplit while preserving the Hadoop split's data locality information.
The constructor accepts a split number, a Hadoop InputSplit, and a JobContext. It rejects a null Hadoop InputSplit with a NullPointerException and rejects a split that does not implement the Writable interface with an IllegalArgumentException. The getHostnames() method delegates to the Hadoop split's getLocations() method and returns an empty array if that call throws an exception.
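The Writable check and the hostname fallback described above can be sketched with plain Java. This is a minimal illustration, not Flink's source: WritableLike, SplitLike, GoodSplit, FailingSplit, PlainSplit, and SplitWrapperSketch are hypothetical stand-ins for the Hadoop and Flink types, and null handling is left out of the sketch.

```java
import java.io.IOException;

/** Hypothetical stand-in for org.apache.hadoop.io.Writable (marker only in this sketch). */
interface WritableLike {}

/** Hypothetical stand-in for org.apache.hadoop.mapreduce.InputSplit. */
abstract class SplitLike {
    abstract String[] getLocations() throws IOException, InterruptedException;
}

/** A "writable" split that reports one host. */
class GoodSplit extends SplitLike implements WritableLike {
    @Override
    String[] getLocations() {
        return new String[] {"host1"};
    }
}

/** A "writable" split whose location lookup fails. */
class FailingSplit extends SplitLike implements WritableLike {
    @Override
    String[] getLocations() throws IOException {
        throw new IOException("location lookup failed");
    }
}

/** A split that does not implement WritableLike and should be rejected. */
class PlainSplit extends SplitLike {
    @Override
    String[] getLocations() {
        return new String[0];
    }
}

/** Hypothetical wrapper that mirrors the checks described in the text. */
class SplitWrapperSketch {
    private final int splitNumber;
    private final SplitLike wrapped;

    SplitWrapperSketch(int splitNumber, SplitLike wrapped) {
        // The wrapped split must be serializable through the Writable-style contract.
        if (!(wrapped instanceof WritableLike)) {
            throw new IllegalArgumentException("InputSplit must implement Writable interface.");
        }
        this.splitNumber = splitNumber;
        this.wrapped = wrapped;
    }

    int getSplitNumber() {
        return splitNumber;
    }

    String[] getHostnames() {
        try {
            return wrapped.getLocations();
        } catch (Exception e) {
            // Locality is only a scheduling hint, so a failed lookup yields "no preference".
            return new String[0];
        }
    }
}
```

Returning an empty array instead of propagating the exception keeps split assignment working when locality information is unavailable; the scheduler then simply has no preferred host for that split.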
Custom serialization is implemented to handle the Hadoop InputSplit correctly:
- writeObject: Serializes the parent fields via defaultWriteObject(), then writes the Hadoop InputSplit's data by casting it to Writable and calling write().
- readObject: Deserializes the parent fields, instantiates the InputSplit via WritableFactories.newInstance() (casting the split type to a Writable subclass), and reads the split's data via readFields().
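The two hooks above follow a common Java pattern for carrying a non-Serializable payload inside a Serializable wrapper. The sketch below shows that pattern using only the JDK. MiniWritable, RangePayload, and SerializableWrapperSketch are hypothetical names, and plain reflection stands in for Hadoop's WritableFactories.newInstance(), so treat it as an illustration of the idea rather than the class's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for org.apache.hadoop.io.Writable. */
interface MiniWritable {
    void write(DataOutput out) throws IOException;

    void readFields(DataInput in) throws IOException;
}

/** Hypothetical payload; it is not Serializable, so the wrapper writes it by hand. */
class RangePayload implements MiniWritable {
    long start;
    long length;

    RangePayload() {} // no-arg constructor, needed for reflective instantiation

    RangePayload(long start, long length) {
        this.start = start;
        this.length = length;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(start);
        out.writeLong(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        start = in.readLong();
        length = in.readLong();
    }
}

class SerializableWrapperSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Written by default serialization so readObject knows which class to instantiate.
    private final Class<? extends MiniWritable> payloadType;

    // Skipped by default serialization; handled in writeObject/readObject.
    private transient MiniWritable payload;

    SerializableWrapperSketch(MiniWritable payload) {
        this.payloadType = payload.getClass();
        this.payload = payload;
    }

    MiniWritable getPayload() {
        return payload;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // non-transient fields, including payloadType
        payload.write(out); // ObjectOutputStream implements DataOutput
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restores payloadType
        try {
            payload = payloadType.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Unable to instantiate the payload", e);
        }
        payload.readFields(in); // fills the fresh instance from the stream
    }

    /** Serializes and deserializes a wrapper in memory. */
    static SerializableWrapperSketch roundTrip(SerializableWrapperSketch wrapper)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(wrapper);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableWrapperSketch) in.readObject();
        }
    }
}
```

Storing the payload's class in a regular field is what lets readObject create an empty instance before calling readFields(); the payload's contents never pass through Java's default object serialization.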
Unlike the mapred variant, this class does not need to serialize or manage a JobConf because the new mapreduce API's InputSplit does not use Configurable or JobConfigurable patterns.
This class is annotated with @Internal, indicating it is not part of the public API.
Usage
This class is used internally by Flink's Hadoop compatibility layer. When a HadoopInputFormatBase (mapreduce variant) creates input splits from a Hadoop mapreduce InputFormat, each Hadoop InputSplit is wrapped in a HadoopInputSplit instance. Users do not typically instantiate this class directly.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
- Lines: 1-107
Signature
@Internal
public class HadoopInputSplit extends LocatableInputSplit
Import
import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| splitNumber | int | Yes | The number identifying this split within the set of all splits |
| mapreduceInputSplit | org.apache.hadoop.mapreduce.InputSplit | Yes | The Hadoop mapreduce InputSplit to wrap; must implement the Writable interface |
| jobContext | org.apache.hadoop.mapreduce.JobContext | Yes | The Hadoop JobContext; accepted by the constructor but not stored or serialized with the split |
Outputs
| Name | Type | Description |
|---|---|---|
| hadoopInputSplit | org.apache.hadoop.mapreduce.InputSplit | The wrapped Hadoop InputSplit, accessible via getHadoopInputSplit() |
| hostnames | String[] | Data locality hostnames from the Hadoop InputSplit, accessible via getHostnames() |
Usage Examples
// HadoopInputSplit is typically created internally by HadoopInputFormatBase.
// Manual usage example for illustration:
import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
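// Job implements JobContext; Job.getInstance() declares IOException.
Job job = Job.getInstance();
// FileSplit implements Writable, so the wrapper accepts it.
FileSplit hadoopSplit = new FileSplit(new Path("/data/input"), 0, 1024, new String[]{"host1"});
HadoopInputSplit flinkSplit = new HadoopInputSplit(0, hadoopSplit, job);
String[] hosts = flinkSplit.getHostnames(); // returns ["host1"]
// The original Hadoop split stays accessible for the wrapped InputFormat.
org.apache.hadoop.mapreduce.InputSplit inner = flinkSplit.getHadoopInputSplit();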