Implementation:Apache Flink BucketAssigner GetBucketId
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Data_Partitioning |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Interface in the Apache Flink flink-file-sink-common module that assigns each incoming record to a named output bucket (a subdirectory under the sink's base path).
Description
The BucketAssigner interface defines the contract for bucket assignment in the file sink. Its primary method, getBucketId, receives an element and a Context that exposes temporal information (current processing time, current watermark, and the element's timestamp, which may be null) and returns a bucket identifier used as the subdirectory name. A second method, getSerializer, supplies the serializer for those bucket identifiers. Built-in implementations include DateTimeBucketAssigner (hourly directories based on processing time, named with the pattern yyyy-MM-dd--HH by default) and BasePathBucketAssigner (all records written directly to the base path).
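As a rough illustration of how an hourly, processing-time bucket ID can be derived, the sketch below formats an epoch-millisecond timestamp with the pattern `yyyy-MM-dd--HH`, the default format string of `DateTimeBucketAssigner`. The class `HourlyBucketSketch` and its method `bucketIdFor` are hypothetical names that rely only on `java.time`; this is not Flink's implementation, just the same idea in isolation.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: shows how a timestamp maps to an hourly bucket ID.
class HourlyBucketSketch {
    // Same pattern string that DateTimeBucketAssigner uses by default.
    private static final String HOURLY_PATTERN = "yyyy-MM-dd--HH";

    // Formats the given epoch-millisecond time in the given zone.
    static String bucketIdFor(long processingTimeMillis, ZoneId zone) {
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(HOURLY_PATTERN).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(processingTimeMillis));
    }

    public static void main(String[] args) {
        // 1705329000000L is 2024-01-15T14:30:00Z
        System.out.println(bucketIdFor(1705329000000L, ZoneOffset.UTC)); // 2024-01-15--14
    }
}
```

All records processed within the same hour map to the same string, which is why they end up in the same directory. The time zone matters: the same instant can fall into different hourly buckets depending on the zone used for formatting.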
Usage
Implement this interface when the default time-based bucketing does not match your partitioning requirements. Common custom implementations include Hive-style partitioning (e.g., country=US/date=2024-01-15) or hash-based partitioning.
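The two custom schemes mentioned above can be sketched as plain string-building functions. The class `PartitionIdSketch`, its method names, the UTC day boundary, and the `bucket-N` naming are all assumptions made for illustration; in a real assigner this logic would sit inside `getBucketId`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helpers, not part of Flink: build bucket ID strings for two partitioning schemes.
class PartitionIdSketch {
    // Assumption: days are cut at UTC midnight.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Hive-style: key=value segments joined by '/', which yields nested directories.
    static String hiveStyle(String country, long epochMillis) {
        return "country=" + country + "/date=" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    // Hash-based: spread keys over a fixed number of buckets.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static String hashBucket(String key, int numBuckets) {
        return "bucket-" + Math.floorMod(key.hashCode(), numBuckets);
    }
}
```

For example, `hiveStyle("US", 1705329000000L)` yields `country=US/date=2024-01-15`. The hash variant trades readable directory names for an even spread of records when no natural partition key exists.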
Code Reference
Source Location
- Repository: Apache Flink
- File: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
- Lines: L46-89
Signature
@PublicEvolving
public interface BucketAssigner<IN, BucketID> extends Serializable {
    BucketID getBucketId(IN element, BucketAssigner.Context context);

    SimpleVersionedSerializer<BucketID> getSerializer();

    @PublicEvolving
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        @Nullable Long timestamp();
    }
}
Import
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| element | IN | Yes | The incoming record to assign |
| context | BucketAssigner.Context | Yes | Temporal context (processing time, watermark, timestamp) |
Outputs
| Name | Type | Description |
|---|---|---|
| bucketId | BucketID (typically String) | Subdirectory name under the base path |
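
Because `timestamp()` is declared `@Nullable`, an assigner that buckets by event time needs a fallback for records without a timestamp. The sketch below shows one possible choice, falling back to processing time. `Ctx` is a hypothetical stand-in that mirrors the three methods of `BucketAssigner.Context` so the example compiles without Flink on the classpath; `fixedContext` and `effectiveTime` are likewise illustrative names.

```java
// Hypothetical sketch, not part of Flink: choose a time source for bucketing.
class TimestampFallbackSketch {
    // Stand-in mirroring BucketAssigner.Context (assumption for self-containment).
    interface Ctx {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp(); // may be null when the record carries no timestamp
    }

    // Builds a context with fixed values, useful for trying the logic out.
    static Ctx fixedContext(long processingTime, long watermark, Long timestamp) {
        return new Ctx() {
            @Override public long currentProcessingTime() { return processingTime; }
            @Override public long currentWatermark() { return watermark; }
            @Override public Long timestamp() { return timestamp; }
        };
    }

    // Prefer the record's own timestamp; fall back to processing time when it is null.
    static long effectiveTime(Ctx ctx) {
        Long ts = ctx.timestamp();
        return ts != null ? ts : ctx.currentProcessingTime();
    }
}
```

Whether to fall back to processing time, to the watermark, or to a fixed "unknown" bucket is a design decision that depends on how late or untimestamped data should be handled downstream.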
Usage Examples
Default Time-Based Bucketing
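import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Default: DateTimeBucketAssigner produces hourly directories
// Bucket IDs look like: "2024-01-15--14" (default pattern yyyy-MM-dd--HH)
FileSink<String> sink = FileSink
    // Explicit <String> so the builder's element type is not inferred as Object
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    // DateTimeBucketAssigner is the default, no explicit configuration needed
    .build();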
Custom Bucket Assigner
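import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Custom: partition by a field in the record
BucketAssigner<MyEvent, String> assigner = new BucketAssigner<MyEvent, String>() {
    @Override
    public String getBucketId(MyEvent element, Context context) {
        // Hive-style directory name, e.g. "country=US"
        return "country=" + element.getCountry();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // Bucket IDs are plain strings, so the built-in string serializer suffices
        return SimpleVersionedStringSerializer.INSTANCE;
    }
};

// basePath (a Path) and encoder (an Encoder<MyEvent>) are assumed to be defined elsewhere
FileSink<MyEvent> sink = FileSink
    .forRowFormat(basePath, encoder)
    .withBucketAssigner(assigner)
    .build();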