Implementation:Apache Flink CompactorOperator ProcessElement
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, File_IO |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the Apache Flink connector-files module, for executing file compaction asynchronously on a thread pool.
Description
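The CompactorOperator extends AbstractStreamOperator and delegates the compaction work to a CompactService. In processElement, it creates a CompletableFuture, submits it to the service together with the incoming CompactorRequest, and keeps the pair so the result can be emitted later. CompactService.compact resolves the paths of the files to compact from the request's committables, opens a CompactingFileWriter through the BucketWriter, and runs the configured FileCompactor strategy over those files. The compacted output file is named with the "compacted-" prefix. Requests and their futures are tracked per checkpoint in a TreeMap keyed by checkpoint ID. The resulting committables are emitted downstream from notifyCheckpointComplete, and endInput handles any requests still outstanding when a bounded input ends.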
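The per-checkpoint bookkeeping described above can be modeled without any Flink dependencies. The sketch below is a hypothetical, simplified model, not Flink's implementation: requests are plain strings, the `snapshot()` method stands in for a checkpoint barrier, and the `service` callback replaces CompactService. It shows why a TreeMap fits this job: `headMap(checkpointId, true)` selects every checkpoint up to the completed one, in ascending order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical model of the operator-side bookkeeping; not Flink code.
public class CheckpointTrackingSketch {
    // Stands in for CompactService.submit(request, future).
    private final BiConsumer<String, CompletableFuture<String>> service;
    // Futures grouped by checkpoint ID; TreeMap keeps the IDs in ascending order.
    private final TreeMap<Long, List<CompletableFuture<String>>> requestsByCheckpoint = new TreeMap<>();
    private long currentCheckpointId = 1L;

    public CheckpointTrackingSketch(BiConsumer<String, CompletableFuture<String>> service) {
        this.service = service;
    }

    // Analogue of processElement: create a future, hand it to the service, remember it.
    public void processElement(String request) {
        CompletableFuture<String> result = new CompletableFuture<>();
        service.accept(request, result);
        requestsByCheckpoint
                .computeIfAbsent(currentCheckpointId, id -> new ArrayList<>())
                .add(result);
    }

    // Hypothetical hook: a checkpoint barrier closes the current group of requests.
    public void snapshot() {
        currentCheckpointId++;
    }

    // Analogue of notifyCheckpointComplete: emit results of all checkpoints <= checkpointId.
    public List<String> notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<CompletableFuture<String>>> finished =
                requestsByCheckpoint.headMap(checkpointId, true);
        List<String> emitted = new ArrayList<>();
        for (List<CompletableFuture<String>> futures : finished.values()) {
            for (CompletableFuture<String> future : futures) {
                emitted.add(future.join()); // waits until that compaction has finished
            }
        }
        finished.clear(); // clearing the view removes the entries from the TreeMap
        return emitted;
    }

    public int pendingCheckpoints() {
        return requestsByCheckpoint.size();
    }

    public static void main(String[] args) {
        // Toy service that "compacts" asynchronously on the common ForkJoinPool.
        CheckpointTrackingSketch op = new CheckpointTrackingSketch(
                (request, future) -> CompletableFuture.runAsync(() -> future.complete("done:" + request)));
        op.processElement("bucket-a");
        op.snapshot();
        op.processElement("bucket-b");
        System.out.println(op.notifyCheckpointComplete(1L)); // prints [done:bucket-a]
        System.out.println(op.pendingCheckpoints());         // prints 1
    }
}
```

Because `headMap` returns a live view, clearing it removes the emitted checkpoints from the backing map, so each result is emitted exactly once.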
Usage
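This is an internal operator used by the FileSink when compaction is enabled. It runs at the default parallelism, so requests for different buckets can be compacted concurrently across subtasks, and each subtask can run several compactions in parallel on its CompactService thread pool.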
Code Reference
Source Location
- Repository: Apache Flink
- File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
- Lines: L76-351
- File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
- Lines: L47-176
Signature
```java
@Internal
public class CompactorOperator
        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
        implements OneInputStreamOperator<CompactorRequest, CommittableMessage<FileSinkCommittable>>,
                BoundedOneInput,
                CheckpointListener {

    @Override
    public void processElement(StreamRecord<CompactorRequest> element) throws Exception;

    @Override
    public void endInput() throws Exception;

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception;
}

@Internal
public class CompactService {
    public CompactService(
            int numCompactThreads,
            FileCompactor fileCompactor,
            BucketWriter<?, String> bucketWriter);

    public void open();

    public void submit(
            CompactorRequest request,
            CompletableFuture<Iterable<FileSinkCommittable>> resultFuture);

    public void close();
}
```
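The open/submit/close contract of CompactService can be illustrated with a plain `ExecutorService`. This is a hypothetical analogue, not the Flink class: the `compactFn` function stands in for the FileCompactor and BucketWriter work, and clamping the pool size with `Math.max(1, numCompactThreads)` is an assumption. The point it shows is that both results and failures travel back through the caller-supplied CompletableFuture.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical analogue of the CompactService lifecycle; not Flink code.
public class CompactServiceSketch<R, T> {
    private final int numCompactThreads;
    private final Function<R, T> compactFn; // stands in for the FileCompactor/BucketWriter work
    private ExecutorService executor;

    public CompactServiceSketch(int numCompactThreads, Function<R, T> compactFn) {
        this.numCompactThreads = numCompactThreads;
        this.compactFn = compactFn;
    }

    // Creates the worker pool; must be called before submit().
    public void open() {
        executor = Executors.newFixedThreadPool(Math.max(1, numCompactThreads));
    }

    // Runs one compaction on the pool and reports the outcome through the caller's future.
    public void submit(R request, CompletableFuture<T> resultFuture) {
        executor.execute(() -> {
            try {
                resultFuture.complete(compactFn.apply(request));
            } catch (RuntimeException e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }

    // shutdownNow() drops queued tasks (their futures never complete here)
    // and attempts to interrupt running ones.
    public void close() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        CompactServiceSketch<List<String>, String> service =
                new CompactServiceSketch<>(2, files -> String.join("+", files));
        service.open();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            service.submit(List.of("part-0", "part-1"), result);
            System.out.println(result.get()); // prints part-0+part-1
        } finally {
            service.close();
        }
    }
}
```

Completing the future exceptionally, rather than letting the pooled task throw, keeps a failed compaction visible to whoever later waits on that future.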
Import
```java
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator;
import org.apache.flink.connector.file.sink.compactor.operator.CompactService;
// Both classes are annotated @Internal and are not part of Flink's public API.
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| element | StreamRecord<CompactorRequest> | Yes | Compaction request emitted by the upstream compaction coordinator |
Outputs
| Name | Type | Description |
|---|---|---|
| committables | CommittableMessage<FileSinkCommittable> | Committables for the compacted files plus any passthrough committables, sent to the downstream committer |
Usage Examples
Compaction Execution
```java
// CompactorOperator.processElement():
//   1. Receive a CompactorRequest from the coordinator.
//   2. Submit it to CompactService together with a new CompletableFuture.
//
// CompactService.compact():
//   1. Resolve file paths from the committables' pending file recoverables.
//   2. Open a CompactingFileWriter via the BucketWriter.
//   3. Run the FileCompactor (concat or record-wise).
//   4. Close the writer for commit, producing a new FileSinkCommittable.
//   5. Return [newCommittable] + passthrough committables.
//
// Output: a CommittableMessage stream consumed by the FileCommitter.
```
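The compact() steps above can be approximated with plain java.nio file operations. This is a hypothetical, Flink-free sketch of the concat strategy only: `Path` objects stand in for FileSinkCommittables, the file paths are passed in directly instead of being resolved from pending file recoverables, and deriving the target name by prefixing the first input's file name with "compacted-" is an assumption about the naming rule.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of the compact() steps using a byte-level concat.
public final class ConcatCompactSketch {
    static final String COMPACTED_PREFIX = "compacted-";

    // Returns [compacted file] + passthrough entries.
    static List<Path> compact(List<Path> toCompact, List<Path> passthrough) throws IOException {
        List<Path> results = new ArrayList<>();
        if (!toCompact.isEmpty()) {
            // Assumed naming rule: prefix the first input's file name with "compacted-".
            Path first = toCompact.get(0);
            Path target = first.resolveSibling(COMPACTED_PREFIX + first.getFileName());
            // Concat strategy: append every input's bytes, in order, to a single output file.
            try (OutputStream out = Files.newOutputStream(target)) {
                for (Path input : toCompact) {
                    Files.copy(input, out);
                }
            }
            results.add(target);
        }
        // Entries that were not selected for compaction are forwarded unchanged.
        results.addAll(passthrough);
        return results;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("compact-sketch");
        Path a = Files.writeString(dir.resolve("part-0"), "a\n");
        Path b = Files.writeString(dir.resolve("part-1"), "b\n");
        Path untouched = dir.resolve("part-2"); // stands in for a passthrough committable
        List<Path> out = compact(List.of(a, b), List.of(untouched));
        System.out.println(out.get(0).getFileName()); // prints compacted-part-0
        System.out.println(out.size());               // prints 2
    }
}
```

A record-wise compactor would instead read and re-write individual records through a format-aware writer; the byte-level concat shown here only makes sense for formats whose files can be appended safely.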