Implementation:Apache Flink PartitionCommitPolicy

From Leeroopedia


Knowledge Sources
Domains Connectors, File_Connector
Last Updated 2026-02-09 00:00 GMT

Overview

An interface defining a pluggable policy for committing partitions in a file-based table sink, supporting metastore, success-file, and custom commit strategies.

Description

PartitionCommitPolicy is an experimental interface that defines how partitions are committed after data has been written to a file-based table. The implementation of the commit method must be idempotent because the same partition may be committed multiple times (e.g., during failure recovery).
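One way to satisfy the idempotency requirement is to make the commit action an operation whose second execution leaves the same state as the first. The sketch below illustrates this with a marker file. It is not Flink code: IdempotentMarker and writeMarker are hypothetical names, and it uses java.nio.file on a local path rather than Flink's own file system abstraction.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper (not part of Flink): a marker-file write that is safe to repeat. */
final class IdempotentMarker {

    private IdempotentMarker() {}

    /**
     * Creates (or overwrites) an empty marker file in the partition directory.
     * Calling this twice leaves the directory in the same state as calling it once.
     */
    static Path writeMarker(Path partitionDir, String markerName) throws IOException {
        // No-op if the directory already exists.
        Files.createDirectories(partitionDir);
        Path marker = partitionDir.resolve(markerName);
        // CREATE + TRUNCATE_EXISTING succeeds whether or not the file is already there,
        // so a replayed commit after failure recovery does not fail.
        Files.write(
                marker,
                new byte[0],
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE);
        return marker;
    }
}
```

The same reasoning applies to other commit actions: an "add partition if absent" call or a notification that the receiver deduplicates can be replayed safely, while an unconditional "create" that fails on an existing target cannot.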

The interface defines three built-in policy types as string constants:

  • METASTORE ("metastore") - Commits the partition to a metastore (e.g., Hive metastore). Only valid for Hive tables, not plain file system tables.
  • SUCCESS_FILE ("success-file") - Creates a success marker file (e.g., _SUCCESS) in the partition directory.
  • CUSTOM ("custom") - Allows user-defined commit logic such as RPC notifications to downstream applications or triggering Hive partition analysis for statistics generation.

The nested Context interface provides the commit method with all necessary table and partition metadata: catalog name, database name, table name, partition keys and values, the partition path, and a computed partition spec as a LinkedHashMap.
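The partition spec can be thought of as the partition keys paired with the partition values in order. The following sketch shows that pairing in plain Java. PartitionSpecSketch and toSpec are hypothetical names, the positional pairing is an assumption about how the spec is derived, and the size check is an addition for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;

/** Hypothetical illustration (not Flink code) of building an ordered partition spec. */
final class PartitionSpecSketch {

    private PartitionSpecSketch() {}

    /** Pairs the i-th key with the i-th value, preserving partition-key order. */
    static LinkedHashMap<String, String> toSpec(List<String> keys, List<String> values) {
        if (keys.size() != values.size()) {
            // Added safeguard for this sketch only.
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            spec.put(keys.get(i), values.get(i));
        }
        return spec;
    }
}
```

A LinkedHashMap is a natural choice here because iteration follows insertion order, so a spec for keys (dt, hour) always lists dt before hour, matching the directory nesting of the partition path.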

A static validatePolicyChain method checks a comma-separated chain of policy kinds and throws a ValidationException if the chain includes "metastore" while the table has no metastore configured.
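The validation rule described above can be sketched in a few lines. This is an illustration rather than the Flink source: PolicyChainCheck is a hypothetical class, IllegalArgumentException stands in for Flink's ValidationException so the snippet has no dependencies, and trimming whitespace around each entry is an assumption added here.

```java
/** Hypothetical stand-in (not Flink code) for the policy-chain validation rule. */
final class PolicyChainCheck {

    static final String METASTORE = "metastore";

    private PolicyChainCheck() {}

    /**
     * Rejects a chain that contains the "metastore" kind when no metastore is available.
     *
     * @param isEmptyMetastore true if the table has no metastore configured
     * @param policyKind comma-separated policy kinds, e.g. "metastore,success-file"; may be null
     */
    static void validate(boolean isEmptyMetastore, String policyKind) {
        if (policyKind == null) {
            return; // nothing configured, nothing to validate
        }
        for (String policy : policyKind.split(",")) {
            // trim() is an assumption of this sketch, so "metastore, success-file" is handled too
            if (isEmptyMetastore && METASTORE.equalsIgnoreCase(policy.trim())) {
                throw new IllegalArgumentException(
                        "The 'metastore' commit policy requires a metastore-backed table");
            }
        }
    }
}
```

With this sketch, validate(true, "success-file") returns normally, while validate(true, "metastore,success-file") throws.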

Usage

Implement PartitionCommitPolicy to define custom partition commit behavior for file-based table sinks. Select the policy through the file system connector's table options: sink.partition-commit.policy.kind names the policy kind, and for the "custom" kind, sink.partition-commit.policy.class names the implementing class. Multiple policies can be chained with commas (e.g., "metastore,success-file"). Inside commit, use the Context to access partition metadata.
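As a rough illustration of the configuration step, the sketch below assembles a CREATE TABLE statement that chains the success-file policy with a custom one. The helper class CommitPolicyDdl, the column names, and the class name com.example.NotifyCommitPolicy are hypothetical. The option keys are the ones used by the file system connector as far as this page's author understands them; check them against the documentation for your Flink version.

```java
/** Hypothetical helper (not Flink code) that builds a DDL string with commit-policy options. */
final class CommitPolicyDdl {

    private CommitPolicyDdl() {}

    /**
     * @param tableName target table name
     * @param path base directory of the file system table
     * @param policyKinds comma-separated policy kinds, e.g. "success-file,custom"
     * @param customPolicyClass fully qualified class name for the "custom" kind, or null
     */
    static String createTable(
            String tableName, String path, String policyKinds, String customPolicyClass) {
        StringBuilder ddl = new StringBuilder();
        ddl.append("CREATE TABLE ").append(tableName).append(" (\n")
                .append("  user_id STRING,\n") // illustrative columns only
                .append("  amount DOUBLE,\n")
                .append("  dt STRING\n")
                .append(") PARTITIONED BY (dt) WITH (\n")
                .append("  'connector' = 'filesystem',\n")
                .append("  'path' = '").append(path).append("',\n")
                .append("  'format' = 'parquet',\n")
                .append("  'sink.partition-commit.policy.kind' = '")
                .append(policyKinds).append("'");
        if (customPolicyClass != null) {
            // Only needed when the chain contains the "custom" kind.
            ddl.append(",\n  'sink.partition-commit.policy.class' = '")
                    .append(customPolicyClass).append("'");
        }
        ddl.append("\n)");
        return ddl.toString();
    }
}
```

The resulting string could be passed to a table environment's SQL execution method. Policies in a chain are listed in the order given, so placing "success-file" before "custom" in this sketch expresses the intent that the marker is written before downstream consumers are notified.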

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicy.java
  • Lines: 1-95

Signature

@Experimental
public interface PartitionCommitPolicy {

    String METASTORE = "metastore";
    String SUCCESS_FILE = "success-file";
    String CUSTOM = "custom";

    void commit(Context context) throws Exception;

    interface Context {
        String catalogName();
        String databaseName();
        String tableName();
        List<String> partitionKeys();
        List<String> partitionValues();
        Path partitionPath();
        default LinkedHashMap<String, String> partitionSpec(); // default method; body omitted
    }

    static void validatePolicyChain(boolean isEmptyMetastore, String policyKind); // static method; body omitted
}

Import

import org.apache.flink.connector.file.table.PartitionCommitPolicy;

I/O Contract

Inputs

Name | Type | Required | Description
context | PartitionCommitPolicy.Context | Yes | Provides table metadata (catalog, database, table name), partition keys, partition values, and the partition file path.
isEmptyMetastore | boolean | Yes (for validation) | Indicates whether the table has no metastore configured; used by validatePolicyChain.
policyKind | String | Yes (for validation) | Comma-separated policy kinds to validate (e.g., "metastore,success-file").

Outputs

Name | Type | Description
(side effect) | void | The commit action is performed as a side effect (e.g., metastore update, success file creation, or custom action).
ValidationException | Exception | Thrown by validatePolicyChain if the "metastore" policy is used with a file system table that has no metastore.

Usage Examples

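// Implement a custom partition commit policy that notifies a downstream service.
// NotificationService is a hypothetical user-provided class, not part of Flink.
import java.util.LinkedHashMap;

import org.apache.flink.connector.file.table.PartitionCommitPolicy;
import org.apache.flink.core.fs.Path;

public class NotifyCommitPolicy implements PartitionCommitPolicy {

    @Override
    public void commit(Context context) throws Exception {
        // Fully qualified table name: catalog.database.table
        String tableIdentifier = context.catalogName() + "."
            + context.databaseName() + "."
            + context.tableName();
        // Partition key -> value pairs in partition-key order
        LinkedHashMap<String, String> spec = context.partitionSpec();
        Path path = context.partitionPath();

        // Notify the downstream application about the new partition.
        // This call should be idempotent: the same partition may be committed more than once.
        NotificationService.notify(tableIdentifier, spec, path);
    }
}

// Validate a policy chain before applying it
PartitionCommitPolicy.validatePolicyChain(
    true,  // isEmptyMetastore: no metastore configured
    "success-file"  // valid for file system tables
);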
