Principle:Apache Flink File Connector Table Integration
| Knowledge Sources | |
|---|---|
| Domains | File_Connector, Table_API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Description
The File Connector Table Integration layer bridges Flink's low-level file connector infrastructure with the high-level Table/SQL API. It addresses two major concerns that arise when file-based sources and sinks are exposed as SQL tables: partition lifecycle management (when and how to commit partitions to external metastores or marker files) and format factory binding (how bulk read/write formats are discovered and wired into table sources and sinks).
The key interfaces in this integration layer are:
- PartitionCommitPolicy -- a pluggable strategy that defines what happens when a partition is considered complete (e.g., writing a _SUCCESS marker file, updating a Hive metastore, or triggering a custom RPC notification).
- PartitionTimeExtractor -- a pluggable strategy for deriving a LocalDateTime from partition key/value pairs, enabling time-based partition commit triggers.
- BulkReaderFormatFactory and BulkWriterFormatFactory -- SPI interfaces that connect format discovery to table source/sink construction.
- BulkDecodingFormat -- an interface that extends the Table API's DecodingFormat with file-source-specific filter pushdown capabilities.
Theoretical Basis
Partition Commit as a Policy Chain
In Flink's streaming file sink, partitions are directories that accumulate data files over time. A partition is "committed" once no more data is expected for it, as determined by a watermark-based or processing-time trigger. What happens at commit time is deliberately separated from when the commit happens: the trigger decides when, and the PartitionCommitPolicy interface defines what.
Flink supports three policy kinds, which can be composed into a chain:
| Policy | Identifier | Behavior |
|---|---|---|
| MetastoreCommitPolicy | metastore | Registers or updates the partition in an external catalog (e.g., Hive Metastore). Only valid for catalog-backed tables. |
| SuccessFileCommitPolicy | success-file | Writes a _SUCCESS marker file to the partition directory, signaling downstream consumers. |
| Custom | custom | User-provided implementation loaded by class name, enabling arbitrary side effects such as RPC notifications or analytics triggers. |
Policies are specified as a comma-separated list in configuration (the sink.partition-commit.policy.kind table option), and Flink validates the chain at job submission time. For example, a metastore policy is rejected for plain file system tables that have no catalog backing. Every policy implements the following interface:
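```java
import org.apache.flink.core.fs.Path;
import java.util.LinkedHashMap;
import java.util.List;

public interface PartitionCommitPolicy {
    // Invoked when a partition is complete; may run more than once for the same partition.
    void commit(Context context) throws Exception;
    interface Context {
        String catalogName();
        String databaseName();
        String tableName();
        List<String> partitionKeys();
        List<String> partitionValues();
        Path partitionPath();
        // Pairs each partition key with its value, preserving key order.
        default LinkedHashMap<String, String> partitionSpec() {
            LinkedHashMap<String, String> spec = new LinkedHashMap<>();
            for (int i = 0; i < partitionKeys().size(); i++) {
                spec.put(partitionKeys().get(i), partitionValues().get(i));
            }
            return spec;
        }
    }
}
```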
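To make the chain semantics concrete, the following self-contained sketch parses a comma-separated kind list of the form accepted by sink.partition-commit.policy.kind and applies the validation rules described above. PolicyChainSketch and parseChain are hypothetical names, the check is done on plain strings rather than Flink's real factory classes, and the sketch does not model where in the submission path Flink performs the check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of turning a comma-separated kind list such as
// "metastore,success-file" into a validated, ordered policy chain.
final class PolicyChainSketch {

    static final String METASTORE = "metastore";
    static final String SUCCESS_FILE = "success-file";
    static final String CUSTOM = "custom";

    private PolicyChainSketch() {}

    /**
     * @param kinds           comma-separated policy kinds, e.g. "metastore,success-file"
     * @param hasCatalog      whether the table is backed by a metastore-capable catalog
     * @param customClassName class name required by the "custom" kind (may be null otherwise)
     * @return the validated kinds, in the order they were declared
     */
    static List<String> parseChain(String kinds, boolean hasCatalog, String customClassName) {
        List<String> chain = new ArrayList<>();
        for (String raw : kinds.split(",")) {
            String kind = raw.trim();
            if (kind.isEmpty()) {
                continue; // tolerate stray separators such as "success-file,"
            }
            switch (kind) {
                case METASTORE:
                    if (!hasCatalog) {
                        throw new IllegalArgumentException(
                                "'metastore' policy requires a catalog-backed table");
                    }
                    break;
                case SUCCESS_FILE:
                    break;
                case CUSTOM:
                    if (customClassName == null || customClassName.isEmpty()) {
                        throw new IllegalArgumentException("'custom' policy requires a class name");
                    }
                    break;
                default:
                    throw new IllegalArgumentException("Unknown partition commit policy: " + kind);
            }
            chain.add(kind);
        }
        return Collections.unmodifiableList(chain);
    }

    public static void main(String[] args) {
        // Accepted for a catalog-backed (e.g. Hive) table; declaration order is preserved.
        System.out.println(parseChain("metastore,success-file", true, null));
        // Rejected for a plain file system table without a catalog.
        try {
            parseChain("metastore", false, null);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The sketch keeps kinds in declaration order so that, for example, a metastore update can run before the marker file is written.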
A critical design requirement is idempotency: the same partition may be committed multiple times (due to failover and checkpoint replay), so every policy implementation must produce the same result regardless of how many times it is invoked.
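One way to meet the idempotency requirement is to make the commit an overwrite, so a replayed commit converges on the same end state. The sketch below illustrates this for a success-file style policy using java.nio.file instead of Flink's own FileSystem abstraction; SuccessFileSketch and its commit method are hypothetical, and the marker name _SUCCESS is passed in as an assumption mirroring the table above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical "success-file" policy. Files.write with no options creates the
// marker or truncates an existing one, so repeated commits converge on the same
// end state: one empty marker file in the partition directory.
final class SuccessFileSketch {

    private final String markerName;

    SuccessFileSketch(String markerName) {
        this.markerName = markerName;
    }

    /** Assumes the partition directory already exists (it holds the data files). */
    void commit(Path partitionDir) {
        try {
            Files.write(partitionDir.resolve(markerName), new byte[0]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path partition = Files.createTempDirectory("partition");
        SuccessFileSketch policy = new SuccessFileSketch("_SUCCESS");
        policy.commit(partition);
        policy.commit(partition); // replayed commit, e.g. after failover
        System.out.println(Files.exists(partition.resolve("_SUCCESS")));
    }
}
```

A policy built on "create only if absent" semantics would also be idempotent, but it would have to treat "already exists" as success rather than as an error.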
Time-Based Partition Triggers
The PartitionTimeExtractor interface enables the partition commit trigger to derive a logical timestamp from a partition's key/value pairs. This timestamp is compared against the current watermark to decide whether the partition is complete.
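A minimal sketch of that comparison follows. It assumes the trigger treats a partition as complete once the watermark has passed the extracted partition time plus a configurable commit delay (Flink's file system connector exposes a partition-time trigger and a sink.partition-commit.delay option). Because LocalDateTime carries no time zone, converting it to epoch milliseconds requires a zone, which the sketch takes as a parameter. PartitionTimeTriggerSketch and isCommittable are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical model of a partition-time commit trigger decision.
final class PartitionTimeTriggerSketch {

    private PartitionTimeTriggerSketch() {}

    /**
     * @param partitionTime   logical time extracted from the partition values
     * @param commitDelay     how long after partitionTime late data may still arrive
     * @param watermarkMillis current watermark in epoch milliseconds
     * @param zone            zone used to interpret the zone-less LocalDateTime
     * @return true once the watermark has passed partitionTime + commitDelay
     */
    static boolean isCommittable(LocalDateTime partitionTime, Duration commitDelay,
                                 long watermarkMillis, ZoneId zone) {
        long partitionMillis = partitionTime.atZone(zone).toInstant().toEpochMilli();
        return watermarkMillis > partitionMillis + commitDelay.toMillis();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        LocalDateTime partitionTime = LocalDateTime.of(2024, 1, 15, 0, 0); // day=2024-01-15
        Duration delay = Duration.ofDays(1);
        long noonSameDay = LocalDateTime.of(2024, 1, 15, 12, 0).atZone(utc).toInstant().toEpochMilli();
        long noonNextDay = LocalDateTime.of(2024, 1, 16, 12, 0).atZone(utc).toInstant().toEpochMilli();
        System.out.println(isCommittable(partitionTime, delay, noonSameDay, utc)); // false
        System.out.println(isCommittable(partitionTime, delay, noonNextDay, utc)); // true
    }
}
```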
Two extraction strategies are supported:
| Strategy | Identifier | Mechanism |
|---|---|---|
| Default | default | Uses a configurable pattern and date-time formatter to parse partition values (e.g., year=2024/month=01/day=15) into a LocalDateTime. |
| Custom | custom | Loads a user-provided PartitionTimeExtractor implementation by class name via the user classloader. |
```java
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
public interface PartitionTimeExtractor extends Serializable {
    LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);
}
```
Extending Serializable allows the extractor to be shipped to task managers as part of the operator's serialized configuration, provided that every field it holds is serializable as well.
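The default strategy from the table above can be sketched as an extractor that replaces each $key placeholder in a timestamp pattern with that key's partition value and then parses the result with a DateTimeFormatter. The pattern syntax ($year-$month-$day 00:00:00) and the formatter yyyy-MM-dd HH:mm:ss are assumptions modeled on the style of Flink's partition.time-extractor.timestamp-pattern and timestamp-formatter options; PatternTimeExtractorSketch is a hypothetical name, and the interface is redeclared locally so the sketch compiles without Flink.

```java
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Local copy of the interface above so this sketch compiles without Flink.
interface PartitionTimeExtractor extends Serializable {
    LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);
}

// Hypothetical "default"-style extractor: replace each $key placeholder in the
// timestamp pattern with that key's value, then parse the resulting string.
final class PatternTimeExtractorSketch implements PartitionTimeExtractor {

    private static final long serialVersionUID = 1L;
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$(\\w+)");

    // Only Strings are stored: DateTimeFormatter is not Serializable, so the
    // formatter is rebuilt from its pattern wherever extract() runs.
    private final String timestampPattern;
    private final String formatterPattern;

    PatternTimeExtractorSketch(String timestampPattern, String formatterPattern) {
        this.timestampPattern = timestampPattern;
        this.formatterPattern = formatterPattern;
    }

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        Matcher m = PLACEHOLDER.matcher(timestampPattern);
        StringBuffer text = new StringBuffer();
        while (m.find()) {
            int idx = partitionKeys.indexOf(m.group(1));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown partition key in pattern: " + m.group(1));
            }
            m.appendReplacement(text, Matcher.quoteReplacement(partitionValues.get(idx)));
        }
        m.appendTail(text);
        return LocalDateTime.parse(text.toString(), DateTimeFormatter.ofPattern(formatterPattern));
    }

    public static void main(String[] args) {
        PartitionTimeExtractor extractor =
                new PatternTimeExtractorSketch("$year-$month-$day 00:00:00", "yyyy-MM-dd HH:mm:ss");
        // Partition path year=2024/month=01/day=15
        LocalDateTime t = extractor.extract(
                Arrays.asList("year", "month", "day"), Arrays.asList("2024", "01", "15"));
        System.out.println(t); // 2024-01-15T00:00
    }
}
```

Keeping only the two pattern strings as fields is what makes the serializability claim above hold for this sketch.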
Format Factory Binding
The Table/SQL layer uses the BulkReaderFormatFactory and BulkWriterFormatFactory SPI interfaces to discover format implementations. When a user declares a table with 'format' = 'parquet' in its WITH clause, FactoryUtil uses Java's ServiceLoader to locate the factory whose identifier matches. A BulkReaderFormatFactory produces a BulkDecodingFormat for reading, and a BulkWriterFormatFactory produces an encoding format that yields a BulkWriter.Factory for writing. BulkDecodingFormat additionally supports optional filter pushdown, allowing the optimizer to push predicate expressions into the format reader for more efficient I/O.
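The identifier matching step can be modeled without Flink. In the sketch below, a plain list stands in for the factories ServiceLoader would return from META-INF/services registrations; discovery keeps the factories whose identifier equals the requested format and fails unless exactly one matches. All type and method names (ReaderFormatFactorySketch, DecodingFormatSketch, FormatDiscoverySketch, NamedFactory, RecordingFormat) are hypothetical, and predicates are simplified to strings instead of Flink's resolved expressions. The no-op applyFilters default illustrates that pushdown is optional for a format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a decoding format with an optional pushdown hook.
interface DecodingFormatSketch {
    // Formats that cannot use predicates simply inherit this no-op.
    default void applyFilters(List<String> predicates) {}
}

// Hypothetical stand-in for a bulk reader format factory.
interface ReaderFormatFactorySketch {
    String factoryIdentifier();
    DecodingFormatSketch createDecodingFormat();
}

// A toy format that records which predicates were pushed into it.
final class RecordingFormat implements DecodingFormatSketch {
    final List<String> pushed = new ArrayList<>();

    @Override
    public void applyFilters(List<String> predicates) {
        pushed.addAll(predicates);
    }
}

// A toy factory identified by a fixed string.
final class NamedFactory implements ReaderFormatFactorySketch {
    private final String id;

    NamedFactory(String id) {
        this.id = id;
    }

    @Override
    public String factoryIdentifier() {
        return id;
    }

    @Override
    public DecodingFormatSketch createDecodingFormat() {
        return new RecordingFormat();
    }
}

final class FormatDiscoverySketch {
    private FormatDiscoverySketch() {}

    /** Returns the single candidate whose identifier matches, or fails. */
    static ReaderFormatFactorySketch discover(
            List<? extends ReaderFormatFactorySketch> candidates, String identifier) {
        List<ReaderFormatFactorySketch> matches = new ArrayList<>();
        for (ReaderFormatFactorySketch f : candidates) {
            if (f.factoryIdentifier().equals(identifier)) {
                matches.add(f);
            }
        }
        if (matches.isEmpty()) {
            throw new IllegalStateException("No format factory for identifier '" + identifier + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Multiple format factories for identifier '" + identifier + "'");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        List<ReaderFormatFactorySketch> found =
                Arrays.asList(new NamedFactory("parquet"), new NamedFactory("orc"));
        DecodingFormatSketch format = discover(found, "parquet").createDecodingFormat();
        format.applyFilters(Arrays.asList("amount > 100"));
        System.out.println(((RecordingFormat) format).pushed);
    }
}
```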