Implementation:Apache Flink RecordsBySplits

From Leeroopedia


Knowledge Sources
Domains Connectors, Source_Framework
Last Updated 2026-02-09 00:00 GMT

Overview

A concrete implementation of RecordsWithSplitIds that organizes fetched records into groups keyed by their source split identifiers.

Description

RecordsBySplits is the standard implementation of the RecordsWithSplitIds interface in Flink's source connector framework. It holds records grouped by split ID, together with a set of finished split IDs. When an instance is assembled through the Builder, the groups are kept in a LinkedHashMap, so splits are traversed in the order in which they were first added. Traversal is iterator-based: callers advance to the next split via nextSplit(), then read that split's records one at a time via nextRecordFromSplit() until it returns null. When nextSplit() itself returns null, every split in the batch has been consumed, and finishedSplits() reports which splits were marked as finished.
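The two-level traversal described above can be sketched without any Flink dependency. The block below is a minimal illustration, not Flink's source: SplitRecordsSketch and TraversalDemo are hypothetical stand-ins that only mirror the nextSplit() / nextRecordFromSplit() contract, and the split IDs and records are made up.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, NOT Flink's class: it mirrors only the traversal contract.
final class SplitRecordsSketch<E> {
    private final Iterator<Map.Entry<String, Collection<E>>> splits;
    private Iterator<E> current; // records of the split returned by the last nextSplit()

    SplitRecordsSketch(Map<String, Collection<E>> recordsBySplit) {
        this.splits = recordsBySplit.entrySet().iterator();
    }

    // Advances to the next split; returns its ID, or null when no splits remain.
    String nextSplit() {
        if (!splits.hasNext()) {
            return null;
        }
        Map.Entry<String, Collection<E>> entry = splits.next();
        current = entry.getValue().iterator();
        return entry.getKey();
    }

    // Returns the next record of the current split, or null once it is exhausted.
    E nextRecordFromSplit() {
        if (current == null) {
            throw new IllegalStateException("call nextSplit() first");
        }
        return current.hasNext() ? current.next() : null;
    }
}

final class TraversalDemo {
    // Drains every split in order and renders each record as "splitId:record".
    static List<String> drain(SplitRecordsSketch<String> records) {
        List<String> out = new ArrayList<>();
        String splitId;
        while ((splitId = records.nextSplit()) != null) {
            String record;
            while ((record = records.nextRecordFromSplit()) != null) {
                out.add(splitId + ":" + record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Insertion-ordered map, so splits come back in the order they were added.
        Map<String, Collection<String>> data = new LinkedHashMap<>();
        data.put("split-b", List.of("x", "y"));
        data.put("split-a", List.of("z"));
        System.out.println(drain(new SplitRecordsSketch<>(data)));
    }
}
```

Because null signals exhaustion at both levels, a consumer written this way cannot hand over null as a record value; the sketch assumes records are always non-null.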

The class includes a static nested Builder class that provides a convenient way to collect records incrementally. The builder supports adding individual records, bulk-adding collections of records, and marking splits as finished. Its build() method produces the RecordsBySplits instance, substituting shared empty collections when no records or no finished splits were added, which avoids unnecessary memory allocation.
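The grouping behaviour of such a builder can be approximated in a few lines of plain Java. This is a sketch under assumptions rather than Flink's implementation: GroupingBuilderSketch and its records() / finished() accessors are hypothetical names, and the real Builder returns a RecordsBySplits from build() instead of exposing its collections.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in, NOT Flink's Builder: it shows only the grouping idea.
final class GroupingBuilderSketch<E> {
    // LinkedHashMap keeps splits in the order they were first seen.
    private final Map<String, Collection<E>> recordsBySplit = new LinkedHashMap<>();
    private final Set<String> finishedSplits = new HashSet<>();

    // Appends one record to its split's group, creating the group on first use.
    void add(String splitId, E record) {
        recordsBySplit.computeIfAbsent(splitId, id -> new ArrayList<>()).add(record);
    }

    // Appends a batch of records to its split's group (copied, not aliased).
    void addAll(String splitId, Collection<E> records) {
        recordsBySplit.computeIfAbsent(splitId, id -> new ArrayList<>()).addAll(records);
    }

    void addFinishedSplit(String splitId) {
        finishedSplits.add(splitId);
    }

    // Hands out the shared empty map when nothing was added.
    Map<String, Collection<E>> records() {
        return recordsBySplit.isEmpty() ? Collections.emptyMap() : recordsBySplit;
    }

    // Hands out the shared empty set when no split was marked finished.
    Set<String> finished() {
        return finishedSplits.isEmpty() ? Collections.emptySet() : finishedSplits;
    }

    public static void main(String[] args) {
        GroupingBuilderSketch<String> builder = new GroupingBuilderSketch<>();
        builder.add("s2", "a");
        builder.add("s1", "b");
        builder.addAll("s2", List.of("c", "d"));
        builder.addFinishedSplit("s1");
        System.out.println(builder.records() + " finished=" + builder.finished());
    }
}
```

Returning Collections.emptyMap() and Collections.emptySet() for an empty batch means a fetch that produced nothing does not carry its own empty containers, which is the allocation saving the description refers to.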

Usage

Connector developers use RecordsBySplits (typically via its Builder) inside their SplitReader implementation when returning fetched records from the fetch() method. The builder pattern allows the split reader to add records as they are read from the external system, then build the final result to hand off to the source reader framework. This is the most commonly used implementation of RecordsWithSplitIds for connectors that read from multiple splits and need to organize results by split.

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
  • Lines: 1-166

Signature

@PublicEvolving
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {

    // Constructor
    public RecordsBySplits(
            final Map<String, Collection<E>> recordsBySplit,
            final Set<String> finishedSplits);

    @Nullable
    @Override
    public String nextSplit();

    @Nullable
    @Override
    public E nextRecordFromSplit();

    @Override
    public Set<String> finishedSplits();

    // Inner Builder class
    public static class Builder<E> {

        public void add(String splitId, E record);

        public void add(SourceSplit split, E record);

        public void addAll(String splitId, Collection<E> records);

        public void addAll(SourceSplit split, Collection<E> records);

        public void addFinishedSplit(String splitId);

        public void addFinishedSplits(Collection<String> splitIds);

        public RecordsBySplits<E> build();
    }
}

Import

import org.apache.flink.connector.base.source.reader.RecordsBySplits;

I/O Contract

Inputs

Name | Type | Required | Description
recordsBySplit | Map<String, Collection<E>> | Yes | A map of split IDs to their associated record collections. Used in the direct constructor.
finishedSplits | Set<String> | Yes | A set of split IDs that have been fully consumed. Used in the direct constructor.

Outputs

Name | Type | Description
nextSplit() | String (nullable) | Returns the next split ID containing records, or null if no more splits are available.
nextRecordFromSplit() | E (nullable) | Returns the next record from the current split, or null if no more records remain in the current split.
finishedSplits() | Set<String> | Returns the set of split IDs that have been marked as finished.

Usage Examples

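// Example: using RecordsBySplits.Builder inside a SplitReader.fetch() implementation.
// MySplit, assignedSplits, readFromExternalSystem() and isFullyConsumed() are
// placeholders for connector-specific code.
@Override
public RecordsWithSplitIds<byte[]> fetch() throws IOException {
    RecordsBySplits.Builder<byte[]> builder = new RecordsBySplits.Builder<>();

    for (MySplit split : assignedSplits) {
        // Group everything read in this round under the split it came from.
        List<byte[]> records = readFromExternalSystem(split);
        for (byte[] record : records) {
            builder.add(split.splitId(), record);
        }

        // Report exhausted splits so the source reader can retire them.
        if (split.isFullyConsumed()) {
            builder.addFinishedSplit(split.splitId());
        }
    }

    return builder.build();
}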
