

Implementation:Apache Hudi HoodieTableSource GetScanRuntimeProvider

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by Apache Hudi, for producing an executable Flink source operator graph from a Hudi table scan request.

Description

HoodieTableSource.getScanRuntimeProvider is the central entry point that translates a Flink SQL or Table API read into a physical DataStream execution plan. The method returns a DataStreamScanProviderAdapter (implementing ScanRuntimeProvider) whose produceDataStream method constructs the actual operator topology when invoked by the Flink planner.

The method handles two distinct source implementations:

  1. FLIP-27 Source V2 (when READ_SOURCE_V2_ENABLED is true): Creates a HoodieSource and registers it via StreamExecutionEnvironment.fromSource(). This path uses the modern Flink Source interface with split enumerators and split readers.
  2. Legacy Source V1 (default): For streaming reads, constructs a two-operator pipeline: a StreamReadMonitoringFunction (parallelism 1) that periodically discovers new splits, followed by a StreamReadOperator that reads each split in parallel. For batch reads, uses an InputFormatSourceFunctionAdapter wrapping a Hudi input format.

The isBounded() method on the returned provider delegates to FlinkOptions.READ_AS_STREAMING to inform the planner whether the source is finite or infinite.
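The two source paths above, together with the isBounded() contract, reduce to a small decision function. The following is a minimal standalone sketch of that dispatch, with hypothetical names; the real logic lives inside HoodieTableSource's anonymous DataStreamScanProviderAdapter.

```java
// Hypothetical standalone model of the dispatch described above,
// not the actual Hudi classes.
public class ScanDispatchSketch {

    enum Path { SOURCE_V2, LEGACY_STREAMING, LEGACY_BATCH }

    // FLIP-27 path when read.source.v2.enabled is set; otherwise the
    // legacy V1 source, which further splits on streaming vs. batch.
    static Path choosePath(boolean sourceV2Enabled, boolean readAsStreaming) {
        if (sourceV2Enabled) {
            return Path.SOURCE_V2;          // HoodieSource via fromSource()
        }
        return readAsStreaming
            ? Path.LEGACY_STREAMING         // monitor (parallelism 1) + StreamReadOperator
            : Path.LEGACY_BATCH;            // InputFormatSourceFunctionAdapter
    }

    // Mirrors isBounded(): bounded exactly when not reading as a stream.
    static boolean isBounded(boolean readAsStreaming) {
        return !readAsStreaming;
    }

    public static void main(String[] args) {
        System.out.println(choosePath(true, false));   // SOURCE_V2
        System.out.println(choosePath(false, true));   // LEGACY_STREAMING
        System.out.println(isBounded(true));           // false
    }
}
```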

HoodieTableSource also implements SupportsProjectionPushDown, SupportsFilterPushDown, and SupportsLimitPushDown, so by the time getScanRuntimeProvider is called, the requiredPos, predicates, and limit fields have already been populated by the planner.
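Because of those pushdown interfaces, the planner invokes the apply* hooks before asking for the runtime provider. The call order can be sketched with a simplified standalone model (hypothetical names and string-typed filters standing in for the real fields):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the planner's call order: the pushdown hooks
// populate state first, then getScanRuntimeProvider reads it.
public class PushdownOrderSketch {
    private int[] requiredPos;                                  // projection pushdown
    private final List<String> predicates = new ArrayList<>();  // filter pushdown
    private long limit = -1L;                                   // limit pushdown (-1 = none)

    void applyProjection(int[] projectedFields) { this.requiredPos = projectedFields; }
    void applyFilters(List<String> acceptedFilters) { this.predicates.addAll(acceptedFilters); }
    void applyLimit(long limit) { this.limit = limit; }

    // By the time this runs, all pushdown state above has been set.
    String getScanRuntimeProvider() {
        return "scan(cols=" + Arrays.toString(requiredPos)
                + ", filters=" + predicates + ", limit=" + limit + ")";
    }

    public static void main(String[] args) {
        PushdownOrderSketch source = new PushdownOrderSketch();
        source.applyProjection(new int[] {0, 2});
        source.applyFilters(List.of("dt = '2024-01-01'"));
        source.applyLimit(10L);
        System.out.println(source.getScanRuntimeProvider());
    }
}
```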

The companion class HoodieScanContext aggregates read mode metadata (start/end instants, streaming flag, partition pruner, skip flags) and is passed to FLIP-27 source components to guide split enumeration and reading.

Usage

This method is invoked automatically by the Flink table planner when a query reads from a Hudi table. Users do not call it directly. To influence its behavior, set connector options in the table DDL or query hints:

  • read.streaming.enabled = true for streaming reads
  • read.start-commit / read.end-commit for incremental reads
  • read.source.v2.enabled = true for the FLIP-27 source
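The "query hints" route uses Flink's dynamic table options, which override connector options for a single query without changing the DDL. For example, switching one query to a streaming read:

```sql
-- Per-query override via Flink's OPTIONS hint (older Flink versions
-- require table.dynamic-table-options.enabled = true):
SELECT * FROM hudi_table
/*+ OPTIONS('read.streaming.enabled' = 'true',
            'read.start-commit' = '20240101120000000') */;
```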

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
  • Lines: 214-236
  • Also: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java (Lines 40-68)

Signature

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    return new DataStreamScanProviderAdapter() {

        @Override
        public boolean isBounded() {
            return !conf.get(FlinkOptions.READ_AS_STREAMING);
        }

        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
            // ...
            if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
                return produceNewSourceDataStream(execEnv);
            } else {
                return produceLegacySourceDataStream(execEnv, typeInfo);
            }
        }
    };
}

Import

import org.apache.hudi.table.HoodieTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;

I/O Contract

Inputs

  • scanContext (ScanTableSource.ScanContext, required): context from the Flink planner carrying projection and filter pushdown information.
  • conf (org.apache.flink.configuration.Configuration, required instance field): Hudi connector configuration, including READ_AS_STREAMING, READ_SOURCE_V2_ENABLED, READ_START_COMMIT, READ_END_COMMIT, and READ_TASKS.
  • schema (SerializableSchema, required instance field): full table schema used for type information and input format construction.
  • path (StoragePath, required instance field): base path of the Hudi table on storage.
  • partitionKeys (List<String>, required instance field): partition column names for the table.
  • predicates (List<Predicate>, optional instance field): filter predicates pushed down by the planner.
  • requiredPos (int[], optional instance field): column indices required by the query (projection pushdown).

Outputs

  • return value (ScanRuntimeProvider): a DataStreamScanProviderAdapter that produces a DataStream<RowData> when produceDataStream() is called; its isBounded() method returns true for batch reads and false for streaming reads.

Usage Examples

// This method is called by the Flink planner, not directly by user code.
// The following illustrates how the table source is configured via DDL:

// CREATE TABLE hudi_table (
//   id BIGINT,
//   name STRING,
//   ts TIMESTAMP(3),
//   dt STRING
// ) PARTITIONED BY (dt)
// WITH (
//   'connector' = 'hudi',
//   'path' = 's3://bucket/hudi_table',
//   'read.streaming.enabled' = 'false',
//   'read.start-commit' = '20240101120000000',
//   'read.end-commit' = '20240102120000000'
// );

// SELECT * FROM hudi_table WHERE dt = '2024-01-01';
// The planner calls getScanRuntimeProvider(scanContext) with filter pushdown for dt.

// Programmatic access for testing:
HoodieTableSource source = new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf);
ScanRuntimeProvider provider = source.getScanRuntimeProvider(scanContext);
// provider.isBounded() == true (batch incremental read)

Related Pages

Implements Principle
