Implementation: Apache Flink CompactCoordinator processElement
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Coordination |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
An internal operator in the Apache Flink connector-files module that collects file committables and triggers per-bucket compaction requests.
Description
The CompactCoordinator extends AbstractStreamOperator and processes CommittableMessage<FileSinkCommittable> records at parallelism 1. The processElement method extracts the committable and delegates to packAndTrigger, which files it under the per-bucket CompactTrigger: committables holding a pending file are queued for compaction, while the rest are marked for passthrough. The trigger tracks the number of elapsed checkpoints and the accumulated file size per bucket; when either threshold is crossed, fireAndPurge emits a CompactorRequest downstream and resets the trigger state for that bucket.
Committables that have not yet been packed into a request are persisted in the REMAINING_COMMITTABLE_RAW_STATES_DESC list state so they can be restored on recovery.
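The dual threshold described above (checkpoint count or accumulated size, whichever is crossed first) can be sketched in plain Java. The BucketTrigger class below is a hypothetical stand-in, not the actual CompactTrigger from connector-files; names and thresholds are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-bucket trigger: a bucket fires when either
// its accumulated file size or its elapsed-checkpoint count reaches the
// configured threshold. Illustrative only, not Flink's CompactTrigger.
class BucketTrigger {
    private final long sizeThreshold;
    private final int checkpointThreshold;
    private final Map<String, Long> accumulatedSize = new HashMap<>();
    private final Map<String, Integer> elapsedCheckpoints = new HashMap<>();

    BucketTrigger(long sizeThreshold, int checkpointThreshold) {
        this.sizeThreshold = sizeThreshold;
        this.checkpointThreshold = checkpointThreshold;
    }

    /** Account a committable's size; returns true if the bucket should fire. */
    boolean onElement(String bucketId, long fileSize) {
        long total = accumulatedSize.merge(bucketId, fileSize, Long::sum);
        return total >= sizeThreshold;
    }

    /** Count a completed checkpoint; returns true if the bucket should fire. */
    boolean onCheckpoint(String bucketId) {
        int count = elapsedCheckpoints.merge(bucketId, 1, Integer::sum);
        return count >= checkpointThreshold;
    }

    /** Clear bucket state after fireAndPurge. */
    void purge(String bucketId) {
        accumulatedSize.remove(bucketId);
        elapsedCheckpoints.remove(bucketId);
    }
}
```

After a fire, the purge step is what makes the trigger per-bucket: other buckets keep their accumulated counts untouched.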
Usage
This is an internal operator that Flink inserts into the sink topology automatically when compaction is enabled on the FileSink; it is not meant to be instantiated directly.
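For context, a sink configuration fragment along these lines is what causes the coordinator to be inserted; it uses the public FileSink API (enableCompact, FileCompactStrategy.Builder, ConcatFileCompactor). The path and thresholds are placeholder values; verify method names against your Flink version.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

FileSink<String> sink =
        FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .setSizeThreshold(16 * 1024 * 1024)  // fire on accumulated size
                                .enableCompactionOnCheckpoint(5)     // or every N checkpoints
                                .build(),
                        new ConcatFileCompactor())
                .build();
```

With this in place the CompactCoordinator described here sits between the writers and the compactors in the generated topology.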
Code Reference
Source Location
- Repository: Apache Flink
- File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
- Lines: L63-268
Signature
@Internal
public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest>
implements OneInputStreamOperator<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
BoundedOneInput {
public CompactCoordinator(
StreamOperatorParameters<CompactorRequest> parameters,
FileCompactStrategy strategy,
SimpleVersionedSerializer<FileSinkCommittable> committableSerializer);
@Override
public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element)
throws Exception;
@Override
public void endInput() throws Exception;
@Override
public void snapshotState(StateSnapshotContext context) throws Exception;
@Override
public void initializeState(StateInitializationContext context) throws Exception;
}
Import
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator;
// Internal class
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| element | StreamRecord<CommittableMessage<FileSinkCommittable>> | Yes | Committable from writers |
| strategy | FileCompactStrategy | Yes | Compaction trigger configuration (constructor argument, not a per-record input) |
Outputs
| Name | Type | Description |
|---|---|---|
| request | CompactorRequest | Per-bucket compaction request emitted downstream |
Usage Examples
Processing Flow
// CompactCoordinator.processElement():
// 1. Extract FileSinkCommittable from CommittableWithLineage
// 2. packAndTrigger(committable):
// a. Get bucketId from committable
// b. If hasPendingFile() -> trigger.addToCompact(bucketId, committable)
// c. Else -> trigger.addToPassthrough(bucketId, committable)
// d. If trigger fires for this bucket -> fireAndPurge(bucketId)
// 3. fireAndPurge(bucketId):
// a. Create CompactorRequest(bucketId, filesToCompact, filesToPassthrough)
// b. Emit as StreamRecord downstream
// c. Reset trigger for this bucket