Implementation:Apache Flink Mapred HadoopOutputFormatBase
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Hadoop_Compatibility |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
An abstract base class that provides the core lifecycle management (configure, open, close, finalize) for wrapping Hadoop's legacy mapred OutputFormat API within Flink's output format framework.
Description
HadoopOutputFormatBase is an abstract class in the org.apache.flink.api.java.hadoop.mapred package that extends HadoopOutputFormatCommonBase<T> and implements FinalizeOnMaster. It serves as the common base for both Java and Scala variants of the mapred HadoopOutputFormat wrapper.
The class manages the full output lifecycle:
- configure(): Configures the underlying Hadoop OutputFormat. If the format implements Configurable or JobConfigurable, the appropriate configuration method is called. This method is synchronized via CONFIGURE_MUTEX to prevent concurrent configuration issues.
- open(): Creates a TaskAttemptID, initializes the TaskAttemptContext, sets up the OutputCommitter and job context, and obtains a RecordWriter from the Hadoop OutputFormat. This method is synchronized via OPEN_MUTEX.
- close(): Closes the RecordWriter and commits the task through the OutputCommitter if needed. This method is synchronized via CLOSE_MUTEX.
- finalizeGlobal(): Called on the master to commit the entire job through the OutputCommitter.commitJob() method.
The class uses three static mutex objects (OPEN_MUTEX, CONFIGURE_MUTEX, CLOSE_MUTEX) to enforce sequential access because Hadoop OutputFormats assume JVM-level isolation across tasks, whereas Flink uses thread-level parallelism within a single JVM.
Custom serialization is implemented via writeObject and readObject to handle the Hadoop JobConf and OutputFormat class name. During deserialization, the OutputFormat is reconstructed by class name via reflection, and Hadoop credentials are restored from both the stored credentials and the current UserGroupInformation.
This class is annotated with @Internal, indicating it is not part of the public API and is intended for internal use.
Usage
This class is not used directly. Instead, use the concrete subclass HadoopOutputFormat<K, V> which extends this base class and implements the writeRecord method to handle Tuple2<K, V> records. This base class is extended by both Java and Scala implementations of the mapred output format wrapper.
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
- Lines: 1-231
Signature
@Internal
public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T>
implements FinalizeOnMaster
Import
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| mapredOutputFormat | org.apache.hadoop.mapred.OutputFormat<K, V> | Yes | The Hadoop mapred OutputFormat instance to wrap |
| job | org.apache.hadoop.mapred.JobConf | Yes | The Hadoop JobConf containing configuration and credentials |
Outputs
| Name | Type | Description |
|---|---|---|
| recordWriter | org.apache.hadoop.mapred.RecordWriter<K, V> | The Hadoop RecordWriter used to write key-value pairs to the underlying output destination |
| outputCommitter | org.apache.hadoop.mapred.OutputCommitter | Manages task and job commit/abort lifecycle for the output |
Usage Examples
// HadoopOutputFormatBase is abstract and not used directly.
// The concrete subclass HadoopOutputFormat is used instead:
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "/path/to/output");
HadoopOutputFormat<LongWritable, Text> hadoopOF =
new HadoopOutputFormat<>(new TextOutputFormat<LongWritable, Text>(), jobConf);