Principle:Apache Flink File Source Framework
| Knowledge Sources | |
|---|---|
| Domains | File_Source, Connector_Framework |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Description
The File Source Framework defines the internal record position tracking architecture used by Flink's file source readers. At its core, the framework provides a unified way to represent where a reader is positioned within a file split, how that position is captured during checkpoints, and how batches of records are iterated over with full position awareness. This enables file-based sources to participate in Flink's exactly-once processing guarantees by precisely resuming from any checkpointed position.
The framework is composed of several cooperating classes residing in the org.apache.flink.connector.file.src.util package:
- CheckpointedPosition -- an immutable value object that captures the exact reader position (byte offset plus records-to-skip count) to be serialized into a checkpoint.
- MutableRecordAndPosition -- a mutable extension of RecordAndPosition that avoids object allocation in hot paths by updating fields in place as successive records are emitted.
- ArrayResultIterator -- a BulkFormat.RecordIterator backed by a pre-populated array, designed for bulk readers that decode an entire batch of records at once.
- IteratorResultIterator -- a BulkFormat.RecordIterator backed by a standard Java Iterator, augmenting each element with checkpoint-compatible position information.
Theoretical Basis
The design follows the position-after-record contract: every emitted record carries a position that points to the location after the record, not before it. This means that if a checkpoint is taken immediately after processing a record, the stored position already reflects that the record has been consumed, and recovery will resume from the next record. This convention avoids the need for a separate "skip one record" step during recovery and supports formats where record skipping is impossible or undesirable (for example, when pushed-down filters might cause different records to be skipped after a filter update across a savepoint boundary).
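A minimal sketch of the contract, assuming a split held as an in-memory list and positions expressed only as a records-to-skip count (as for a format with no addressable offsets). The class PositionAfterRecordSketch and its readFrom method are hypothetical illustrations, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the position-after-record contract. Positions are plain
// records-to-skip counts (no byte offset); names are illustrative, not Flink API.
final class PositionAfterRecordSketch {

    /**
     * Emits the split's records starting after `skip` already-consumed records and
     * appends, for each emitted record, the position AFTER it to `positionsOut`.
     */
    static List<String> readFrom(List<String> split, long skip, List<Long> positionsOut) {
        List<String> emitted = new ArrayList<>();
        long position = skip;
        for (int i = (int) skip; i < split.size(); i++) {
            emitted.add(split.get(i));
            position++; // this record is now counted as consumed
            positionsOut.add(position);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> split = List.of("a", "b", "c", "d");
        List<Long> positions = new ArrayList<>();
        List<String> firstRun = readFrom(split, 0, positions); // positions: [1, 2, 3, 4]

        // Simulate a checkpoint taken right after "b" was processed.
        long checkpointed = positions.get(1); // 2: "b" is already counted as consumed
        List<String> resumed = readFrom(split, checkpointed, new ArrayList<>());

        System.out.println(firstRun + " checkpoint=" + checkpointed + " resumed=" + resumed);
        // prints: [a, b, c, d] checkpoint=2 resumed=[c, d]
    }
}
```

Had the stored position pointed before "b" (value 1), recovery would emit "b" a second time; storing the post-record position removes the need for any extra skip step.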
The CheckpointedPosition captures two complementary coordinates:
| Field | Purpose |
|---|---|
| offset | A byte-level (or block-level) position the reader can seek to. May be NO_OFFSET (-1) when the format has no addressable positions. |
| recordsAfterOffset | The number of records to skip after seeking to the offset in order to reach the current position. |
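The sketch below shows one way a reader could interpret the two fields. It assumes the start offset of every record is known and models "seek" as moving to the first record that starts at or after the stored offset; PositionResolver and resolve are hypothetical names used only for illustration.

```java
import java.util.List;

// Hypothetical sketch: interpreting (offset, recordsAfterOffset) to find the next record.
// Not Flink API; the "file" is a list of ascending record start offsets.
final class PositionResolver {
    static final long NO_OFFSET = -1L; // mirrors CheckpointedPosition.NO_OFFSET

    /** Returns the index of the next record to read. */
    static int resolve(List<Long> recordOffsets, long offset, long recordsAfterOffset) {
        int index = 0; // NO_OFFSET: start from the beginning of the split
        if (offset != NO_OFFSET) {
            // Seek: advance to the first record starting at or after the offset.
            while (index < recordOffsets.size() && recordOffsets.get(index) < offset) {
                index++;
            }
        }
        // Skip: discard the given number of records after the seek point.
        return (int) Math.min(recordOffsets.size(), index + recordsAfterOffset);
    }

    public static void main(String[] args) {
        List<Long> offsets = List.of(0L, 40L, 80L, 120L, 160L); // five 40-byte records
        System.out.println(resolve(offsets, 80L, 0));       // prints 2 (seek only)
        System.out.println(resolve(offsets, 40L, 2));       // prints 3 (seek, then skip two)
        System.out.println(resolve(offsets, NO_OFFSET, 3)); // prints 3 (skip from start)
    }
}
```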
This two-part representation accommodates a wide spectrum of file formats:
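- Fully addressable formats store a precise offset and zero records-to-skip; this requires that the reader can seek directly to the next record (e.g., when an Avro reader is positioned exactly at a sync marker).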
- Sequential-only formats (e.g., CSV without block markers) store NO_OFFSET and a cumulative record count.
- Block-oriented formats (e.g., Parquet row groups) store the block start offset and the record count within the block.
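To make the three strategies concrete, the sketch below encodes one logical position (four records consumed) in each style. The file layout, two blocks of three records at the byte offsets shown, is an invented example, and PositionEncodings is not Flink code.

```java
import java.util.Arrays;

// Hypothetical sketch: one logical position encoded as {offset, recordsAfterOffset}
// in the style of each format family. Valid for 0 <= consumed < 6 in this layout.
final class PositionEncodings {
    static final long NO_OFFSET = -1L;

    // Assumed layout: two blocks (starting at bytes 0 and 100) of three records each.
    static final long[] BLOCK_STARTS = {0L, 100L};
    static final int RECORDS_PER_BLOCK = 3;
    static final long[] RECORD_OFFSETS = {4L, 34L, 64L, 104L, 134L, 164L};

    /** Sequential-only: nothing to seek to, so count every consumed record. */
    static long[] sequential(int consumed) {
        return new long[] {NO_OFFSET, consumed};
    }

    /** Block-oriented: start of the block holding the next record, plus records into it. */
    static long[] blockOriented(int consumed) {
        int block = consumed / RECORDS_PER_BLOCK;
        return new long[] {BLOCK_STARTS[block], consumed % RECORDS_PER_BLOCK};
    }

    /** Fully addressable: the next record's own offset, nothing to skip. */
    static long[] fullyAddressable(int consumed) {
        return new long[] {RECORD_OFFSETS[consumed], 0L};
    }

    public static void main(String[] args) {
        int consumed = 4;
        System.out.println(Arrays.toString(sequential(consumed)));       // [-1, 4]
        System.out.println(Arrays.toString(blockOriented(consumed)));    // [100, 1]
        System.out.println(Arrays.toString(fullyAddressable(consumed))); // [134, 0]
    }
}
```

All three pairs lead a reader to the same next record (the one starting at byte 134); they differ only in how much seeking versus skipping recovery has to do.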
The iterator classes (ArrayResultIterator and IteratorResultIterator) both use MutableRecordAndPosition internally and automatically increment the recordSkipCount with each call to next(), ensuring that position tracking is handled transparently without burdening format implementors.
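A simplified re-creation of the array-backed pattern, assuming the shape described above: one reused mutable holder, a set(...) call that loads a batch together with its starting offset and skip count, and next() returning null once the batch is exhausted. The Sketch* class names are hypothetical stand-ins; Flink's real classes implement BulkFormat.RecordIterator and also handle batch release (releaseBatch()), which this sketch omits.

```java
// Hypothetical, simplified stand-ins; not Flink's actual classes.
final class SketchRecordAndPosition<E> {
    E record;
    long offset;
    long recordSkipCount;

    void set(E record, long offset, long recordSkipCount) {
        this.record = record;
        this.offset = offset;
        this.recordSkipCount = recordSkipCount;
    }

    void setNext(E record) {
        this.record = record;
        this.recordSkipCount++; // the position now points after this record
    }
}

final class SketchArrayIterator<E> {
    private final SketchRecordAndPosition<E> current = new SketchRecordAndPosition<>();
    private E[] records;
    private int num;
    private int pos;

    /** Loads a decoded batch; offset/skipCount describe the position before its first record. */
    void set(E[] records, int num, long offset, long skipCount) {
        this.records = records;
        this.num = num;
        this.pos = 0;
        current.set(null, offset, skipCount);
    }

    /** Returns the next record with its post-record position, or null when the batch is done. */
    SketchRecordAndPosition<E> next() {
        if (pos < num) {
            current.setNext(records[pos++]);
            return current; // same holder every time: no per-record allocation
        }
        return null;
    }
}

final class SketchArrayIteratorDemo {
    public static void main(String[] args) {
        SketchArrayIterator<String> it = new SketchArrayIterator<>();
        it.set(new String[] {"x", "y", "z"}, 3, 512L, 7L); // batch starts 7 records past offset 512
        SketchRecordAndPosition<String> r;
        while ((r = it.next()) != null) {
            System.out.println(r.record + " -> (" + r.offset + ", " + r.recordSkipCount + ")");
        }
        // prints x -> (512, 8), y -> (512, 9), z -> (512, 10) on separate lines
    }
}
```

The format implementor only supplies the batch and its starting position; the per-record bookkeeping lives entirely in next().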
Key Interactions
The typical data flow within the framework is:
- A BulkFormat.Reader reads a batch of records from a file split.
- It wraps the batch in an ArrayResultIterator or IteratorResultIterator, providing the starting offset and skip count.
- The file source operator calls next() on the iterator, receiving records paired with their post-record positions.
- On checkpoint, the most recently returned RecordAndPosition is converted to a CheckpointedPosition and serialized.
- On recovery, the reader seeks to the stored offset and skips the recorded number of records.
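The flow above can be simulated end to end for a block-oriented file. In this hypothetical sketch each block plays the role of one batch, the stored position is updated after every emitted record, a checkpoint is simulated by stopping after three records, and recovery seeks to the stored block offset and skips the stored number of records. FlowSketch, Position, run, and the file layout are illustrative assumptions, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical end-to-end sketch; not Flink code.
final class FlowSketch {
    static final class Position {
        final long offset;
        final long recordsAfterOffset;

        Position(long offset, long recordsAfterOffset) {
            this.offset = offset;
            this.recordsAfterOffset = recordsAfterOffset;
        }
    }

    // Assumed layout: block start offsets and the records stored in each block.
    static final long[] BLOCK_STARTS = {0L, 100L, 200L};
    static final String[][] BLOCKS = {{"a", "b"}, {"c", "d", "e"}, {"f"}};

    /** Reads from `start`, emitting at most `limit` records; returns the last stored position. */
    static Position run(Position start, int limit, List<String> out) {
        Position state = start;
        int emitted = 0;
        for (int b = 0; b < BLOCKS.length; b++) {
            if (BLOCK_STARTS[b] < start.offset) {
                continue; // seek: skip blocks before the stored offset
            }
            long skip = (BLOCK_STARTS[b] == start.offset) ? start.recordsAfterOffset : 0;
            for (int i = (int) skip; i < BLOCKS[b].length; i++) {
                if (emitted == limit) {
                    return state; // simulated checkpoint
                }
                out.add(BLOCKS[b][i]);
                emitted++;
                state = new Position(BLOCK_STARTS[b], i + 1); // position AFTER this record
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> beforeFailure = new ArrayList<>();
        Position checkpoint = run(new Position(0L, 0L), 3, beforeFailure);
        List<String> afterRecovery = new ArrayList<>();
        run(checkpoint, Integer.MAX_VALUE, afterRecovery);
        System.out.println(beforeFailure + " | checkpoint=(" + checkpoint.offset + ", "
                + checkpoint.recordsAfterOffset + ") | " + afterRecovery);
        // prints: [a, b, c] | checkpoint=(100, 1) | [d, e, f]
    }
}
```

Every record is emitted exactly once across the failure: the checkpoint (100, 1) already counts "c" as consumed, so recovery resumes at "d".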
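Simplified excerpts of the two core classes (in Flink each is a public class in its own source file, and MutableRecordAndPosition inherits its record and recordSkipCount fields from RecordAndPosition):

```java
import java.io.Serializable;

// CheckpointedPosition captures offset + records-to-skip for checkpoint serialization
public final class CheckpointedPosition implements Serializable {
    public static final long NO_OFFSET = -1L;

    private final long offset;             // seekable position, or NO_OFFSET
    private final long recordsAfterOffset; // records to skip after seeking to offset

    public CheckpointedPosition(long offset, long recordsAfterOffset) {
        this.offset = offset;
        this.recordsAfterOffset = recordsAfterOffset;
    }

    public long getOffset() { return offset; }
    public long getRecordsAfterOffset() { return recordsAfterOffset; }
}

// MutableRecordAndPosition increments the skip count automatically on setNext()
public class MutableRecordAndPosition<E> extends RecordAndPosition<E> {
    public void setNext(E record) {
        this.record = record;
        this.recordSkipCount++; // the position now points after this record
    }
}
```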
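Because MutableRecordAndPosition is updated in place, any code that needs a position to outlive the next setNext() call has to copy it; this is one plausible reading of why the checkpoint path converts it into an immutable CheckpointedPosition. The sketch below, using the hypothetical classes ReusedHolder and Snapshot, demonstrates the aliasing effect.

```java
// Hypothetical stand-ins illustrating in-place mutation; not Flink's API.
final class ReusedHolder<E> {
    E record;
    long offset = -1L;         // NO_OFFSET
    long recordSkipCount = 0L;

    void setNext(E record) {
        this.record = record;
        this.recordSkipCount++;
    }
}

final class Snapshot {
    final long offset;
    final long recordsAfterOffset;

    Snapshot(long offset, long recordsAfterOffset) {
        this.offset = offset;
        this.recordsAfterOffset = recordsAfterOffset;
    }

    /** Copies the holder's current position into an immutable value. */
    static Snapshot of(ReusedHolder<?> holder) {
        return new Snapshot(holder.offset, holder.recordSkipCount);
    }
}

final class ReusedHolderDemo {
    public static void main(String[] args) {
        ReusedHolder<String> holder = new ReusedHolder<>();
        holder.setNext("first");
        ReusedHolder<String> alias = holder; // a reference, not a copy
        Snapshot copy = Snapshot.of(holder); // immutable copy taken now
        holder.setNext("second");            // the same object is overwritten
        System.out.println(alias.record + " " + alias.recordSkipCount);  // second 2
        System.out.println(copy.offset + " " + copy.recordsAfterOffset); // -1 1
    }
}
```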