Implementation:Apache Flink IndexLookupGeneratorFunction
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Source_Framework |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A stream generator function that returns elements from a collection based on their index, using an in-memory HashMap lookup for O(1) random access.
Description
IndexLookupGeneratorFunction is an internal implementation of the GeneratorFunction interface that provides index-based random access to a collection of elements. Unlike FromElementsGeneratorFunction, which deserializes elements sequentially and must seek forward on recovery, this implementation eagerly builds a HashMap<Long, OUT> lookup table during open(), enabling constant-time access to any element by index.
During construction, the provided elements are validated (no nulls, correct types) and serialized into a byte array using Flink's TypeSerializer. When open() is called at runtime, all elements are deserialized and stored in a lookupMap keyed by their sequential index (0, 1, 2, ...). The map() method then performs a simple HashMap lookup.
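The construct, open, and map lifecycle described above can be sketched in plain Java. This is a simplified, hypothetical stand-in (the class name IndexLookupSketch is invented here), and it uses java.io object serialization in place of Flink's TypeSerializer, so it illustrates the pattern rather than reproducing the Flink source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for IndexLookupGeneratorFunction.
// Assumption: Java serialization replaces Flink's TypeSerializer.
final class IndexLookupSketch<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] elementsSerialized; // compact form shipped with the function
    private final int numElements;
    private transient Map<Long, T> lookupMap; // rebuilt on every open()

    // Construction: reject nulls and serialize all elements into one byte array.
    IndexLookupSketch(Iterable<T> elements) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        int count = 0;
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (T element : elements) {
                if (element == null) {
                    throw new IllegalArgumentException("null element at index " + count);
                }
                out.writeObject(element);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.elementsSerialized = bytes.toByteArray();
        this.numElements = count;
    }

    // open(): deserialize every element once, keyed by its position 0, 1, 2, ...
    @SuppressWarnings("unchecked")
    void open() {
        Map<Long, T> map = new HashMap<>();
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(elementsSerialized))) {
            for (long index = 0; index < numElements; index++) {
                map.put(index, (T) in.readObject());
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("cannot deserialize elements", e);
        }
        lookupMap = map;
    }

    // map(): one HashMap lookup; an index with no entry yields null.
    T map(Long index) {
        return lookupMap.get(index);
    }
}
```

Only the byte array travels with the function; the map is transient and is rebuilt each time open() runs, which mirrors the memory-for-speed trade-off described below.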
Key design decisions:
- The entire collection is deserialized into memory at open() time, trading memory for O(1) access time.
- This approach is well-suited for scenarios requiring random access patterns or where failure recovery needs to revisit arbitrary elements without sequential scanning.
- The element count is tracked during the checkIterable() validation pass rather than requiring a separate count parameter.
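The single validate-and-count pass can be illustrated with a small helper. ElementChecks.checkAndCount is a hypothetical name, and the exception type and messages are assumptions; Flink's own checkIterable() may report problems differently.

```java
// Hypothetical helper: one pass over the elements that rejects nulls and
// type mismatches and also returns the element count, so callers do not
// need a separate count parameter.
final class ElementChecks {
    private ElementChecks() {}

    static int checkAndCount(Iterable<?> elements, Class<?> expectedType) {
        if (elements == null) {
            throw new IllegalArgumentException("elements must not be null");
        }
        int count = 0;
        for (Object element : elements) {
            if (element == null) {
                throw new IllegalArgumentException("null element at index " + count);
            }
            if (!expectedType.isAssignableFrom(element.getClass())) {
                throw new IllegalArgumentException(
                        "element at index " + count + " is a " + element.getClass().getName()
                                + ", not a " + expectedType.getName());
            }
            count++;
        }
        return count;
    }
}
```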
Usage
Use IndexLookupGeneratorFunction when you need a GeneratorFunction that supports efficient random-access element lookup by index. This is particularly useful when elements may be accessed out of order or when failure recovery needs to re-emit elements at arbitrary positions. It is marked @Internal and is not intended for direct use by application developers.
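To see why sequential access complicates recovery at arbitrary positions, consider a hypothetical generator that can only read its elements front to back. This is not Flink's FromElementsGeneratorFunction; SequentialGenerator is invented for illustration, and a List stands in for the serialized byte stream.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sequential generator: elements can only be read front to back,
// so a request for index k first skips every element before k, and a request
// for an earlier index forces a restart from the first element.
// Indices must be smaller than source.size().
final class SequentialGenerator<T> {
    private final List<T> source; // stands in for a serialized byte stream
    private Iterator<T> cursor;
    private long nextIndex;
    private long elementsRead; // total elements consumed, including skipped ones

    SequentialGenerator(List<T> source) {
        this.source = source;
        this.cursor = source.iterator();
    }

    T map(long index) {
        if (index < nextIndex) {
            // A stream cannot rewind: start over from the beginning.
            cursor = source.iterator();
            nextIndex = 0;
        }
        while (nextIndex < index) {
            cursor.next(); // discard elements before the requested index
            nextIndex++;
            elementsRead++;
        }
        nextIndex++;
        elementsRead++;
        return cursor.next();
    }

    long elementsRead() {
        return elementsRead;
    }
}
```

With the elements 10, 20, 30, 40, 50, requesting index 3 and then index 1 consumes six elements in total (four to reach index 3, then a restart and two more to reach index 1). A HashMap<Long, OUT> built once in open() answers the same two requests with two get() calls.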
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/IndexLookupGeneratorFunction.java
- Lines: 1-176
Signature
```java
@Internal
public class IndexLookupGeneratorFunction<OUT> implements GeneratorFunction<Long, OUT>
```
Import
```java
import org.apache.flink.connector.datagen.functions.IndexLookupGeneratorFunction;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| typeInfo | TypeInformation<OUT> | Yes | Type information used to create the serializer for elements |
| elements | Iterable<OUT> | Yes | The collection of elements to make available for lookup; must be non-null and type-compatible |
| config | ExecutionConfig | No | Execution configuration used to create the serializer (a new ExecutionConfig is used if omitted) |
| index (map input) | Long | Yes | The index of the element to retrieve from the lookup map |
Outputs
| Name | Type | Description |
|---|---|---|
| map result | OUT | The element at the requested index, or null if the index is not in the lookup map |
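The null-on-miss contract matters once indices are distributed across parallel readers. The sketch below assumes each reader receives a contiguous sub-range of [0, count), roughly as Flink's DataGeneratorSource does when it splits its number sequence; SplitDriver and its methods are hypothetical, and the even-split arithmetic is an assumption rather than Flink's exact algorithm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical sketch: divide the index range [0, count) into contiguous
// sub-ranges, one per parallel reader, and resolve each index through a
// lookup function such as lookupMap::get.
final class SplitDriver {
    private SplitDriver() {}

    // Returns {from, to} pairs (to is exclusive) that together cover [0, count).
    static List<long[]> splitRange(long count, int parallelism) {
        List<long[]> ranges = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            long from = count * i / parallelism;
            long to = count * (i + 1) / parallelism;
            if (from < to) {
                ranges.add(new long[] {from, to});
            }
        }
        return ranges;
    }

    // What one reader emits for its sub-range; a reader starting mid-collection
    // relies on random access because it never sees the earlier indices.
    static <T> List<T> readRange(long[] range, LongFunction<T> lookup) {
        List<T> out = new ArrayList<>();
        for (long index = range[0]; index < range[1]; index++) {
            out.add(lookup.apply(index));
        }
        return out;
    }
}
```

Splitting five indices across two readers yields [0, 2) and [2, 5); the second reader starts at index 2 and resolves it directly. An index at or past the collection size (for example 5 with five elements) resolves to null, one reason to pass elements.size() as the count, as in the usage example.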
Usage Examples
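```java
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.functions.IndexLookupGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Inside a method declared "throws Exception" (env.execute() throws)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create an index-lookup generator function from a list of integers
List<Integer> elements = Arrays.asList(10, 20, 30, 40, 50);
IndexLookupGeneratorFunction<Integer> generatorFunction =
        new IndexLookupGeneratorFunction<>(Types.INT, elements);
// Use with DataGeneratorSource, which passes indices 0..count-1 to map()
DataGeneratorSource<Integer> source =
        new DataGeneratorSource<>(generatorFunction, elements.size(), Types.INT);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Index Lookup Source")
        .print();
env.execute("Index Lookup Example");
```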