Principle:Apache Flink File Source Framework

From Leeroopedia


Knowledge Sources
Domains: File_Source, Connector_Framework
Last Updated: 2026-02-09 00:00 GMT

Overview

Description

The File Source Framework defines the internal record position tracking architecture used by Flink's file source readers. At its core, the framework provides a unified way to represent where a reader is positioned within a file split, how that position is captured during checkpoints, and how batches of records are iterated over with full position awareness. This enables file-based sources to participate in Flink's exactly-once processing guarantees by precisely resuming from any checkpointed position.

The framework is composed of several cooperating classes residing in the org.apache.flink.connector.file.src.util package:

  • CheckpointedPosition -- an immutable value object that captures the exact reader position (byte offset plus records-to-skip count) to be serialized into a checkpoint.
  • MutableRecordAndPosition -- a mutable extension of RecordAndPosition that avoids object allocation in hot paths by updating fields in-place as successive records are emitted.
  • ArrayResultIterator -- a BulkFormat.RecordIterator backed by a pre-populated array, designed for bulk readers that decode an entire batch of records at once.
  • IteratorResultIterator -- a BulkFormat.RecordIterator backed by a standard Java Iterator, augmenting each element with checkpoint-compatible position information.

Theoretical Basis

The design follows the position-after-record contract: every emitted record carries a position that points to the location after the record, not before it. If a checkpoint is taken immediately after a record is processed, the stored position already reflects that the record has been consumed, so recovery resumes with the next record. This convention removes the need for a separate "skip one record" step during recovery. It also supports formats where skipping records is impossible or undesirable, for example when pushed-down filters are updated across a savepoint boundary and would cause a different set of records to be skipped.
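
The contract can be illustrated with a minimal sketch. The class below is a hypothetical stand-in written for this page (PositionAfterRecordSketch, Emitted, and readFrom are not Flink names), and it models a file as an in-memory list with a record count as the only position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in, not a Flink class.
final class PositionAfterRecordSketch {

    /** A record plus the number of records consumed once it has been processed. */
    static final class Emitted {
        final String value;
        final long recordsConsumed;

        Emitted(String value, long recordsConsumed) {
            this.value = value;
            this.recordsConsumed = recordsConsumed;
        }
    }

    /** Emits all records that follow the first `recordsToSkip` records. */
    static List<Emitted> readFrom(List<String> file, long recordsToSkip) {
        List<Emitted> out = new ArrayList<>();
        for (int i = (int) recordsToSkip; i < file.size(); i++) {
            // Position-after-record: the count already includes record i.
            out.add(new Emitted(file.get(i), i + 1L));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> file = Arrays.asList("a", "b", "c", "d");
        List<Emitted> firstRun = readFrom(file, 0);
        // Checkpoint taken right after "b" was processed: the stored count is 2.
        long checkpointed = firstRun.get(1).recordsConsumed;
        // Recovery passes the stored count straight back in; no "skip one more" step.
        List<Emitted> resumed = readFrom(file, checkpointed);
        System.out.println(resumed.get(0).value); // prints c
    }
}
```

Had the position pointed before the record instead, the recovery path would need to know whether the record at that position had already been processed, which is exactly the ambiguity the contract avoids.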

The CheckpointedPosition captures two complementary coordinates:

Field | Purpose
offset | A byte-level (or block-level) position the reader can seek to. May be NO_OFFSET (-1) when the format has no addressable positions.
recordsAfterOffset | The number of records to skip after seeking to the offset in order to reach the current position.

This two-part representation accommodates a wide spectrum of file formats:

  • Fully addressable formats (those in which every record boundary can be sought to directly, e.g., fixed-width binary records) store a precise offset and zero records-to-skip.
  • Sequential-only formats (e.g., CSV without block markers) store NO_OFFSET and a cumulative record count.
  • Block-oriented formats (e.g., Avro blocks delimited by sync markers, or Parquet row groups) store the block start offset and the record count within the block.
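
To make the three encodings concrete, the following sketch expresses one logical position in each style and checks that all of them resolve to the same record. Everything here is an assumption made for illustration: Position is a simplified stand-in for CheckpointedPosition, and the "file" is a hypothetical in-memory map from byte offset to record in which offsets 0 and 100 are treated as block starts.

```java
import java.util.Collection;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative stand-in, not Flink code.
final class FormatPositionsSketch {
    static final long NO_OFFSET = -1L;

    /** Simplified position: an optional offset plus records to skip after it. */
    static final class Position {
        final long offset;
        final long recordsAfterOffset;

        Position(long offset, long recordsAfterOffset) {
            this.offset = offset;
            this.recordsAfterOffset = recordsAfterOffset;
        }
    }

    /** Hypothetical file: byte offset -> record. Blocks start at 0 and at 100. */
    static NavigableMap<Long, String> sampleFile() {
        NavigableMap<Long, String> file = new TreeMap<>();
        file.put(0L, "r0");
        file.put(10L, "r1");
        file.put(20L, "r2");
        file.put(100L, "r3");
        file.put(110L, "r4");
        file.put(120L, "r5");
        return file;
    }

    /** Returns the first record read after restoring to `pos`, or null at end of file. */
    static String nextRecordAfterRestore(NavigableMap<Long, String> file, Position pos) {
        // Seek when an offset is available; otherwise start from the beginning.
        Collection<String> records = pos.offset == NO_OFFSET
                ? file.values()
                : file.tailMap(pos.offset, true).values();
        long toSkip = pos.recordsAfterOffset;
        for (String record : records) {
            if (toSkip-- == 0) {
                return record;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> file = sampleFile();
        Position fullyAddressable = new Position(110L, 0L);   // exact record offset
        Position sequentialOnly = new Position(NO_OFFSET, 4L); // count from file start
        Position blockOriented = new Position(100L, 1L);       // block start + count in block
        System.out.println(nextRecordAfterRestore(file, fullyAddressable)); // prints r4
        System.out.println(nextRecordAfterRestore(file, sequentialOnly));   // prints r4
        System.out.println(nextRecordAfterRestore(file, blockOriented));    // prints r4
    }
}
```

The encodings differ only in how much work recovery has to redo: the sequential-only form re-reads four records, the block-oriented form re-reads one, and the fully addressable form re-reads none.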

The iterator classes (ArrayResultIterator and IteratorResultIterator) both use MutableRecordAndPosition internally and automatically increment the recordSkipCount with each call to next(), ensuring that position tracking is handled transparently without burdening format implementors.
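
The following is a rough sketch of how an array-backed iterator can reuse one mutable holder while advancing the skip count. It is a simplified stand-in, not Flink's ArrayResultIterator: the class and field names are chosen for illustration, and returning null at the end of the batch is an assumption of this sketch.

```java
// Illustrative stand-in, not Flink code.
final class ArrayBatchIteratorSketch<E> {

    /** Mutable holder that is reused for every returned record. */
    static final class MutablePos<E> {
        E record;
        long offset;
        long recordSkipCount;
    }

    private final MutablePos<E> reused = new MutablePos<>();
    private final E[] records;
    private final int num;
    private int pos;

    /**
     * @param records          decoded batch
     * @param num              number of valid entries in `records`
     * @param offset           offset shared by all records of the batch
     * @param skipCountOfFirst records to skip after `offset` to reach the first record
     */
    ArrayBatchIteratorSketch(E[] records, int num, long offset, long skipCountOfFirst) {
        this.records = records;
        this.num = num;
        reused.offset = offset;
        reused.recordSkipCount = skipCountOfFirst;
    }

    /** Returns the next record with its post-record position, or null when exhausted. */
    MutablePos<E> next() {
        if (pos >= num) {
            return null;
        }
        reused.record = records[pos++];
        reused.recordSkipCount++; // the position now points after this record
        return reused;
    }

    public static void main(String[] args) {
        ArrayBatchIteratorSketch<String> it =
                new ArrayBatchIteratorSketch<>(new String[] {"x", "y", "z"}, 3, 4096L, 10L);
        MutablePos<String> rp;
        while ((rp = it.next()) != null) {
            System.out.println(rp.record + " -> offset " + rp.offset + ", skip " + rp.recordSkipCount);
        }
    }
}
```

Because the same holder instance is returned on every call, a caller that needs to retain a position beyond the next call to next() has to copy the offset and skip count out first. That is the trade-off for avoiding one allocation per record.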

Key Interactions

The typical data flow within the framework is:

  1. A BulkFormat.Reader reads a batch of records from a file split.
  2. It wraps the batch in an ArrayResultIterator or IteratorResultIterator, providing the starting offset and skip count.
  3. The file source operator calls next() on the iterator, receiving records paired with their post-record positions.
  4. On checkpoint, the most recently returned RecordAndPosition is converted to a CheckpointedPosition and serialized.
  5. On recovery, the reader seeks to the stored offset and skips the recorded number of records.
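
The two central classes, abridged to the members relevant here (other members omitted):

// CheckpointedPosition captures offset + records-to-skip for checkpoint serialization.
public final class CheckpointedPosition implements Serializable {
    public static final long NO_OFFSET = -1L;

    private final long offset;
    private final long recordsAfterOffset;

    public CheckpointedPosition(long offset, long recordsAfterOffset) {
        this.offset = offset;
        this.recordsAfterOffset = recordsAfterOffset;
    }
}

// MutableRecordAndPosition updates its fields in place. setNext() stores the
// record and increments the skip count, so the position points after the record.
public class MutableRecordAndPosition<E> extends RecordAndPosition<E> {
    public void setNext(E record) {
        this.record = record;
        this.recordSkipCount++;
    }
}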
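
The five data-flow steps listed in this section can be traced in a small simulation. This is a sketch under stated assumptions rather than Flink's implementation: the split is a hypothetical in-memory map from block start offset to decoded records, Position stands in for CheckpointedPosition, and the `limit` parameter exists only to simulate a failure right after a checkpoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative stand-in, not Flink code.
final class FileSourceFlowSketch {

    /** Simplified position: block start offset plus records consumed in that block. */
    static final class Position {
        final long offset;
        final long recordsAfterOffset;

        Position(long offset, long recordsAfterOffset) {
            this.offset = offset;
            this.recordsAfterOffset = recordsAfterOffset;
        }
    }

    /** Hypothetical split: block start offset -> decoded records of that block. */
    static TreeMap<Long, List<String>> sampleSplit() {
        TreeMap<Long, List<String>> blocks = new TreeMap<>();
        blocks.put(0L, Arrays.asList("r0", "r1", "r2"));
        blocks.put(100L, Arrays.asList("r3", "r4", "r5"));
        return blocks;
    }

    /**
     * Reads from `start`, appends processed records to `sink`, and stops after
     * `limit` records. Returns the position after the last processed record.
     */
    static Position run(TreeMap<Long, List<String>> split, Position start, int limit, List<String> sink) {
        Position last = start;
        long toSkip = start.recordsAfterOffset;
        int processed = 0;
        // Step 5: "seek" by visiting only blocks at or after the stored offset.
        for (Map.Entry<Long, List<String>> block : split.tailMap(start.offset, true).entrySet()) {
            long skipCount = 0; // step 2: each batch starts at (block offset, 0)
            for (String record : block.getValue()) { // step 1: one block is one batch
                skipCount++; // step 3: position after this record
                if (toSkip > 0) {
                    toSkip--; // step 5: already processed before the failure
                    continue;
                }
                if (processed == limit) {
                    return last;
                }
                sink.add(record);
                processed++;
                last = new Position(block.getKey(), skipCount); // step 4: checkpointable
            }
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        // First attempt: process four records, checkpoint, then "fail".
        Position checkpoint = run(sampleSplit(), new Position(0L, 0L), 4, sink);
        // Recovery: a fresh reader resumes from the checkpointed position.
        run(sampleSplit(), checkpoint, Integer.MAX_VALUE, sink);
        System.out.println(sink); // prints [r0, r1, r2, r3, r4, r5]
    }
}
```

In this run the checkpoint lands at offset 100 with one record consumed, so recovery re-reads only r3 from the second block and every record reaches the sink exactly once.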
