
Implementation:Apache Flink CompactCoordinator ProcessElement

From Leeroopedia


Knowledge Sources
Domains: Stream_Processing, Coordination
Last Updated: 2026-02-09 00:00 GMT

Overview

CompactCoordinator is a concrete operator, provided by the Apache Flink flink-connector-files module, that collects file committables and triggers per-bucket compaction requests.

Description

The CompactCoordinator extends AbstractStreamOperator and processes CommittableMessage<FileSinkCommittable> at parallelism 1. The processElement method extracts the committable and delegates to packAndTrigger, which routes it into the per-bucket CompactTrigger. The trigger fires when either the configured number of checkpoints has elapsed or the bucket's accumulated file size crosses its threshold; fireAndPurge then emits a CompactorRequest downstream and resets the trigger for that bucket.

State is persisted in REMAINING_COMMITTABLE_RAW_STATES_DESC list state for recovery.
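The two firing conditions described above can be sketched as a minimal, self-contained model. All names here are illustrative, not Flink's actual CompactTrigger API, which additionally tracks passthrough files and is driven by FileCompactStrategy:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the per-bucket compaction trigger: fire when either
// the accumulated pending-file size or the number of elapsed checkpoints
// crosses its configured threshold. Names are illustrative, not Flink's API.
class BucketTriggerModel {
    private final long sizeThreshold;
    private final int checkpointThreshold;
    private final Map<String, Long> accumulatedSize = new HashMap<>();
    private final Map<String, Integer> checkpointsSinceFire = new HashMap<>();

    BucketTriggerModel(long sizeThreshold, int checkpointThreshold) {
        this.sizeThreshold = sizeThreshold;
        this.checkpointThreshold = checkpointThreshold;
    }

    // Called for each committable; returns true if the bucket should fire.
    boolean addToCompact(String bucketId, long fileSize) {
        long total = accumulatedSize.merge(bucketId, fileSize, Long::sum);
        return total >= sizeThreshold;
    }

    // Called on checkpoint; returns true if the bucket should fire.
    boolean onCheckpoint(String bucketId) {
        int n = checkpointsSinceFire.merge(bucketId, 1, Integer::sum);
        return n >= checkpointThreshold;
    }

    // Models fireAndPurge: per-bucket counters reset after a request is emitted.
    void purge(String bucketId) {
        accumulatedSize.remove(bucketId);
        checkpointsSinceFire.remove(bucketId);
    }
}
```

Note that both counters are keyed by bucket, so a hot bucket firing does not reset the accumulation of other buckets.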

Usage

This is an internal operator that Flink inserts into the sink topology automatically when compaction is enabled on the FileSink; user code does not instantiate it directly.
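A configuration sketch of how compaction gets enabled in the first place (thresholds here are illustrative; the builder methods are from the Flink 1.15+ FileSink API):

```java
// Sketch (illustrative thresholds): enabling compaction on a FileSink.
// When enableCompact is set, Flink inserts CompactCoordinator (and the
// downstream compactor operator) into the sink topology automatically.
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

FileSink<String> sink =
        FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .setSizeThreshold(16L * 1024 * 1024) // fire once ~16 MiB is pending
                                .enableCompactionOnCheckpoint(5)     // or after 5 checkpoints
                                .build(),
                        new ConcatFileCompactor()) // byte-concatenates small files
                .build();
```

Either threshold alone is sufficient; configuring both means the trigger fires on whichever condition is reached first.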

Code Reference

Source Location

  • Repository: Apache Flink
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
  • Lines: L63-268

Signature

@Internal
public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest>
        implements OneInputStreamOperator<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
                BoundedOneInput {

    public CompactCoordinator(
            StreamOperatorParameters<CompactorRequest> parameters,
            FileCompactStrategy strategy,
            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer);

    @Override
    public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element)
            throws Exception;

    @Override
    public void endInput() throws Exception;

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception;
}

Import

import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator;
// Internal class

I/O Contract

Inputs

Name      Type                                                    Required  Description
element   StreamRecord<CommittableMessage<FileSinkCommittable>>   Yes       Committable from writers
strategy  FileCompactStrategy                                     Yes       Compaction trigger config

Outputs

Name     Type              Description
request  CompactorRequest  Per-bucket compaction request emitted downstream

Usage Examples

Processing Flow

// CompactCoordinator.processElement():
// 1. Extract FileSinkCommittable from CommittableWithLineage
// 2. packAndTrigger(committable):
//    a. Get bucketId from committable
//    b. If hasPendingFile() -> trigger.addToCompact(bucketId, committable)
//    c. Else -> trigger.addToPassthrough(bucketId, committable)
//    d. If trigger fires for this bucket -> fireAndPurge(bucketId)
// 3. fireAndPurge(bucketId):
//    a. Create CompactorRequest(bucketId, filesToCompact, filesToPassthrough)
//    b. Emit as StreamRecord downstream
//    c. Reset trigger for this bucket
