
Implementation:Apache Hudi HoodieRecordEmitter EmitRecord

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, for emitting Hudi records to downstream Flink operators while tracking each split's read position.

Description

HoodieRecordEmitter is the default implementation of Flink's RecordEmitter interface for the Hudi FLIP-27 source. It bridges the gap between the split reader's output (records with position metadata) and the Flink source framework's output channel.

The emitRecord method performs two operations in sequence:

  1. Record collection: Calls output.collect(record.record()) to emit the actual data record (e.g., a RowData) to the downstream Flink operator chain.
  2. Position update: Calls split.updatePosition(record.fileOffset(), record.recordOffset()) to advance the split's tracked position. This position is included in the next checkpoint, enabling the source to resume from exactly this point on recovery.

The class is generic over T (the record type, typically RowData), making it reusable across different table formats and read modes. It receives HoodieRecordWithPosition<T> objects from the split reader, which wrap the actual record with file offset (which file within the file group) and record offset (which record within that file) metadata.
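The emit-then-update contract can be modeled in isolation. The following is a minimal, self-contained sketch of that two-step sequence; the `RecordWithPosition`, `Split`, and `Output` types here are simplified stand-ins for the Hudi and Flink classes, not the real APIs:

```java
import java.util.ArrayList;
import java.util.List;

public class EmitterSketch {
    // Simplified stand-in for HoodieRecordWithPosition<T>
    record RecordWithPosition<T>(T record, long fileOffset, long recordOffset) {}

    // Simplified stand-in for a source split that tracks its read position
    static class Split {
        long fileOffset = -1;
        long recordOffset = -1;
        void updatePosition(long fileOffset, long recordOffset) {
            this.fileOffset = fileOffset;
            this.recordOffset = recordOffset;
        }
    }

    // Simplified stand-in for Flink's SourceOutput<T>
    interface Output<T> { void collect(T value); }

    // The documented order: collect the record first, then advance the split position
    static <T> void emitRecord(RecordWithPosition<T> rec, Output<T> out, Split split) {
        out.collect(rec.record());
        split.updatePosition(rec.fileOffset(), rec.recordOffset());
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        Split split = new Split();
        emitRecord(new RecordWithPosition<>("row-1", 0, 41), collected::add, split);
        emitRecord(new RecordWithPosition<>("row-2", 0, 42), collected::add, split);
        System.out.println(collected);                                    // [row-1, row-2]
        System.out.println(split.fileOffset + "," + split.recordOffset);  // 0,42
    }
}
```

After each emit, the split position equals the offsets of the record just emitted, which is exactly what a checkpoint taken at that point would capture.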

The companion class StreamReadMonitoringFunction implements result delivery for the legacy (V1) source path. It runs as a single-parallelism source function that periodically monitors the Hudi timeline for new commit instants via IncrementalInputSplits. When new splits are discovered, they are emitted to downstream StreamReadOperator instances via a SourceContext. The function manages checkpointed state for the last issued instant and offset, and supports a configurable splits-per-cycle limit to control memory usage.
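The monitoring pattern described above, i.e. periodic discovery with a per-cycle split limit and checkpointed last-issued state, can be illustrated with a self-contained sketch. The names and the discovery logic here are simplified stand-ins, not the real StreamReadMonitoringFunction internals:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MonitorSketch {
    public static void main(String[] args) {
        // Pending splits found on the timeline (stand-in for IncrementalInputSplits)
        Deque<String> discovered = new ArrayDeque<>(
                List.of("split-1", "split-2", "split-3", "split-4", "split-5"));

        int splitsPerCycle = 2;          // stand-in for read.splits.limit
        String lastIssuedSplit = null;   // stand-in for the checkpointed state
        List<String> emitted = new ArrayList<>();

        // Each monitoring cycle emits at most splitsPerCycle splits,
        // then remembers the last issued split so a checkpoint can persist it.
        while (!discovered.isEmpty()) {
            for (int i = 0; i < splitsPerCycle && !discovered.isEmpty(); i++) {
                String split = discovered.poll();
                emitted.add(split);      // stand-in for sourceContext.collect(split)
                lastIssuedSplit = split;
            }
            // a checkpoint taken here would persist lastIssuedSplit
        }

        System.out.println(emitted.size() + " " + lastIssuedSplit); // 5 split-5
    }
}
```

Capping emissions per cycle bounds how many splits are buffered downstream at once, which is the memory-control purpose the page describes.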

Usage

HoodieRecordEmitter is used internally by the Hudi FLIP-27 source. It is instantiated by the source reader and invoked for each record produced by the split reader function. Users do not interact with it directly.

StreamReadMonitoringFunction is used in the legacy V1 source path when read.streaming.enabled = true and read.source.v2.enabled = false. It is configured through:

  • read.streaming.check-interval -- monitoring interval in seconds
  • read.splits.limit -- maximum number of splits emitted per monitoring cycle
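How these flags select the legacy path can be sketched as follows. The option keys are the ones documented above; the selection logic and values are an illustrative simplification, not Hudi's actual wiring:

```java
import java.util.Map;

public class SourcePathSketch {
    public static void main(String[] args) {
        // Option keys as documented on this page; values are illustrative
        Map<String, String> options = Map.of(
                "read.streaming.enabled", "true",
                "read.source.v2.enabled", "false",
                "read.streaming.check-interval", "60",  // seconds between monitor cycles
                "read.splits.limit", "1000");           // max splits per cycle

        boolean streaming = Boolean.parseBoolean(options.get("read.streaming.enabled"));
        boolean v2 = Boolean.parseBoolean(options.get("read.source.v2.enabled"));

        // Legacy V1 path (StreamReadMonitoringFunction) only when streaming is on and V2 is off
        String path = (streaming && !v2) ? "V1:StreamReadMonitoringFunction" : "V2:FLIP-27";
        System.out.println(path); // V1:StreamReadMonitoringFunction
    }
}
```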

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java
  • Lines: 29-36
  • Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java (Lines 125-280)

Signature

public class HoodieRecordEmitter<T>
    implements RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {

    @Override
    public void emitRecord(
            HoodieRecordWithPosition<T> record,
            SourceOutput<T> output,
            HoodieSourceSplit split) throws Exception {
        output.collect(record.record());
        split.updatePosition(record.fileOffset(), record.recordOffset());
    }
}

Import

import org.apache.hudi.source.reader.HoodieRecordEmitter;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

I/O Contract

Inputs

  • record (HoodieRecordWithPosition<T>, required) -- The record to emit, wrapping the actual data record with file offset and record offset metadata. The record() method returns the payload (e.g., RowData); fileOffset() returns the index of the file within the file group; recordOffset() returns the record's position within that file.
  • output (SourceOutput<T>, required) -- The Flink source output collector. Calling collect() emits the record to downstream operators.
  • split (HoodieSourceSplit, required) -- The source split being read. Its updatePosition() method is called to track read progress for checkpointing.

Outputs

  • Emitted record (T, typically RowData) -- The data record collected into the Flink output channel, available to downstream operators (filters, aggregations, sinks).
  • Updated split position (side effect on HoodieSourceSplit) -- The split's internal position is advanced to reflect the emitted record. This position is captured during the next checkpoint for fault-tolerant recovery.
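The recovery semantics above can be demonstrated with a self-contained sketch: snapshot the split position mid-read, then resume from the snapshot and skip already-emitted records. This is a simplified model of the behavior, not the real Flink checkpoint machinery:

```java
import java.util.ArrayList;
import java.util.List;

public class RecoverySketch {
    public static void main(String[] args) {
        List<String> file = List.of("r0", "r1", "r2", "r3", "r4");

        // First attempt: emit records 0..2, updating the position after each emit
        int checkpointedOffset = -1;
        List<String> firstRun = new ArrayList<>();
        for (int offset = 0; offset <= 2; offset++) {
            firstRun.add(file.get(offset));  // output.collect(...)
            checkpointedOffset = offset;     // split.updatePosition(...)
        }
        // ... a checkpoint captures checkpointedOffset, then the job fails ...

        // Recovery: restore the split position and resume right after it,
        // so no record is emitted twice and none is skipped
        List<String> secondRun = new ArrayList<>();
        for (int offset = checkpointedOffset + 1; offset < file.size(); offset++) {
            secondRun.add(file.get(offset));
        }

        System.out.println(firstRun);  // [r0, r1, r2]
        System.out.println(secondRun); // [r3, r4]
    }
}
```

Because the position is advanced only after a record has been collected, a checkpoint can never point past an unemitted record, which is what makes exactly-this-point resumption safe.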

Usage Examples

// HoodieRecordEmitter is created and used by the Hudi source reader internally.
// The following illustrates the emission lifecycle:

import org.apache.hudi.source.reader.HoodieRecordEmitter;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.table.data.RowData;

// Create the emitter (once per source reader)
HoodieRecordEmitter<RowData> emitter = new HoodieRecordEmitter<>();

// For each record from the split reader:
HoodieRecordWithPosition<RowData> record = ...; // from HoodieSplitReaderFunction.read()
SourceOutput<RowData> output = ...;              // provided by Flink framework
HoodieSourceSplit split = ...;                   // the current split being read

// Emit the record and update position
emitter.emitRecord(record, output, split);
// record.record() is now available to downstream Flink operators
// split position is updated to (record.fileOffset(), record.recordOffset())

// Legacy V1 streaming path (StreamReadMonitoringFunction):
// StreamReadMonitoringFunction monitorFunc = new StreamReadMonitoringFunction(
//     conf, path, rowType, maxCompactionMemoryInBytes, partitionPruner);
// execEnv.addSource(monitorFunc).setParallelism(1);
// Periodically discovers new splits and emits them via context.collect(split)
