
Principle:Apache Flink Enriched Row Composition

From Leeroopedia


Knowledge Sources
Domains: Enriched_Row, Data_Types, Connector_Architecture
Last Updated: 2026-02-09 00:00 GMT

Overview

Description

The Connector Architecture principle describes the cross-cutting design patterns used by Flink's connector implementations to bridge the gap between the physical data stored in external systems and the logical row types expected by the Table/SQL layer. A central pattern in this architecture is the enriched row type -- a composite RowData that merges a fixed (static) row with a mutable (dynamic) row through an index mapping, enabling connectors to efficiently combine metadata columns (such as partition values) with data columns read from files.

The primary implementation of this pattern is the EnrichedRowData class, which resides in the file connector's table integration layer.

Theoretical Basis

The Enriched Row Composition Pattern

File-based table sources frequently need to produce rows that contain more columns than are physically stored in the data files. For example, in a Hive-style partitioned table, the partition columns (year, month, day) are encoded in the directory path (such as year=2024/month=05/day=01), not in the data files themselves. The table source must combine:

  • Mutable row -- the data columns read from each file record, which change with every row.
  • Fixed row -- the partition columns (or other metadata) that remain constant for all records within a single file split.

EnrichedRowData solves this by storing both rows and an index mapping array that defines how logical column positions map to physical positions in either the mutable or fixed row.
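The composition described above fits in a few lines. The following is a hypothetical, simplified sketch rather than Flink code: rows are plain Object[] arrays instead of Flink's RowData, and the class name SimpleEnrichedRow and its get/toArray methods are illustrative assumptions, not Flink API.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for EnrichedRowData: rows are Object[]
// instead of Flink's RowData, so the sketch runs without Flink on the classpath.
final class SimpleEnrichedRow {
    private final Object[] fixedRow;   // constant for a whole split (e.g. partition values)
    private final int[] indexMapping;  // logical position -> encoded source position
    private Object[] mutableRow;       // changes with every record

    SimpleEnrichedRow(Object[] fixedRow, int[] indexMapping) {
        this.fixedRow = fixedRow;
        this.indexMapping = indexMapping;
    }

    SimpleEnrichedRow replaceMutableRow(Object[] mutableRow) {
        this.mutableRow = mutableRow;
        return this;
    }

    int getArity() {
        return indexMapping.length;
    }

    // Route a logical position to the mutable row (index >= 0) or the fixed row (index < 0).
    Object get(int pos) {
        int index = indexMapping[pos];
        return index >= 0 ? mutableRow[index] : fixedRow[-(index + 1)];
    }

    // Materialize the logical row, mainly for printing and testing.
    Object[] toArray() {
        Object[] out = new Object[getArity()];
        for (int i = 0; i < out.length; i++) {
            out[i] = get(i);
        }
        return out;
    }

    public static void main(String[] args) {
        // Logical schema: id, name, year, month, score
        SimpleEnrichedRow row = new SimpleEnrichedRow(
                new Object[] {2024, 5},           // fixed row: year, month
                new int[] {0, 1, -1, -2, 2});
        row.replaceMutableRow(new Object[] {7L, "alice", 0.9});
        System.out.println(Arrays.toString(row.toArray())); // [7, alice, 2024, 5, 0.9]
    }
}
```

Reading a field through get() costs one mapping lookup plus a sign test; nothing is copied until toArray() materializes the row.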

Index Mapping Convention

The index mapping uses a sign-based encoding to distinguish between the two source rows:

Mapping value   Interpretation
>= 0            Refers to the column at that index in the mutable row.
< 0             Refers to the column at index -(value + 1) in the fixed row.

For example, given the mapping [0, 1, -1, -2, 2]:

Logical index   Source        Physical index
0               Mutable row   0
1               Mutable row   1
2               Fixed row     0
3               Fixed row     1
4               Mutable row   2
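The +1 offset in the encoding is necessary because -0 equals 0 for int, so without it fixed index 0 could not be told apart from mutable index 0. The helpers below are a hypothetical sketch (IndexEncoding, encodeFixed, and physicalIndex are illustrative names; the snippets on this page inline the arithmetic instead) that decodes the example mapping [0, 1, -1, -2, 2].

```java
// Hypothetical helpers for the sign-based encoding; not part of Flink's API.
final class IndexEncoding {
    // Fixed-row positions become negative: 0 -> -1, 1 -> -2, 2 -> -3, ...
    static int encodeFixed(int fixedIndex) {
        if (fixedIndex < 0) {
            throw new IllegalArgumentException("negative fixed index: " + fixedIndex);
        }
        return -(fixedIndex + 1);
    }

    // Mutable-row positions are stored unchanged, so they are always >= 0.
    static int encodeMutable(int mutableIndex) {
        if (mutableIndex < 0) {
            throw new IllegalArgumentException("negative mutable index: " + mutableIndex);
        }
        return mutableIndex;
    }

    static boolean isFixed(int mappingValue) {
        return mappingValue < 0;
    }

    // Recover the physical index inside whichever row the value points to.
    static int physicalIndex(int mappingValue) {
        return mappingValue >= 0 ? mappingValue : -(mappingValue + 1);
    }

    public static void main(String[] args) {
        int[] mapping = {0, 1, -1, -2, 2};
        for (int pos = 0; pos < mapping.length; pos++) {
            System.out.printf("logical %d -> %s row, index %d%n",
                    pos, isFixed(mapping[pos]) ? "fixed" : "mutable", physicalIndex(mapping[pos]));
        }
    }
}
```

Running main prints one line per logical index, matching the worked example table.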

This convention avoids the need for a separate "source indicator" array and encodes all routing information in a single compact int[].

// Index mapping: non-negative -> mutable row, negative (offset by -1) -> fixed row
@Override
public boolean isNullAt(int pos) {
    int index = indexMapping[pos];
    if (index >= 0) {
        return mutableRow.isNullAt(index);
    } else {
        return fixedRow.isNullAt(-(index + 1));
    }
}
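Every typed accessor has to apply the same routing as isNullAt. The sketch below is hypothetical: MiniRow is an illustrative interface covering only a few RowData-like getters (with String in place of Flink's StringData), and the routing is factored into two private helpers. Whether to factor it this way or repeat the branch in each getter is a style choice this sketch does not attribute to Flink.

```java
// Hypothetical sketch; MiniRow mimics only a small subset of Flink's RowData getters.
final class TypedGetterDemo {
    interface MiniRow {
        boolean isNullAt(int pos);
        int getInt(int pos);
        String getString(int pos);
    }

    // Array-backed row used for both the fixed and the mutable side.
    static final class ArrayRow implements MiniRow {
        private final Object[] values;
        ArrayRow(Object... values) { this.values = values; }
        @Override public boolean isNullAt(int pos) { return values[pos] == null; }
        @Override public int getInt(int pos) { return (Integer) values[pos]; }
        @Override public String getString(int pos) { return (String) values[pos]; }
    }

    // Composite row: each getter routes through the same sign-based mapping.
    static final class MiniEnrichedRow implements MiniRow {
        private final MiniRow fixedRow;
        private final int[] indexMapping;
        private MiniRow mutableRow;

        MiniEnrichedRow(MiniRow fixedRow, int[] indexMapping) {
            this.fixedRow = fixedRow;
            this.indexMapping = indexMapping;
        }

        MiniEnrichedRow replaceMutableRow(MiniRow mutableRow) {
            this.mutableRow = mutableRow;
            return this;
        }

        // Row backing a logical position: mutable for >= 0, fixed for < 0.
        private MiniRow source(int pos) {
            return indexMapping[pos] >= 0 ? mutableRow : fixedRow;
        }

        // Physical index within that row, undoing the -1 offset for the fixed row.
        private int physical(int pos) {
            int index = indexMapping[pos];
            return index >= 0 ? index : -(index + 1);
        }

        @Override public boolean isNullAt(int pos) { return source(pos).isNullAt(physical(pos)); }
        @Override public int getInt(int pos) { return source(pos).getInt(physical(pos)); }
        @Override public String getString(int pos) { return source(pos).getString(physical(pos)); }
    }

    public static void main(String[] args) {
        // Logical schema: name (mutable 0), year (fixed 0), nickname (mutable 1)
        MiniEnrichedRow row = new MiniEnrichedRow(new ArrayRow(2024), new int[] {0, -1, 1});
        row.replaceMutableRow(new ArrayRow("alice", null));
        System.out.println(row.getString(0) + " " + row.getInt(1) + " " + row.isNullAt(2)); // alice 2024 true
    }
}
```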

In-Place Row Replacement for Performance

A critical design decision is the replaceMutableRow() method, which swaps the mutable row in place without creating a new EnrichedRowData object:

public EnrichedRowData replaceMutableRow(RowData mutableRow) {
    this.mutableRow = mutableRow;
    return this;
}

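This is an intentional trade-off: an EnrichedRowData instance is neither thread-safe nor immutable, but on the hot path of record emission, reusing a single instance avoids allocating a new wrapper for every record. The consequence is that the returned reference reflects only the most recent mutable row, so any consumer that buffers rows must copy them before the next replaceMutableRow() call. The RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) is always inherited from the mutable row, since the fixed row represents static metadata that does not carry change semantics.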
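The reuse semantics and RowKind inheritance can be made visible with a small hypothetical sketch. Kind is a local stand-in that borrows only the constant names of Flink's RowKind, and DataRow, ReusableEnrichedRow, and copy() are illustrative names, not Flink API. The loop keeps both the raw returned reference and an immediate snapshot, showing that only the snapshot survives the next swap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReuseDemo {
    // Local stand-in for Flink's RowKind; only the constant names are borrowed.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical data record: its values plus the change kind it carries.
    static final class DataRow {
        final Kind kind;
        final Object[] values;

        DataRow(Kind kind, Object... values) {
            this.kind = kind;
            this.values = values;
        }
    }

    // Reusable composite row in the spirit of replaceMutableRow().
    static final class ReusableEnrichedRow {
        private final Object[] fixedRow;
        private final int[] indexMapping;
        private DataRow mutableRow;

        ReusableEnrichedRow(Object[] fixedRow, int[] indexMapping) {
            this.fixedRow = fixedRow;
            this.indexMapping = indexMapping;
        }

        // Swap in the next record without allocating; returns the same instance.
        ReusableEnrichedRow replaceMutableRow(DataRow mutableRow) {
            this.mutableRow = mutableRow;
            return this;
        }

        // The change kind comes from the mutable row only; the fixed row has none.
        Kind getKind() {
            return mutableRow.kind;
        }

        Object get(int pos) {
            int index = indexMapping[pos];
            return index >= 0 ? mutableRow.values[index] : fixedRow[-(index + 1)];
        }

        // Snapshot the current logical values so they survive the next swap.
        Object[] copy() {
            Object[] out = new Object[indexMapping.length];
            for (int i = 0; i < out.length; i++) {
                out[i] = get(i);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        // Logical schema: value (mutable 0), partition (fixed 0)
        ReusableEnrichedRow row = new ReusableEnrichedRow(new Object[] {"2024-05"}, new int[] {0, -1});
        List<ReusableEnrichedRow> kept = new ArrayList<>();
        List<String> snapshots = new ArrayList<>();
        for (DataRow rec : List.of(new DataRow(Kind.INSERT, "a"), new DataRow(Kind.DELETE, "b"))) {
            ReusableEnrichedRow out = row.replaceMutableRow(rec);
            kept.add(out);                                              // keeps a shared reference
            snapshots.add(out.getKind() + Arrays.toString(out.copy())); // copies immediately
        }
        System.out.println(kept.get(0) == kept.get(1)); // true: one instance for both records
        System.out.println(kept.get(0).get(0));          // b: the first record's view was overwritten
        System.out.println(snapshots);                   // [INSERT[a, 2024-05], DELETE[b, 2024-05]]
    }
}
```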

Static Factory for Field-Name-Based Construction

The EnrichedRowData.from() static factory method derives the index mapping, through the computeIndexMapping() helper shown below, from three ordered lists of field names:

  1. producedRowFields -- the full list of logical column names in the output row.
  2. mutableRowFields -- the column names present in the data file (mutable row).
  3. fixedRowFields -- the column names provided by static metadata (fixed row).

For each field in the produced row, the method first looks for a match in the mutable row fields. If not found, it falls back to the fixed row fields with the negative-offset encoding. This enables connectors to construct the mapping declaratively from schema metadata rather than manually computing index arrays.

public static int[] computeIndexMapping(
        List<String> producedRowFields,
        List<String> mutableRowFields,
        List<String> fixedRowFields) {
    int[] indexMapping = new int[producedRowFields.size()];
    for (int i = 0; i < producedRowFields.size(); i++) {
        String fieldName = producedRowFields.get(i);
        // Prefer the mutable (data) row: a non-negative index is stored as-is.
        int newIndex = mutableRowFields.indexOf(fieldName);
        if (newIndex < 0) {
            // Otherwise encode the fixed-row position as -(index + 1).
            // A field missing from both lists yields -(-1 + 1) = 0.
            newIndex = -(fixedRowFields.indexOf(fieldName) + 1);
        }
        indexMapping[i] = newIndex;
    }
    return indexMapping;
}
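One edge case follows from the arithmetic in computeIndexMapping: when a produced field appears in neither list, fixedRowFields.indexOf returns -1, the expression evaluates to -(-1 + 1) = 0, and the field is silently routed to column 0 of the mutable row. The sketch below is a hypothetical defensive variant (computeIndexMappingChecked is not a Flink method) that keeps the same encoding and mutable-first precedence but fails fast instead. It also reproduces the example mapping [0, 1, -1, -2, 2] from field names.

```java
import java.util.Arrays;
import java.util.List;

final class IndexMappingDemo {
    // Hypothetical checked variant of computeIndexMapping: same encoding,
    // but rejects fields that neither row provides instead of mapping them to 0.
    static int[] computeIndexMappingChecked(
            List<String> producedRowFields,
            List<String> mutableRowFields,
            List<String> fixedRowFields) {
        int[] indexMapping = new int[producedRowFields.size()];
        for (int i = 0; i < producedRowFields.size(); i++) {
            String fieldName = producedRowFields.get(i);
            int mutableIndex = mutableRowFields.indexOf(fieldName);
            if (mutableIndex >= 0) {
                indexMapping[i] = mutableIndex; // data column: stored as-is
                continue;
            }
            int fixedIndex = fixedRowFields.indexOf(fieldName);
            if (fixedIndex < 0) {
                throw new IllegalArgumentException(
                        "Field '" + fieldName + "' is in neither the mutable nor the fixed row");
            }
            indexMapping[i] = -(fixedIndex + 1); // metadata column: negative with -1 offset
        }
        return indexMapping;
    }

    public static void main(String[] args) {
        int[] mapping = computeIndexMappingChecked(
                List.of("id", "name", "year", "month", "score"),
                List.of("id", "name", "score"),
                List.of("year", "month"));
        System.out.println(Arrays.toString(mapping)); // [0, 1, -1, -2, 2]
    }
}
```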

Broader Applicability

While EnrichedRowData was designed for the file connector's partition column injection use case, the pattern is applicable to any connector that needs to combine static metadata with dynamic record data. Examples include:

  • CDC connectors that augment data rows with source metadata (database name, table name, transaction ID).
  • Lookup join enrichment where a static dimension row is merged with a streaming fact row.
  • System column injection where computed or virtual columns (e.g., file name, split ID) are appended to data rows.
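As a sketch of the system column case, the example below appends two per-split columns to each data record using the same routing rule. The column names file_name and split_id, the file name value, and the helpers mapping and enrich are hypothetical. For readability it materializes a new array per record, unlike EnrichedRowData, which exposes the combined row as an in-place view.

```java
import java.util.Arrays;
import java.util.List;

final class SystemColumnDemo {
    // Same routing rule as computeIndexMapping: mutable row first, then fixed row with -1 offset.
    static int[] mapping(List<String> produced, List<String> mutable, List<String> fixed) {
        int[] m = new int[produced.size()];
        for (int i = 0; i < m.length; i++) {
            int idx = mutable.indexOf(produced.get(i));
            m[i] = idx >= 0 ? idx : -(fixed.indexOf(produced.get(i)) + 1);
        }
        return m;
    }

    // Resolve one logical row from a data record plus the split's constant system columns.
    static Object[] enrich(Object[] record, Object[] fixed, int[] m) {
        Object[] out = new Object[m.length];
        for (int i = 0; i < m.length; i++) {
            out[i] = m[i] >= 0 ? record[m[i]] : fixed[-(m[i] + 1)];
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical schema: two data columns followed by two system columns.
        int[] m = mapping(List.of("user", "amount", "file_name", "split_id"),
                List.of("user", "amount"),
                List.of("file_name", "split_id"));
        // The fixed row is built once per split; only the data records vary.
        Object[] splitColumns = {"part-0001.parquet", 3};
        List<Object[]> records = List.of(new Object[] {"alice", 10}, new Object[] {"bob", 25});
        for (Object[] record : records) {
            System.out.println(Arrays.toString(enrich(record, splitColumns, m)));
        }
        // [alice, 10, part-0001.parquet, 3]
        // [bob, 25, part-0001.parquet, 3]
    }
}
```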
