Implementation:Apache Flink DefaultRollingPolicy Builder
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, File_IO |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete tool, provided by the Apache Flink file-sink-common module, for configuring file rotation policies based on size, time, and inactivity thresholds.
Description
The DefaultRollingPolicy class implements the RollingPolicy interface with three configurable thresholds: maximum part file size, rollover interval (time since file creation), and inactivity interval (time since last write). A part file is rolled as soon as any one of these thresholds is reached. The PolicyBuilder inner class provides a fluent API for configuring the thresholds. For bulk formats, OnCheckpointRollingPolicy is used instead, which rolls part files on every checkpoint, because bulk writers cannot split mid-write.
Usage
Use this implementation when fine-grained control over output file sizes and durations is needed. The defaults (128 MB size, 60s rollover, 60s inactivity) work well for general streaming workloads.
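To get a feel for which default threshold tends to trigger a roll first, the following plain-Java sketch does the arithmetic for a steady write rate. The class and method names are hypothetical and nothing here calls Flink. It assumes constant throughput and ignores the inactivity threshold and timer granularity, so the numbers are rough estimates only.

```java
/** Back-of-the-envelope estimate using the default thresholds quoted above. */
final class RollingDefaultsEstimate {
    // Same values as the defaults listed in this page, in raw units.
    static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L; // bytes
    static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;      // milliseconds

    /** Bytes written by the time the rollover interval elapses (assumes a constant rate). */
    public static long bytesAtRollover(long bytesPerSecond) {
        return bytesPerSecond * (DEFAULT_ROLLOVER_INTERVAL / 1000L);
    }

    /** Approximate part file size at roll time: whichever threshold is hit first wins. */
    public static long expectedPartSizeBytes(long bytesPerSecond) {
        return Math.min(bytesAtRollover(bytesPerSecond), DEFAULT_MAX_PART_SIZE);
    }

    /** True when the size threshold would be reached before the rollover interval. */
    public static boolean sizeThresholdFiresFirst(long bytesPerSecond) {
        return bytesAtRollover(bytesPerSecond) > DEFAULT_MAX_PART_SIZE;
    }

    public static void main(String[] args) {
        long oneMiBPerSecond = 1024L * 1024L;
        long fourMiBPerSecond = 4L * 1024L * 1024L;
        System.out.println(expectedPartSizeBytes(oneMiBPerSecond));
        System.out.println(sizeThresholdFiresFirst(oneMiBPerSecond));
        System.out.println(sizeThresholdFiresFirst(fourMiBPerSecond));
    }
}
```

Under these assumptions, a 1 MiB/s stream produces part files of about 60 MiB because the 60 s rollover fires first. The 128 MB limit only becomes the deciding factor above roughly 2.1 MiB/s.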
Code Reference
Source Location
- Repository: Apache Flink
- File: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
- Lines: L44-235
Signature
@PublicEvolving
public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    // Defaults
    private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L; // 60 seconds
    private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L; // 60 seconds
    private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L; // 128 MB

    public static DefaultRollingPolicy.PolicyBuilder builder();

    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState);
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element);
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);

    // PolicyBuilder inner class
    public static final class PolicyBuilder {
        public PolicyBuilder withMaxPartSize(MemorySize size);
        public PolicyBuilder withInactivityInterval(Duration interval);
        public PolicyBuilder withRolloverInterval(Duration interval);
        public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build();
    }
}
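The roll decisions declared in the signature above can be modelled without any Flink dependency. The sketch below is a simplified stand-in, not the Flink source: RollingPolicySketch and PartFileState are hypothetical names, and the exact comparison operators (strictly greater for size, greater-or-equal for the intervals) are assumptions about how the thresholds are applied.

```java
import java.time.Duration;

/** Plain-Java model of the three thresholds; not the Flink class itself. */
final class RollingPolicySketch {

    /** Hypothetical stand-in for the part file state handed to the policy. */
    static final class PartFileState {
        final long sizeBytes;
        final long creationTimeMs;
        final long lastUpdateTimeMs;

        PartFileState(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
            this.lastUpdateTimeMs = lastUpdateTimeMs;
        }
    }

    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    RollingPolicySketch(long maxPartSizeBytes, Duration rollover, Duration inactivity) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rollover.toMillis();
        this.inactivityIntervalMs = inactivity.toMillis();
    }

    /** Size check, evaluated when a new element arrives (assumed strict comparison). */
    boolean shouldRollOnEvent(PartFileState state) {
        return state.sizeBytes > maxPartSizeBytes;
    }

    /** Time checks: file age since creation, or idle time since the last write. */
    boolean shouldRollOnProcessingTime(PartFileState state, long nowMs) {
        return nowMs - state.creationTimeMs >= rolloverIntervalMs
                || nowMs - state.lastUpdateTimeMs >= inactivityIntervalMs;
    }

    public static void main(String[] args) {
        RollingPolicySketch policy =
                new RollingPolicySketch(100L, Duration.ofSeconds(10), Duration.ofSeconds(3));
        // Small file created at t=0 and last written at t=1000 ms.
        PartFileState state = new PartFileState(50L, 0L, 1_000L);
        System.out.println(policy.shouldRollOnEvent(state));                  // size below limit
        System.out.println(policy.shouldRollOnProcessingTime(state, 2_000L)); // neither interval reached
        System.out.println(policy.shouldRollOnProcessingTime(state, 4_000L)); // idle for 3 s
    }
}
```

Keeping the size check on the event path and the two interval checks on the timer path follows the split between shouldRollOnEvent and shouldRollOnProcessingTime in the signature.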
Import
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| maxPartSize | MemorySize | No | Maximum file size before rolling (default 128 MB) |
| rolloverInterval | Duration | No | Maximum time since file creation (default 60s) |
| inactivityInterval | Duration | No | Maximum idle time since last write (default 60s) |
Outputs
| Name | Type | Description |
|---|---|---|
| policy | DefaultRollingPolicy<IN, BucketID> | Configured rolling policy instance |
Usage Examples
Custom Rolling Policy
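import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;

// Roll a part file at 256 MiB, after 10 minutes, or after 30 seconds without writes.
DefaultRollingPolicy<String, String> policy = DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(256))
        .withRolloverInterval(Duration.ofMinutes(10))
        .withInactivityInterval(Duration.ofSeconds(30))
        .build();

// Attach the policy to a row-format sink that writes UTF-8 encoded strings.
FileSink<String> sink = FileSink
        .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(policy)
        .build();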