Implementation:Apache Flink CompactorOperator ProcessElement

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, File_IO
Last Updated 2026-02-09 00:00 GMT

Overview

A concrete tool, provided by the Apache Flink connector-files module, for executing file compaction asynchronously on a thread pool.

Description

The CompactorOperator extends AbstractStreamOperator and delegates the actual compaction work to CompactService. In processElement, it submits each incoming CompactorRequest to the service together with a CompletableFuture that receives the resulting committables. The CompactService.compact method resolves file paths from the request's committables, opens a CompactingFileWriter via the BucketWriter, and runs the configured FileCompactor strategy. The name of the compacted output file carries the "compacted-" prefix. Requests are tracked per checkpoint in a TreeMap.
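The submit-and-track pattern described above can be illustrated with a minimal, self-contained sketch. It does not use the Flink classes: AsyncCompactionSketch, its method names, the use of file-name strings in place of committables, and the exact moment at which futures are grouped and drained are all assumptions made for illustration. Only the general shape (a thread pool, one CompletableFuture per request, a TreeMap keyed by checkpoint id, a "compacted-" prefix) comes from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, simplified stand-in for the operator/service pair; not Flink code. */
public class AsyncCompactionSketch {
    private final ExecutorService pool;
    // Futures of requests submitted since the last snapshot.
    private final List<CompletableFuture<List<String>>> inFlight = new ArrayList<>();
    // Checkpoint id -> futures of the requests that belong to that checkpoint.
    private final TreeMap<Long, List<CompletableFuture<List<String>>>> byCheckpoint =
            new TreeMap<>();

    public AsyncCompactionSketch(int numThreads) {
        this.pool = Executors.newFixedThreadPool(numThreads);
    }

    /** Rough analogue of processElement: hand the work to the pool, keep the future. */
    public void submit(List<String> inputFiles) {
        CompletableFuture<List<String>> result = new CompletableFuture<>();
        inFlight.add(result);
        pool.execute(() -> {
            try {
                // Placeholder for the real compaction; only builds an output name.
                result.complete(List.of("compacted-" + String.join("+", inputFiles)));
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
    }

    /** Groups everything submitted so far under the given checkpoint id. */
    public void snapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, new ArrayList<>(inFlight));
        inFlight.clear();
    }

    /** Waits for and returns the results of all requests up to the checkpoint (inclusive). */
    public List<String> drainUpTo(long checkpointId) {
        List<String> out = new ArrayList<>();
        Map<Long, List<CompletableFuture<List<String>>>> done =
                byCheckpoint.headMap(checkpointId, true);
        for (List<CompletableFuture<List<String>>> futures : done.values()) {
            for (CompletableFuture<List<String>> f : futures) {
                out.addAll(f.join());
            }
        }
        done.clear(); // The head map is a view, so this also removes the entries from byCheckpoint.
        return out;
    }

    public void close() {
        pool.shutdown();
    }
}
```

Keying the map by checkpoint id keeps the entries sorted, so everything at or below a completed checkpoint can be selected with a single headMap call. Whether the real operator blocks on the futures or polls them is not stated here, so the blocking join in this sketch should be read as a simplification.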

Usage

CompactorOperator is an internal operator (annotated @Internal) rather than a user-facing API. It runs at the default parallelism, so compaction proceeds concurrently across buckets.

Code Reference

Source Location

  • Repository: Apache Flink
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
  • Lines: L76-351
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
  • Lines: L47-176

Signature

@Internal
public class CompactorOperator
        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
        implements OneInputStreamOperator<CompactorRequest, CommittableMessage<FileSinkCommittable>>,
                BoundedOneInput, CheckpointListener {

    @Override
    public void processElement(StreamRecord<CompactorRequest> element) throws Exception;

    @Override
    public void endInput() throws Exception;

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception;
}

@Internal
public class CompactService {
    public CompactService(int numCompactThreads, FileCompactor fileCompactor,
                          BucketWriter<?, String> bucketWriter);
    public void open();
    public void submit(CompactorRequest request,
                       CompletableFuture<Iterable<FileSinkCommittable>> resultFuture);
    public void close();
}

Import

import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator;
import org.apache.flink.connector.file.sink.compactor.operator.CompactService;
// Internal classes

I/O Contract

Inputs

Name | Type | Required | Description
element | StreamRecord<CompactorRequest> | Yes | Compaction request from coordinator

Outputs

Name | Type | Description
committables | CommittableMessage<FileSinkCommittable> | Compacted file committables for downstream committer

Usage Examples

Compaction Execution

// CompactorOperator.processElement():
// 1. Receive CompactorRequest from coordinator
// 2. Submit to CompactService with a CompletableFuture
//
// CompactService.compact():
// 1. Resolve file paths from committables' pending file recoverables
// 2. Open CompactingFileWriter via bucketWriter
// 3. Execute FileCompactor (concat or record-wise)
// 4. Close writer -> produces new FileSinkCommittable
// 5. Return [newCommittable] + passthroughCommittables
//
// Output: CommittableMessage stream to FileCommitter
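As a rough illustration of the "concat" flavor of step 3 above, the following sketch concatenates a list of local files into one output file whose name starts with "compacted-". It uses only java.nio and none of the Flink types: ConcatCompactionSketch is a hypothetical class, and deriving the output name from the first input file is an assumption made for the example, not the naming rule Flink applies.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical concat-style compaction over local files; not the Flink implementation. */
public class ConcatCompactionSketch {

    /**
     * Appends the bytes of every input, in list order, to a new sibling file
     * named "compacted-" plus the file name of the first input (assumed naming).
     */
    public static Path compact(List<Path> inputs) throws IOException {
        if (inputs.isEmpty()) {
            throw new IllegalArgumentException("nothing to compact");
        }
        Path first = inputs.get(0);
        Path target = first.resolveSibling("compacted-" + first.getFileName());
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path in : inputs) {
                Files.copy(in, out); // Streams the whole input file into the output.
            }
        }
        return target;
    }
}
```

Byte-level concatenation like this is only meaningful for formats where joined files remain valid, which is why the outline above also lists a record-wise alternative.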

Related Pages

Implements Principle
