

Implementation:Apache Flink BucketAssigner GetBucketId

From Leeroopedia
Revision as of 14:16, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Flink_BucketAssigner_GetBucketId.md)


Knowledge Sources
Domains Stream_Processing, Data_Partitioning
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete interface for assigning incoming records to named output buckets (subdirectories), provided by the Apache Flink flink-file-sink-common module.

Description

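The BucketAssigner interface defines the contract for bucket assignment in the file sink. Its primary method, getBucketId, receives an element and a Context that exposes temporal information (the current processing time, the current watermark, and the element's timestamp, which may be null) and returns a bucket identifier whose string form is used as the subdirectory name under the base path. A second method, getSerializer, returns the SimpleVersionedSerializer for the bucket ID type. Built-in implementations include DateTimeBucketAssigner (hourly directories based on processing time, using the format yyyy-MM-dd--HH by default) and BasePathBucketAssigner (all records written directly to the base path).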
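To make the hourly bucketing concrete, the following is a minimal stand-alone sketch of how a processing-time value can be turned into a bucket ID with the pattern yyyy-MM-dd--HH. It uses only java.time and does not reproduce Flink's source; the class name HourlyBucketSketch and the method bucketIdFor are hypothetical names chosen for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Stand-alone sketch: does not depend on Flink and is not Flink's source code.
final class HourlyBucketSketch {

    // Pattern assumed to match DateTimeBucketAssigner's default format.
    static final String DEFAULT_FORMAT = "yyyy-MM-dd--HH";

    // Formats a processing-time value (epoch milliseconds) as an hourly bucket ID.
    static String bucketIdFor(long processingTimeMillis, ZoneId zone) {
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(DEFAULT_FORMAT).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(processingTimeMillis));
    }

    public static void main(String[] args) {
        long processingTime = Instant.parse("2024-01-15T14:30:00Z").toEpochMilli();
        // All records processed within the same hour share one bucket ID.
        System.out.println(bucketIdFor(processingTime, ZoneOffset.UTC));
    }
}
```

The time zone is passed explicitly here so the result is reproducible; a real job may rely on a different zone, which shifts the hour boundaries of the directories.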

Usage

Implement this interface when the default time-based bucketing does not match your partitioning requirements. Common custom implementations include Hive-style partitioning (e.g., country=US/date=2024-01-15) or hash-based partitioning.
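The two custom schemes mentioned above can be sketched as plain string-building helpers whose results a getBucketId implementation could return. This is an illustrative sketch under stated assumptions: the class CustomBucketIdSketch, both method names, the "bucket-" prefix, and the choice of UTC for the date are hypothetical and not part of Flink.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Stand-alone sketch: helper logic only, no Flink dependency.
final class CustomBucketIdSketch {

    // Hive-style layout: key=value segments joined by '/'.
    // The date is derived from an epoch-millisecond value, interpreted in UTC.
    static String hiveStyle(String country, long epochMillis) {
        LocalDate date = Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        return "country=" + country + "/date=" + date;
    }

    // Hash-based layout: spreads keys over a fixed number of buckets.
    // floorMod keeps the result non-negative even for negative hash codes.
    static String hashBased(String key, int numBuckets) {
        int bucket = Math.floorMod(key.hashCode(), numBuckets);
        return "bucket-" + bucket;
    }
}
```

Because the bucket ID becomes a relative path, a slash in the returned string yields nested subdirectories, which is what makes the Hive-style layout possible.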

Code Reference

Source Location

  • Repository: Apache Flink
  • File: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
  • Lines: L46-89

Signature

@PublicEvolving
public interface BucketAssigner<IN, BucketID> extends Serializable {

    BucketID getBucketId(IN element, BucketAssigner.Context context);

    SimpleVersionedSerializer<BucketID> getSerializer();

    @PublicEvolving
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        @Nullable Long timestamp();
    }
}

Import

import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;

I/O Contract

Inputs

Name | Type | Required | Description
element | IN | Yes | The incoming record to assign
context | BucketAssigner.Context | Yes | Temporal context (processing time, watermark, timestamp)

Outputs

Name | Type | Description
bucketId | BucketID (typically String) | Subdirectory name under the base path
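Since the element timestamp in the context is nullable, an assigner that buckets by event time needs a fallback. The sketch below shows one possible choice, falling back to processing time. TimeContext is a hypothetical stand-in that mirrors the three Context methods so the example runs without Flink; it is not a Flink type, and the fallback policy is an assumption, not something the interface prescribes.

```java
// Stand-alone sketch: TimeContext is a hypothetical stand-in, not a Flink type.
final class EventTimeBucketSketch {

    // Mirrors the three methods of BucketAssigner.Context.
    interface TimeContext {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp(); // null when the record carries no timestamp
    }

    // Prefers the element timestamp and falls back to processing time.
    static long bucketingTime(TimeContext context) {
        Long eventTime = context.timestamp();
        return eventTime != null ? eventTime : context.currentProcessingTime();
    }

    // Small factory so the sketch can be exercised without a running job.
    static TimeContext fixedContext(long processingTime, long watermark, Long timestamp) {
        return new TimeContext() {
            @Override public long currentProcessingTime() { return processingTime; }
            @Override public long currentWatermark() { return watermark; }
            @Override public Long timestamp() { return timestamp; }
        };
    }
}
```

The returned value can then be formatted into a bucket ID in whatever layout the job requires.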

Usage Examples

Default Time-Based Bucketing

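import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Default: DateTimeBucketAssigner produces hourly directories
// Bucket IDs look like: "2024-01-15--14" (default format "yyyy-MM-dd--HH")
FileSink<String> sink = FileSink
    // Explicit <String> is needed: the diamond cannot infer the type through the builder chain
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    // DateTimeBucketAssigner is the default, no explicit configuration needed
    .build();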

Custom Bucket Assigner

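import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Custom: partition by a field in the record (MyEvent is an application-defined type)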
BucketAssigner<MyEvent, String> assigner = new BucketAssigner<MyEvent, String>() {
    @Override
    public String getBucketId(MyEvent element, Context context) {
        return "country=" + element.getCountry();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
};

// basePath (a Path) and encoder (an Encoder<MyEvent>) are assumed to be defined elsewhere
FileSink<MyEvent> sink = FileSink
    .forRowFormat(basePath, encoder)
    .withBucketAssigner(assigner)
    .build();

Related Pages

Implements Principle
