Implementation:Apache Flink MutableRecordAndPosition
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Source_Framework |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A mutable extension of RecordAndPosition that supports in-place updates for record and position fields, enabling object reuse in performance-sensitive iteration paths.
Description
MutableRecordAndPosition is a public-evolving class that extends the base RecordAndPosition class to add mutability. While RecordAndPosition is designed as an immutable value holder (with package-private fields), this subclass exposes three mutation methods that allow rewriting the record, offset, and skip count without allocating new objects.
The class provides three update methods:
- set(E record, long offset, long recordSkipCount): Fully updates the record and both position components.
- setPosition(long offset, long recordSkipCount): Updates only the position fields, leaving the record unchanged.
- setNext(E record): Sets a new record and increments the recordSkipCount by one. This is the primary method used during iteration, following the convention that each emitted record's position points to the position AFTER itself.
Key design decisions:
- Object reuse: This class is specifically designed for scenarios where a single RecordAndPosition instance is reused across multiple records, such as in ArrayResultIterator and IteratorResultIterator. This avoids per-record object allocation overhead.
- Package-private field access: The mutation methods directly write the package-private fields inherited from RecordAndPosition (record, offset, recordSkipCount), which is why the classes must be in the same package.
- Skip count auto-increment: The setNext() method automatically increments the skip count, encoding the "position after the record" semantics into the API.
Usage
Use MutableRecordAndPosition in BulkFormat.RecordIterator implementations to efficiently track the current record and its checkpoint position without per-record object allocation. It is a core utility used by ArrayResultIterator and IteratorResultIterator.
Code Reference
Source Location
- Repository: Apache_Flink
- File:
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java - Lines: 1-51
Signature
@PublicEvolving
public class MutableRecordAndPosition<E> extends RecordAndPosition<E>
Import
import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| record | E | No | The record to store; can be set via set() or setNext() |
| offset | long | No | The checkpoint offset; set via set() or setPosition() |
| recordSkipCount | long | No | The record skip count; set via set() or setPosition(), or auto-incremented via setNext() |
Outputs
| Name | Type | Description |
|---|---|---|
| getRecord() | E | The currently stored record (inherited from RecordAndPosition) |
| getOffset() | long | The current offset value (inherited from RecordAndPosition) |
| getRecordSkipCount() | long | The current record skip count (inherited from RecordAndPosition) |
Usage Examples
// Create a mutable record-and-position tracker
MutableRecordAndPosition<String> recordAndPos = new MutableRecordAndPosition<>();
// Initialize position for a new batch at offset 0 with skip count 0
recordAndPos.set(null, 0L, 0L);
// Emit sequential records (skip count auto-increments)
recordAndPos.setNext("first"); // skipCount becomes 1
recordAndPos.setNext("second"); // skipCount becomes 2
recordAndPos.setNext("third"); // skipCount becomes 3
String record = recordAndPos.getRecord(); // "third"
long skipCount = recordAndPos.getRecordSkipCount(); // 3
// Reset position for a new batch
recordAndPos.setPosition(4096L, 0L);
recordAndPos.setNext("newBatchFirst"); // skipCount becomes 1