
Implementation:Apache Flink DefaultRollingPolicy Builder

From Leeroopedia
Revision as of 14:17, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Flink_DefaultRollingPolicy_Builder.md)


Knowledge Sources
Domains Stream_Processing, File_IO
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete tool, provided by the Apache Flink flink-file-sink-common module, for configuring file rotation policies based on size, age, and inactivity thresholds.

Description

The DefaultRollingPolicy class implements the RollingPolicy interface with three configurable thresholds: maximum part file size, rollover interval (time since file creation), and inactivity interval (time since last write). A part file is rolled as soon as any one of these thresholds is reached. The PolicyBuilder inner class provides a fluent API for configuring the thresholds. Bulk formats use OnCheckpointRollingPolicy instead, because bulk writers cannot split a file mid-write and therefore roll only on checkpoints.
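The three thresholds can be illustrated with a small model that does not depend on Flink. This is a minimal sketch, not the upstream implementation: the class name RollingThresholdSketch and its method names are hypothetical, and the exact comparison operators (strictly greater for size, greater-or-equal for the two intervals) are assumptions made for illustration.

```java
import java.time.Duration;

/** Simplified, Flink-free model of the three thresholds. All names are hypothetical. */
final class RollingThresholdSketch {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    RollingThresholdSketch(long maxPartSizeBytes, Duration rollover, Duration inactivity) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rollover.toMillis();
        this.inactivityIntervalMs = inactivity.toMillis();
    }

    /** Size threshold: roll once the part file has grown past the limit (assumed strict). */
    boolean exceedsSize(long currentSizeBytes) {
        return currentSizeBytes > maxPartSizeBytes;
    }

    /** Time thresholds: roll when the file is too old OR has been idle too long. */
    boolean exceedsTime(long nowMs, long creationTimeMs, long lastUpdateTimeMs) {
        boolean tooOld = nowMs - creationTimeMs >= rolloverIntervalMs;
        boolean tooIdle = nowMs - lastUpdateTimeMs >= inactivityIntervalMs;
        return tooOld || tooIdle;
    }

    public static void main(String[] args) {
        // Thresholds set to the documented defaults: 128 MB, 60 s, 60 s.
        RollingThresholdSketch policy = new RollingThresholdSketch(
                128L * 1024L * 1024L, Duration.ofSeconds(60), Duration.ofSeconds(60));

        System.out.println(policy.exceedsSize(200L * 1024L * 1024L)); // true: over the size limit
        System.out.println(policy.exceedsTime(30_000L, 0L, 25_000L)); // false: 30 s old, 5 s idle
        System.out.println(policy.exceedsTime(61_000L, 0L, 60_500L)); // true: older than 60 s
    }
}
```

In the real class these checks are spread across the shouldRollOnEvent, shouldRollOnCheckpoint, and shouldRollOnProcessingTime hooks; which check runs in which hook is not modelled here.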

Usage

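Use this implementation when fine-grained control over output file sizes and durations is needed. The defaults (128 MB size, 60s rollover, 60s inactivity) are a reasonable starting point for general streaming workloads; adjust them when the resulting part files are too small or too numerous for the downstream consumer.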
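To judge whether the defaults suit a workload, it can help to estimate which threshold a steadily written part file reaches first. The following is a rough sketch under stated assumptions: the helper secondsUntilRoll is hypothetical (it is not part of Flink), it assumes a constant write rate so the inactivity threshold never fires, and it ignores the granularity at which the sink actually evaluates the policy.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate; a hypothetical helper, not part of Flink. */
final class RollEstimateSketch {

    /** Seconds until a steadily written part file rolls: whichever of size or age is hit first. */
    static long secondsUntilRoll(long maxPartSizeBytes, Duration rolloverInterval, long bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bytesPerSecond must be positive");
        }
        long secondsToFill = maxPartSizeBytes / bytesPerSecond; // integer division, rounds down
        return Math.min(secondsToFill, rolloverInterval.getSeconds());
    }

    public static void main(String[] args) {
        long defaultMaxSize = 128L * 1024L * 1024L;        // documented default size
        Duration defaultRollover = Duration.ofSeconds(60); // documented default rollover
        long mib = 1024L * 1024L;

        // At 1 MiB/s the file would need 128 s to fill, so the 60 s rollover wins.
        System.out.println(secondsUntilRoll(defaultMaxSize, defaultRollover, 1 * mib)); // 60
        // At 8 MiB/s the file fills in 16 s, so the size threshold wins.
        System.out.println(secondsUntilRoll(defaultMaxSize, defaultRollover, 8 * mib)); // 16
    }
}
```

Under these assumptions, a low-throughput bucket produces files well below the size limit, which is one reason to lengthen the rollover interval for such workloads.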

Code Reference

Source Location

  • Repository: Apache Flink
  • File: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
  • Lines: L44-235

Signature

@PublicEvolving
public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    // Defaults
    private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;  // 60 seconds
    private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;    // 60 seconds
    private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L; // 128 MB

    public static DefaultRollingPolicy.PolicyBuilder builder();

    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState);
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element);
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);

    // PolicyBuilder inner class
    public static final class PolicyBuilder {
        public PolicyBuilder withMaxPartSize(MemorySize size);
        public PolicyBuilder withInactivityInterval(Duration interval);
        public PolicyBuilder withRolloverInterval(Duration interval);
        public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build();
    }
}

Import

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;

I/O Contract

Inputs

Name | Type | Required | Description
maxPartSize | MemorySize | No | Maximum file size before rolling (default 128 MB)
rolloverInterval | Duration | No | Maximum time since file creation (default 60s)
inactivityInterval | Duration | No | Maximum idle time since last write (default 60s)

Outputs

Name | Type | Description
policy | DefaultRollingPolicy<IN, BucketID> | Configured rolling policy instance
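Because every input is optional, a builder only needs to override the thresholds that differ from the defaults. The sketch below illustrates that fluent pattern without depending on Flink. PolicyBuilderSketch and its methods are hypothetical names, the builder is mutable for brevity, and its build() returns a summary string rather than a policy object; the real PolicyBuilder may be implemented differently.

```java
import java.time.Duration;

/** Flink-free illustration of a fluent builder with defaults. All names are hypothetical. */
final class PolicyBuilderSketch {
    // Defaults mirror the documented values: 128 MB, 60 s, 60 s.
    private long maxPartSizeBytes = 1024L * 1024L * 128L;
    private Duration rolloverInterval = Duration.ofSeconds(60);
    private Duration inactivityInterval = Duration.ofSeconds(60);

    static PolicyBuilderSketch builder() {
        return new PolicyBuilderSketch();
    }

    PolicyBuilderSketch withMaxPartSizeBytes(long bytes) {
        this.maxPartSizeBytes = bytes;
        return this; // returning the builder enables method chaining
    }

    PolicyBuilderSketch withRolloverInterval(Duration interval) {
        this.rolloverInterval = interval;
        return this;
    }

    PolicyBuilderSketch withInactivityInterval(Duration interval) {
        this.inactivityInterval = interval;
        return this;
    }

    /** Returns the resolved settings as a readable summary. */
    String build() {
        return "size=" + maxPartSizeBytes + "B, rollover=" + rolloverInterval.toMillis()
                + "ms, inactivity=" + inactivityInterval.toMillis() + "ms";
    }

    public static void main(String[] args) {
        // No overrides: all three defaults apply.
        System.out.println(builder().build());
        // Override only the inactivity interval; size and rollover keep their defaults.
        System.out.println(builder().withInactivityInterval(Duration.ofSeconds(30)).build());
    }
}
```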

Usage Examples

Custom Rolling Policy

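import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Roll a part file when it exceeds 256 MiB, is 10 minutes old, or has been idle for 30 seconds.
DefaultRollingPolicy<String, String> policy = DefaultRollingPolicy.builder()
    .withMaxPartSize(MemorySize.ofMebiBytes(256))
    .withRolloverInterval(Duration.ofMinutes(10))
    .withInactivityInterval(Duration.ofSeconds(30))
    .build();

// Attach the policy to a row-format sink that writes each record as a UTF-8 encoded line.
FileSink<String> sink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<>("UTF-8"))
    .withRollingPolicy(policy)
    .build();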

Related Pages

Implements Principle
