Implementation: Apache Flink FileCompactStrategy Builder
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, File_IO |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool for configuring file compaction triggers and strategies provided by the Apache Flink connector-files module.
Description
FileCompactStrategy.Builder configures when compaction triggers (checkpoint count or size threshold) and the thread pool size. ConcatFileCompactor performs byte-level file concatenation with optional delimiters. RecordWiseFileCompactor<IN> reads records via a Reader.Factory and rewrites them through a Writer interface, supporting format changes during compaction.
Usage
Use ConcatFileCompactor for row-format sinks (text, CSV, JSON lines), where files can be joined byte-for-byte. Use RecordWiseFileCompactor for formats with internal structure such as headers, footers, or block metadata (Parquet, ORC), where simple byte concatenation would produce invalid files.
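To illustrate the byte-level behavior that makes ConcatFileCompactor safe only for row formats, here is a self-contained sketch using plain Java NIO (no Flink dependencies; class and file names are hypothetical): it concatenates input files byte-for-byte, inserting an optional delimiter between them, mirroring the compactor's documented behavior.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class ConcatSketch {
    // Concatenate input files byte-for-byte, writing the delimiter
    // between consecutive files (as ConcatFileCompactor does when a
    // delimiter is configured; null means no delimiter).
    static void concat(List<Path> inputs, Path output, byte[] delimiter) throws IOException {
        try (OutputStream out = Files.newOutputStream(output)) {
            boolean first = true;
            for (Path in : inputs) {
                if (!first && delimiter != null) {
                    out.write(delimiter);
                }
                Files.copy(in, out);
                first = false;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("compact-sketch");
        Path a = Files.writeString(dir.resolve("part-0"), "line1");
        Path b = Files.writeString(dir.resolve("part-1"), "line2");
        Path merged = dir.resolve("merged");
        concat(List.of(a, b), merged, new byte[] {'\n'});
        System.out.println(Files.readString(merged)); // "line1\nline2"
    }
}
```

For newline-delimited rows this yields a valid merged file; applied to a Parquet or ORC file pair it would corrupt the footer metadata, which is why those formats need the record-wise path.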
Code Reference
Source Location
- Repository: Apache Flink
- File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.java
- Lines: L30-112
- File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.java
- Lines: L38-72
- File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
- Lines: L33-86
Signature
@PublicEvolving
public class FileCompactStrategy implements Serializable {
    public long getSizeThreshold();
    public int getNumCheckpointsBeforeCompaction();
    public int getNumCompactThreads();

    public static class Builder {
        public static Builder newBuilder();
        public Builder enableCompactionOnCheckpoint(int numCheckpointsBeforeCompaction);
        public Builder setSizeThreshold(long sizeThreshold);
        public Builder setNumCompactThreads(int numCompactThreads);
        public FileCompactStrategy build();
    }
}

@PublicEvolving
public class ConcatFileCompactor extends OutputStreamBasedFileCompactor {
    public ConcatFileCompactor();
    public ConcatFileCompactor(@Nullable byte[] fileDelimiter);
}

@PublicEvolving
public class RecordWiseFileCompactor<IN> implements FileCompactor {
    public RecordWiseFileCompactor(Reader.Factory<IN> readerFactory);
    public void compact(List<Path> inputFiles, Writer<IN> writer) throws Exception;
}
Import
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| numCheckpointsBeforeCompaction | int | Yes (at least one trigger) | Number of completed checkpoints before compaction triggers |
| sizeThreshold | long | No | Total size of pending files that triggers compaction |
| numCompactThreads | int | No | Compaction thread pool size (default: CPU cores) |
Outputs
| Name | Type | Description |
|---|---|---|
| strategy | FileCompactStrategy | Configured compaction strategy |
| compactor | FileCompactor | ConcatFileCompactor or RecordWiseFileCompactor |
Usage Examples
Enabling Compaction
FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder()
        .enableCompactionOnCheckpoint(5)
        .setSizeThreshold(1024 * 1024 * 128) // 128 MB
        .build();

FileSink<String> sink = FileSink
        .forRowFormat(new Path("/output"), new SimpleStringEncoder<>("UTF-8"))
        .enableCompact(strategy, new ConcatFileCompactor())
        .build();