Implementation:Apache Flink Mapreduce HadoopOutputFormatBase
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Hadoop_Compatibility |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
An abstract base class that provides the core lifecycle management (configure, open, close, finalizeGlobal) for wrapping an org.apache.hadoop.mapreduce.OutputFormat (Hadoop's newer mapreduce API) within Flink's output format framework.
Description
HadoopOutputFormatBase is an abstract class in the org.apache.flink.api.java.hadoop.mapreduce package that extends HadoopOutputFormatCommonBase<T> and implements FinalizeOnMaster. It serves as the common base shared between the Java and Scala API variants for the mapreduce OutputFormat wrapper.
The class manages the full output lifecycle:
- configure(): Configures the underlying Hadoop OutputFormat by calling setConf() if it implements Configurable. Synchronized via CONFIGURE_MUTEX.
- open(): Creates a TaskAttemptID, sets up the TaskAttemptContext and OutputCommitter, adds Hadoop credentials from both stored credentials and UserGroupInformation, handles FileOutputCommitter work path configuration for Hadoop 2.2 compatibility, and obtains a RecordWriter from the Hadoop OutputFormat. Synchronized via OPEN_MUTEX.
- close(): Closes the RecordWriter, commits the task through the OutputCommitter if needed, and renames temporary output files to their final names on the filesystem. Synchronized via CLOSE_MUTEX.
- finalizeGlobal(): Called on the master to commit the entire job via OutputCommitter.commitJob(), with proper credential propagation.
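The commit ordering in the close() and finalizeGlobal() steps above can be illustrated with a minimal, dependency-free sketch. RecordingCommitter and LifecycleSketch are hypothetical names invented for this illustration; they stand in for Hadoop's OutputCommitter and for the wrapper class, and only record which calls happen rather than touching a filesystem. This is not the Flink source.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an output committer; it only logs the calls it receives. */
class RecordingCommitter {
    final List<String> log = new ArrayList<>();
    private final boolean taskHasOutput;

    RecordingCommitter(boolean taskHasOutput) {
        this.taskHasOutput = taskHasOutput;
    }

    boolean needsTaskCommit() {
        return taskHasOutput;
    }

    void commitTask() {
        log.add("commitTask");
    }

    void commitJob() {
        log.add("commitJob");
    }
}

/** Hypothetical wrapper showing the order of per-task and per-job commit calls. */
class LifecycleSketch {
    private final RecordingCommitter committer;

    LifecycleSketch(RecordingCommitter committer) {
        this.committer = committer;
    }

    /** Per-task step: close the writer first, then commit the task only if needed. */
    void close() {
        committer.log.add("closeWriter"); // placeholder for closing the record writer
        if (committer.needsTaskCommit()) {
            committer.commitTask();
        }
    }

    /** Runs once on the master after the tasks have closed. */
    void finalizeGlobal() {
        committer.commitJob();
    }
}
```

The task-level commit is conditional, while the job-level commit always runs once on the master.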
The class uses three static mutex objects (OPEN_MUTEX, CONFIGURE_MUTEX, CLOSE_MUTEX) to enforce sequential access because Hadoop OutputFormats assume JVM-level isolation across tasks, whereas Flink uses thread-level parallelism within a single JVM.
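As a rough illustration of why the mutexes are static, the sketch below guards a lifecycle method with a JVM-wide lock object, so that separate instances running on different threads still enter the guarded section one at a time. MutexGuardedFormat, its counters, and runDemo are hypothetical names used only for this demonstration; the real class guards Hadoop setup code rather than counters.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an output format; not the Flink class. */
class MutexGuardedFormat {
    // Static, so the lock is shared by every instance in the JVM.
    private static final Object OPEN_MUTEX = new Object();

    // Instrumentation for the demo only.
    static final AtomicInteger inside = new AtomicInteger();
    static final AtomicInteger maxInside = new AtomicInteger();

    void open() {
        synchronized (OPEN_MUTEX) {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            // Setup that is not thread-safe would run here.
            inside.decrementAndGet();
        }
    }

    /** Opens `tasks` separate instances on `threads` threads; returns the peak concurrency seen inside open(). */
    static int runDemo(int threads, int tasks) {
        inside.set(0);
        maxInside.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> new MutexGuardedFormat().open());
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("demo tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxInside.get();
    }
}
```

An instance-level lock (synchronizing on this) would not give the same guarantee, because each parallel task holds its own instance.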
Custom serialization is implemented via writeObject and readObject to handle the Hadoop Configuration and OutputFormat class name. During deserialization, the OutputFormat is reconstructed by class name via reflection.
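The class-name-plus-reflection approach described above can be sketched with plain Java serialization. FormatHolder and PlainFormat are hypothetical names for this example, and the sketch assumes the wrapped class has an accessible no-argument constructor. It omits the Hadoop Configuration handling and uses the defining class loader, which may differ from what the actual implementation does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical wrapped object; deliberately NOT Serializable. */
class PlainFormat {
    String describe() {
        return "plain";
    }
}

/** Hypothetical wrapper that ships only the class name of its non-serializable field. */
class FormatHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: default serialization skips this field.
    private transient Object format;

    FormatHolder(Object format) {
        this.format = format;
    }

    Object getFormat() {
        return format;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Write the class name instead of the object itself.
        out.writeUTF(format.getClass().getName());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String className = in.readUTF();
        try {
            // Rebuild a fresh instance by name; requires a no-argument constructor.
            format = Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("Unable to instantiate " + className, e);
        }
    }

    /** Serializes the holder to bytes and reads it back, as shipping it to a worker would. */
    static FormatHolder roundTrip(FormatHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FormatHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A consequence of this design is that any state held only inside the wrapped instance is lost on the round trip; only what is written explicitly survives.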
The class is annotated with @Internal, so it is not part of the public API and may change between releases.
Usage
This class is not used directly. Use the concrete subclass HadoopOutputFormat<K, V> instead, which extends this base class and implements the writeRecord method to handle Tuple2<K, V> records. Both the Java and Scala implementations of the mapreduce output format wrapper extend this base class.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
- Lines: 1-278
Signature
@Internal
public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T>
        implements FinalizeOnMaster
Import
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| mapreduceOutputFormat | org.apache.hadoop.mapreduce.OutputFormat<K, V> | Yes | The Hadoop mapreduce OutputFormat instance to wrap |
| job | org.apache.hadoop.mapreduce.Job | Yes | The Hadoop Job instance providing configuration and credentials |
Outputs
| Name | Type | Description |
|---|---|---|
| recordWriter | org.apache.hadoop.mapreduce.RecordWriter<K, V> | The Hadoop RecordWriter used to write key-value pairs to the underlying output destination |
| outputCommitter | org.apache.hadoop.mapreduce.OutputCommitter | Manages task and job commit/abort lifecycle for the output |
Usage Examples
// HadoopOutputFormatBase is abstract and not used directly.
// The concrete subclass HadoopOutputFormat is used instead:
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Job.getInstance() throws IOException; call it from a method that declares or handles it.
Job job = Job.getInstance();
// Set the output directory on the Hadoop job.
TextOutputFormat.setOutputPath(job, new Path("/path/to/output"));

// Wrap the Hadoop OutputFormat so that Flink can write Tuple2<LongWritable, Text> records through it.
HadoopOutputFormat<LongWritable, Text> hadoopOF =
    new HadoopOutputFormat<>(new TextOutputFormat<LongWritable, Text>(), job);