Implementation:Apache Flink HybridSource Builder

From Leeroopedia


Domains: Stream_Processing, Source_Architecture
Last Updated: 2026-02-09 00:00 GMT

Overview

A concrete tool, provided by the Apache Flink connector-base module, for composing multiple sources into a sequential chain.

Description

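The HybridSource.builder(firstSource) factory method creates a HybridSourceBuilder whose chain starts with the given source. Further sources are appended in reading order with one of two addSource overloads. addSource(Source) adds an already-configured source. addSource(SourceFactory, Boundedness) defers creation of the next source until switch time, so its start position can be derived from the previous source's enumerator (position handoff). Because that source does not exist yet when the chain is assembled, its boundedness must be passed explicitly. Every source except the last must be bounded. The build() method produces the final HybridSource<T>, whose boundedness is that of the last source.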
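To make the builder contract concrete, here is a small self-contained Java model of it. It uses no Flink classes: ToyBoundedness, ToySource, ToyHybrid and ToyHybridBuilder are hypothetical names invented for this sketch, and checking the "only the last source may be unbounded" rule at append time is a simplifying assumption about where the real builder enforces it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the HybridSource builder contract; none of these types are Flink classes.
enum ToyBoundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

record ToySource(String name, ToyBoundedness boundedness) {}

// The built chain: sources in reading order, boundedness taken from the last one.
record ToyHybrid(List<ToySource> chain) {
    ToyBoundedness boundedness() {
        return chain.get(chain.size() - 1).boundedness();
    }
}

final class ToyHybridBuilder {
    private final List<ToySource> sources = new ArrayList<>();

    // Like HybridSource.builder(firstSource): the chain always starts with one source.
    static ToyHybridBuilder builder(ToySource first) {
        return new ToyHybridBuilder().addSource(first);
    }

    // Appends the next source. Only the final source may be unbounded,
    // so the current tail must be bounded before anything follows it.
    ToyHybridBuilder addSource(ToySource next) {
        if (!sources.isEmpty()
                && sources.get(sources.size() - 1).boundedness() != ToyBoundedness.BOUNDED) {
            throw new IllegalArgumentException(
                    "All sources except the final source need to be bounded.");
        }
        sources.add(next);
        return this;
    }

    ToyHybrid build() {
        return new ToyHybrid(List.copyOf(sources));
    }
}
```

With a bounded file source followed by an unbounded Kafka source, the composite reports CONTINUOUS_UNBOUNDED; reversing the order is rejected as soon as a source is appended after the unbounded one.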

Usage

A typical use is to read historical data from a bounded file source and then continue seamlessly with live data from an unbounded Kafka source, within the same job.

Code Reference

Source Location

  • Repository: Apache Flink
  • File: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
  • Lines: L92-269

Signature

@PublicEvolving
public class HybridSource<T>
        implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {

    public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, EnumT>
            builder(Source<T, ?, ?> firstSource);

    @PublicEvolving
    public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
            implements Serializable {

        public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
                HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source);

        public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
                HybridSourceBuilder<T, ToEnumT> addSource(
                        SourceFactory<T, NextSourceT, ? super EnumT> sourceFactory,
                        Boundedness boundedness);

        public HybridSource<T> build();
    }

    @PublicEvolving
    @FunctionalInterface
    public interface SourceFactory<T, SourceT extends Source<T, ?, ?>,
            FromEnumT extends SplitEnumerator> extends Serializable {
        SourceT create(SourceSwitchContext<FromEnumT> context);
    }

    @PublicEvolving
    public interface SourceSwitchContext<EnumT> {
        @Nullable EnumT getPreviousEnumerator();
    }
}
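The SourceFactory and SourceSwitchContext types in the signature are what enable position handoff. The following self-contained Java sketch models that interaction without Flink. ToyFileEnumerator, ToyKafkaSource, ToySwitchContext, ToySourceFactory and Handoff are hypothetical stand-ins; in particular the endTimestamp() accessor is an assumption, since which position a real enumerator can report depends on the concrete enumerator.

```java
// Hypothetical model of position handoff; these are not Flink classes.

// Stand-in for the previous source's enumerator after it has finished.
final class ToyFileEnumerator {
    private final long endTimestamp;

    ToyFileEnumerator(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    // Assumed accessor: the last event timestamp covered by the file source.
    long endTimestamp() {
        return endTimestamp;
    }
}

// Stand-in for the next source, configured with a start position.
record ToyKafkaSource(long startTimestamp) {}

// Same shape as SourceSwitchContext: exposes the finished enumerator.
interface ToySwitchContext<E> {
    E getPreviousEnumerator();
}

// Same shape as SourceFactory: creates the next source at switch time.
interface ToySourceFactory<S, E> {
    S create(ToySwitchContext<E> context);
}

final class Handoff {
    // Start the live source just after the last timestamp the files covered.
    static final ToySourceFactory<ToyKafkaSource, ToyFileEnumerator> KAFKA_AFTER_FILES =
            context -> new ToyKafkaSource(context.getPreviousEnumerator().endTimestamp() + 1);

    // Simulates the switch: the finished enumerator is handed to the factory.
    static ToyKafkaSource switchFrom(ToyFileEnumerator finished) {
        return KAFKA_AFTER_FILES.create(() -> finished);
    }
}
```

In a real job, a factory like this would be passed to addSource(SourceFactory, Boundedness) together with Boundedness.CONTINUOUS_UNBOUNDED, and would build a KafkaSource whose starting offsets come from something like OffsetsInitializer.timestamp(...).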

Import

import org.apache.flink.connector.base.source.hybrid.HybridSource;

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| firstSource | Source<T, ?, ?> | Yes | First source in the chain |
| additionalSources | Source<T, ?, ?> or SourceFactory | Yes (1+) | Subsequent sources |

Outputs

| Name | Type | Description |
|---|---|---|
| hybridSource | HybridSource<T> | Composite source reading from the chain sequentially |
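The sequential behaviour of the composite output can be modelled in a few lines of plain Java. This is a simplified, single-reader sketch: SequentialChain is a hypothetical class, and it ignores the fact that real HybridSource readers run in parallel and the switch between sources is coordinated by the enumerator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical single-reader model of reading a chain of sources in order.
final class SequentialChain<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> remaining;
    private Iterator<T> current;

    SequentialChain(List<Iterator<T>> sources) {
        this.remaining = sources.iterator();
        this.current = remaining.hasNext() ? remaining.next() : null;
    }

    @Override
    public boolean hasNext() {
        // Move to the next source only once the current one is exhausted.
        while (current != null && !current.hasNext()) {
            current = remaining.hasNext() ? remaining.next() : null;
        }
        return current != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    // Collects all records; only terminates when every source is bounded.
    static <R> List<R> drainAll(Iterator<R> it) {
        List<R> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```

Reading from two bounded iterators yields every record of the first before any record of the second; with an unbounded last source, hasNext() would simply keep returning true.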

Usage Examples

File-to-Kafka Chain

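import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Bounded source: reads the files present under /historical/ once, then finishes.
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/historical/"))
    .processStaticFileSet()
    .build();

// Unbounded source: KafkaSource requires bootstrap servers; this address is a placeholder.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("live-topic")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

// builder() has two type parameters, so let Java infer them instead of passing only <String>.
HybridSource<String> hybridSource = HybridSource
    .builder(fileSource)
    .addSource(kafkaSource)
    .build();

env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid");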

