Implementation:Apache Flink Mapreduce HadoopOutputFormatBase

From Leeroopedia
Revision as of 14:17, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Flink_Mapreduce_HadoopOutputFormatBase.md)


Knowledge Sources
Domains: Connectors, Hadoop_Compatibility
Last Updated: 2026-02-09 00:00 GMT

Overview

An abstract base class that manages the core lifecycle (configure, open, close, finalizeGlobal) needed to wrap an OutputFormat from Hadoop's newer org.apache.hadoop.mapreduce API as a Flink output format.

Description

HadoopOutputFormatBase is an abstract class in the org.apache.flink.api.java.hadoop.mapreduce package that extends HadoopOutputFormatCommonBase<T> and implements FinalizeOnMaster. It serves as the common base shared between the Java and Scala API variants for the mapreduce OutputFormat wrapper.

The class manages the full output lifecycle:

  • configure(): Configures the underlying Hadoop OutputFormat by calling setConf() if it implements Configurable. Synchronized via CONFIGURE_MUTEX.
  • open(): Creates a TaskAttemptID, sets up the TaskAttemptContext and OutputCommitter, adds Hadoop credentials from both the stored credentials and the current UserGroupInformation, configures the FileOutputCommitter work path for Hadoop 2.2 compatibility, and finally obtains a RecordWriter from the Hadoop OutputFormat. Synchronized via OPEN_MUTEX.
  • close(): Closes the RecordWriter, commits the task through the OutputCommitter if needed, and renames temporary output files to their final names on the filesystem. Synchronized via CLOSE_MUTEX.
  • finalizeGlobal(): Called on the master to commit the entire job via OutputCommitter.commitJob(), with proper credential propagation.
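The call order implied by the list above can be sketched with a small model. This is only an illustration: LifecycleSketch, its methods, and the event log are hypothetical stand-ins, not Flink's or Hadoop's interfaces, and the parallel tasks are run one after another here so the recorded order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the lifecycle order; not the real Flink classes.
class LifecycleSketch {
    private final List<String> log;
    private final int taskNumber;

    LifecycleSketch(List<String> log, int taskNumber) {
        this.log = log;
        this.taskNumber = taskNumber;
    }

    // Per-task steps: each parallel instance goes through these in order.
    void configure() { log.add("configure:" + taskNumber); }
    void open() { log.add("open:" + taskNumber); }
    void writeRecord(String record) { log.add("write:" + taskNumber + ":" + record); }
    void close() { log.add("close:" + taskNumber); }

    // Job-level step: runs once, after every task has closed.
    static void finalizeGlobal(List<String> log, int parallelism) {
        log.add("finalizeGlobal:" + parallelism);
    }

    // Drives the whole lifecycle and returns the recorded events.
    static List<String> run(int parallelism, List<String> records) {
        List<String> log = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            LifecycleSketch format = new LifecycleSketch(log, task);
            format.configure();
            format.open();
            for (String record : records) {
                format.writeRecord(record);
            }
            format.close();
        }
        finalizeGlobal(log, parallelism);
        return log;
    }
}
```

The point of the model is the split between per-task work (configure, open, write, close) and the single job-level commit at the end.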

The class uses three static mutex objects (OPEN_MUTEX, CONFIGURE_MUTEX, CLOSE_MUTEX) to enforce sequential access because Hadoop OutputFormats assume JVM-level isolation across tasks, whereas Flink uses thread-level parallelism within a single JVM.
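The effect of a shared static mutex can be illustrated with a minimal, self-contained sketch. SerializedOpenSketch and its counters are hypothetical and exist only for this example; the sketch assumes nothing about the real class beyond the idea of a JVM-wide lock guarding a step that is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a wrapper around code that assumes one task per JVM.
class SerializedOpenSketch {
    // A single lock shared by all instances in the JVM, like the static mutexes described above.
    private static final Object OPEN_MUTEX = new Object();

    // Counters used only to observe how many threads are inside open() at once.
    private static final AtomicInteger inside = new AtomicInteger();
    private static final AtomicInteger maxInside = new AtomicInteger();

    void open() {
        synchronized (OPEN_MUTEX) {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            // ... setup that is not thread-safe would run here ...
            inside.decrementAndGet();
        }
    }

    // Calls open() on separate instances from `parallelism` threads
    // and returns the largest number of threads seen inside open() together.
    static int runParallel(int parallelism) {
        inside.set(0);
        maxInside.set(0);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Thread t = new Thread(() -> new SerializedOpenSketch().open());
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return maxInside.get();
    }
}
```

Because the lock is static, it serializes the guarded step across every instance in the JVM, not just within one instance, which is what thread-level parallelism requires here.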

Custom serialization is implemented via writeObject and readObject to handle the Hadoop Configuration and OutputFormat class name. During deserialization, the OutputFormat is reconstructed by class name via reflection.
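The general pattern of serializing a class name and rebuilding the object reflectively can be sketched as follows. This is a simplified illustration using plain Java serialization: PlainFormat and ReflectiveWrapperSketch are hypothetical names, and the sketch omits the Hadoop Configuration handling that the real class also performs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable collaborator, standing in for a Hadoop OutputFormat.
class PlainFormat {
    public PlainFormat() {}
}

class ReflectiveWrapperSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by default serialization and restored manually in readObject.
    private transient Object format;

    ReflectiveWrapperSketch(Object format) {
        this.format = format;
    }

    Object getFormat() {
        return format;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Only the class name travels over the wire, not the object itself.
        out.writeUTF(format.getClass().getName());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String className = in.readUTF();
        try {
            // Rebuild the wrapped object through its no-argument constructor.
            format = Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("Unable to instantiate " + className, e);
        }
    }

    // Serializes and deserializes a wrapper in memory, for demonstration.
    static ReflectiveWrapperSketch roundTrip(ReflectiveWrapperSketch wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ReflectiveWrapperSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

One consequence of this approach is that the wrapped class needs an accessible no-argument constructor, and any state held by the original instance is lost unless it is written separately.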

This class is annotated with @Internal: it is not part of the public API and may change without notice between releases.

Usage

This class is not used directly. Instead, use the concrete subclass HadoopOutputFormat<K, V>, which extends this base class and implements the writeRecord method to handle Tuple2<K, V> records. Both the Java and Scala implementations of the mapreduce output format wrapper extend this base class.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
  • Lines: 1-278

Signature

@Internal
public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T>
        implements FinalizeOnMaster

Import

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase;

I/O Contract

Inputs

Name | Type | Required | Description
mapreduceOutputFormat | org.apache.hadoop.mapreduce.OutputFormat<K, V> | Yes | The Hadoop mapreduce OutputFormat instance to wrap
job | org.apache.hadoop.mapreduce.Job | Yes | The Hadoop Job instance providing configuration and credentials

Outputs

Name | Type | Description
recordWriter | org.apache.hadoop.mapreduce.RecordWriter<K, V> | The Hadoop RecordWriter used to write key-value pairs to the underlying output destination
outputCommitter | org.apache.hadoop.mapreduce.OutputCommitter | Manages task and job commit/abort lifecycle for the output

Usage Examples

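// HadoopOutputFormatBase is abstract and not used directly.
// The concrete subclass HadoopOutputFormat is used instead:
import java.io.IOException;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopOutputFormatExample {
    // Job.getInstance() throws IOException, so the caller must declare or handle it.
    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance();
        // Record the output directory in the job's configuration.
        TextOutputFormat.setOutputPath(job, new Path("/path/to/output"));

        // Wrap the Hadoop OutputFormat; the result accepts Tuple2<LongWritable, Text> records.
        HadoopOutputFormat<LongWritable, Text> hadoopOF =
            new HadoopOutputFormat<>(new TextOutputFormat<LongWritable, Text>(), job);
    }
}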