
Implementation:Apache Flink FileCompactStrategy Builder

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, File_IO
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete API, provided by the Apache Flink flink-connector-files module, for configuring when file compaction is triggered and which compaction strategy is applied.

Description

FileCompactStrategy.Builder configures when compaction triggers (checkpoint count or size threshold) and the thread pool size. ConcatFileCompactor performs byte-level file concatenation with optional delimiters. RecordWiseFileCompactor<IN> reads records via a Reader.Factory and rewrites them through a Writer interface, supporting format changes during compaction.

Usage

Use ConcatFileCompactor for row-format sinks (text, CSV, JSON lines), where simple byte concatenation yields a valid file, and RecordWiseFileCompactor for bulk formats that require a valid internal file structure (Parquet, ORC), which must be rewritten record by record.
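
For the row-format case, the compacted files can optionally be separated by a delimiter so line-oriented records do not run together at file boundaries. A minimal sketch (the newline delimiter here is illustrative; the no-arg constructor concatenates files as-is):

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;

// Concatenate pending files byte-for-byte, inserting a newline between
// files so line-delimited records stay separated after compaction.
FileCompactor compactor =
        new ConcatFileCompactor("\n".getBytes(StandardCharsets.UTF_8));
```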

Code Reference

Source Location

  • Repository: Apache Flink
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.java
  • Lines: L30-112
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.java
  • Lines: L38-72
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
  • Lines: L33-86

Signature

@PublicEvolving
public class FileCompactStrategy implements Serializable {
    public long getSizeThreshold();
    public int getNumCheckpointsBeforeCompaction();
    public int getNumCompactThreads();

    public static class Builder {
        public static Builder newBuilder();
        public Builder enableCompactionOnCheckpoint(int numCheckpointsBeforeCompaction);
        public Builder setSizeThreshold(long sizeThreshold);
        public Builder setNumCompactThreads(int numCompactThreads);
        public FileCompactStrategy build();
    }
}

@PublicEvolving
public class ConcatFileCompactor extends OutputStreamBasedFileCompactor {
    public ConcatFileCompactor();
    public ConcatFileCompactor(@Nullable byte[] fileDelimiter);
}

@PublicEvolving
public class RecordWiseFileCompactor<IN> implements FileCompactor {
    public RecordWiseFileCompactor(Reader.Factory<IN> readerFactory);
    public void compact(List<Path> inputFiles, Writer<IN> writer) throws Exception;
}

Import

import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;

I/O Contract

Inputs

Name Type Required Description
numCheckpointsBeforeCompaction int Yes (at least one trigger) Checkpoints before triggering
sizeThreshold long No Total size threshold to trigger
numCompactThreads int No Thread pool size (default: CPU cores)

Outputs

Name Type Description
strategy FileCompactStrategy Configured compaction strategy
compactor FileCompactor ConcatFileCompactor or RecordWiseFileCompactor

Usage Examples

Enabling Compaction

// Compaction fires when either trigger is met: every 5 checkpoints,
// or once pending files accumulate past the size threshold.
FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder()
    .enableCompactionOnCheckpoint(5)
    .setSizeThreshold(1024 * 1024 * 128)  // 128 MiB
    .build();

FileSink<String> sink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<>("UTF-8"))
    .enableCompact(strategy, new ConcatFileCompactor())
    .build();
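
Record-Wise Compaction

For formats that cannot be byte-concatenated, RecordWiseFileCompactor re-reads pending files through a Reader.Factory and rewrites the records via the sink's writer. The sketch below assumes the DecoderBasedReader and SimpleStringDecoder helpers from the same compactor package; a Parquet or ORC sink would need a format-specific reader factory instead.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.connector.file.sink.compactor.SimpleStringDecoder;
import org.apache.flink.core.fs.Path;

FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder()
    .enableCompactionOnCheckpoint(5)  // single trigger is sufficient
    .build();

FileSink<String> sink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        strategy,
        new RecordWiseFileCompactor<>(
            // Re-reads each pending file as decoded String records and
            // rewrites them through the sink's writer.
            new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
    .build();
```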

Related Pages

Implements Principle

Requires Environment
