Implementation: Apache Hudi HoodieTableSource getScanRuntimeProvider
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for producing an executable Flink source operator graph from a Hudi table scan request, provided by Apache Hudi.
Description
`HoodieTableSource.getScanRuntimeProvider` is the central entry point that translates a Flink SQL or Table API read into a physical DataStream execution plan. The method returns a `DataStreamScanProviderAdapter` (implementing `ScanRuntimeProvider`) whose `produceDataStream` method constructs the actual operator topology when invoked by the Flink planner.
The method handles two distinct source implementations:
- **FLIP-27 Source V2** (when `READ_SOURCE_V2_ENABLED` is true): creates a `HoodieSource` and registers it via `StreamExecutionEnvironment.fromSource()`. This path uses the modern Flink Source interface with split enumerators and split readers.
- **Legacy Source V1** (default): for streaming reads, constructs a two-operator pipeline: a `StreamReadMonitoringFunction` (parallelism 1) that periodically discovers new splits, followed by a `StreamReadOperator` that reads each split in parallel. For batch reads, uses an `InputFormatSourceFunctionAdapter` wrapping a Hudi input format.
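The legacy streaming topology can be sketched without any Flink dependencies. The class and method names below are illustrative stand-ins for the monitor/reader pattern described above, not Hudi's actual API:

```java
// Illustrative sketch: a single-parallelism "monitor" discovers splits,
// which are then fanned out to parallel "readers" (here, round-robin).
import java.util.ArrayList;
import java.util.List;

class LegacyStreamTopologySketch {

  /** Stands in for StreamReadMonitoringFunction: runs with parallelism 1. */
  static List<String> discoverSplits(long fromInstant, long toInstant) {
    List<String> splits = new ArrayList<>();
    for (long instant = fromInstant; instant <= toInstant; instant++) {
      splits.add("split@" + instant);
    }
    return splits;
  }

  /** Stands in for StreamReadOperator: each subtask reads its share of splits. */
  static List<List<String>> assignToReaders(List<String> splits, int readTasks) {
    List<List<String>> perReader = new ArrayList<>();
    for (int i = 0; i < readTasks; i++) {
      perReader.add(new ArrayList<>());
    }
    for (int i = 0; i < splits.size(); i++) {
      perReader.get(i % readTasks).add(splits.get(i)); // round-robin assignment
    }
    return perReader;
  }
}
```

The point of the two-operator shape is that split discovery stays serialized (one monitor, so timeline polling is consistent), while split reading scales with `read.tasks`.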
The `isBounded()` method on the returned provider delegates to `FlinkOptions.READ_AS_STREAMING` to inform the planner whether the source is finite or infinite.
`HoodieTableSource` also implements `SupportsProjectionPushDown`, `SupportsFilterPushDown`, and `SupportsLimitPushDown`, so by the time `getScanRuntimeProvider` is called, the `requiredPos`, `predicates`, and `limit` fields have already been populated by the planner.
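That pushdown-then-scan ordering can be illustrated with a simplified, self-contained stand-in. Field and method names mirror the description above but are not Hudi's actual signatures (the filter is reduced to a single string for brevity):

```java
// Illustrative sketch of call order: the planner applies projection/filter/
// limit pushdown first, then asks for the runtime provider, which reads the
// already-populated instance fields.
import java.util.Arrays;

class PushDownOrderSketch {
  int[] requiredPos;    // set by applyProjection (SupportsProjectionPushDown)
  String predicate;     // set by applyFilters (SupportsFilterPushDown)
  long limit = -1L;     // set by applyLimit (SupportsLimitPushDown)

  void applyProjection(int[] projectedFields) { this.requiredPos = projectedFields; }
  void applyFilters(String pushedPredicate)   { this.predicate = pushedPredicate; }
  void applyLimit(long limit)                 { this.limit = limit; }

  String getScanRuntimeProvider() {
    // By this point the planner has populated all pushdown state.
    return "scan(cols=" + Arrays.toString(requiredPos)
        + ", filter=" + predicate + ", limit=" + limit + ")";
  }
}
```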
The companion class `HoodieScanContext` aggregates read-mode metadata (start/end instants, streaming flag, partition pruner, skip flags) and is passed to FLIP-27 source components to guide split enumeration and reading.
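A minimal sketch of how such a context value object can guide split enumeration is shown below. The field names and the read-mode rules are illustrative assumptions, not Hudi's actual logic:

```java
// Hypothetical scan-context value object: it only aggregates read-mode
// metadata; the enumerator derives its behavior from these fields.
class ScanContextSketch {
  final String startInstant; // null means "from the beginning"
  final String endInstant;   // null means "up to the latest"
  final boolean streaming;

  ScanContextSketch(String startInstant, String endInstant, boolean streaming) {
    this.startInstant = startInstant;
    this.endInstant = endInstant;
    this.streaming = streaming;
  }

  /** Derives the read mode the split enumerator should use (illustrative). */
  String readMode() {
    if (streaming) {
      return "streaming";   // unbounded: keep discovering new splits
    }
    if (startInstant != null || endInstant != null) {
      return "incremental"; // bounded slice of the timeline
    }
    return "snapshot";      // bounded read of the latest table state
  }
}
```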
Usage
This method is invoked automatically by the Flink table planner when a query reads from a Hudi table. Users do not call it directly. To influence its behavior, set connector options in the table DDL or query hints:
- `read.streaming.enabled = true` for streaming reads
- `read.start-commit` / `read.end-commit` for incremental reads
- `read.source.v2.enabled = true` for the FLIP-27 source
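These options can also be supplied per query through Flink SQL dynamic table options (hints), without altering the table DDL; the table name below is illustrative, and the hint syntax assumes dynamic table options are enabled in the session:

```sql
-- Per-query override: stream from the table starting at a given commit.
SELECT * FROM hudi_table
/*+ OPTIONS('read.streaming.enabled' = 'true',
            'read.start-commit' = '20240101120000000') */;
```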
Code Reference
Source Location
- Repository: Apache Hudi
- File: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java` (lines 214-236)
- Also: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java` (lines 40-68)
Signature
```java
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
  return new DataStreamScanProviderAdapter() {
    @Override
    public boolean isBounded() {
      return !conf.get(FlinkOptions.READ_AS_STREAMING);
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
      // ...
      if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
        return produceNewSourceDataStream(execEnv);
      } else {
        return produceLegacySourceDataStream(execEnv, typeInfo);
      }
    }
  };
}
```
Import
```java
import org.apache.hudi.table.HoodieTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| scanContext | `ScanTableSource.ScanContext` | Yes | Context from the Flink planner carrying projection and filter pushdown information. |
| conf | `org.apache.flink.configuration.Configuration` | Yes (instance field) | Hudi connector configuration including `READ_AS_STREAMING`, `READ_SOURCE_V2_ENABLED`, `READ_START_COMMIT`, `READ_END_COMMIT`, `READ_TASKS`. |
| schema | `SerializableSchema` | Yes (instance field) | Full table schema used for type information and input format construction. |
| path | `StoragePath` | Yes (instance field) | Base path of the Hudi table on storage. |
| partitionKeys | `List<String>` | Yes (instance field) | Partition column names for the table. |
| predicates | `List<Predicate>` | No (instance field) | Filter predicates pushed down by the planner. |
| requiredPos | `int[]` | No (instance field) | Column indices required by the query (projection pushdown). |
Outputs
| Name | Type | Description |
|---|---|---|
| return value | `ScanRuntimeProvider` | A `DataStreamScanProviderAdapter` that produces a `DataStream<RowData>` when `produceDataStream()` is called. The `isBounded()` method returns true for batch reads and false for streaming reads. |
Usage Examples
```java
// This method is called by the Flink planner, not directly by user code.
// The following illustrates how the table source is configured via DDL:
//
// CREATE TABLE hudi_table (
//   id BIGINT,
//   name STRING,
//   ts TIMESTAMP(3),
//   dt STRING
// ) PARTITIONED BY (dt)
// WITH (
//   'connector' = 'hudi',
//   'path' = 's3://bucket/hudi_table',
//   'read.streaming.enabled' = 'false',
//   'read.start-commit' = '20240101120000000',
//   'read.end-commit' = '20240102120000000'
// );
//
// SELECT * FROM hudi_table WHERE dt = '2024-01-01';
//
// The planner calls getScanRuntimeProvider(scanContext) with filter pushdown for dt.

// Programmatic access for testing:
HoodieTableSource source = new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf);
ScanRuntimeProvider provider = source.getScanRuntimeProvider(scanContext);
// provider.isBounded() == true (batch incremental read)
```