Implementation:Heibaiying BigData Notes Flink ListState Usage
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Big_Data |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete examples of managing stateful computations with ListState from the Apache Flink state API, demonstrated through a threshold-warning pattern in both keyed-state and operator-state variants.
Description
The ThresholdWarning classes in the BigData-Notes repository demonstrate two approaches to stateful stream processing using ListState:
Keyed State variant (com.heibaiying.keyedstate.ThresholdWarning): Extends RichFlatMapFunction and uses getRuntimeContext().getListState() to obtain a per-key ListState. Each incoming sensor reading that meets or exceeds a configured threshold is appended to the list. When the number of recorded violations for that key reaches a configurable limit, an alert tuple containing the key and all offending values is emitted, and the key's state is cleared.
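Operator State variant (com.heibaiying.operatorstate.ThresholdWarning): Also extends RichFlatMapFunction but additionally implements CheckpointedFunction to manage operator-level state. It keeps violations in a local ArrayList during normal processing, copies that list into a ListState in snapshotState() when a checkpoint is taken, and rebuilds it from the ListState in initializeState() after a restore. Because operator state belongs to a parallel operator instance rather than to a key, one buffer is shared by all keys that instance processes.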
Both variants illustrate the pattern of accumulating state across events and emitting a result once a condition is met, which underlies alerting, anomaly detection, and buffering records into batches within a stream.
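A minimal sketch of what the operator-state variant could look like, assuming the Flink 1.x DataStream API. It is not the repository's code: the class name OperatorThresholdWarning and the "threshold alert" label are hypothetical. It shows the CheckpointedFunction life cycle: initializeState() obtains the ListState from the OperatorStateStore and refills the local buffer after a restore, flatMap() touches only the local ArrayList, and snapshotState() copies the buffer into the ListState with update().

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical operator-state sketch: one buffer per parallel instance, shared by all keys. */
public class OperatorThresholdWarning
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Tuple2<String, Long>>>>
        implements CheckpointedFunction {

    private final long threshold;
    private final int numberOfTimes;
    // Working buffer used on the hot path; copied into ListState only at checkpoint time
    private final List<Tuple2<String, Long>> bufferedData = new ArrayList<>();
    private transient ListState<Tuple2<String, Long>> checkpointedState;

    public OperatorThresholdWarning(long threshold, int numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Long>> descriptor = new ListStateDescriptor<>(
                "abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
        // Operator state comes from the OperatorStateStore, not from the RuntimeContext
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            // Rebuild the working buffer from the last completed checkpoint
            for (Tuple2<String, Long> element : checkpointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, Long> value,
                        Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
        if (value.f1 >= threshold) {
            bufferedData.add(value);
        }
        if (bufferedData.size() >= numberOfTimes) {
            // Emit a copy, because bufferedData is cleared right after collect()
            List<Tuple2<String, Long>> alertValues = new ArrayList<>(bufferedData);
            out.collect(Tuple2.of("threshold alert", alertValues));
            bufferedData.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the checkpointed contents with the current working buffer
        checkpointedState.update(bufferedData);
    }
}
```

The alert carries a copy of the buffer: emitting the list itself could hand a downstream consumer (for example, with object reuse enabled) a reference to a list that is emptied immediately afterwards. Because the state is registered with getListState(), Flink splits the list entries evenly across the new parallel instances when the job is rescaled; getUnionListState() would instead give every instance the full list.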
Usage
Use ListState when your application needs to maintain an ordered collection of values per key (keyed state) or per operator instance (operator state). Common scenarios include:
- Threshold-based alerting: Accumulate values that exceed a threshold and trigger an alert when the count reaches a limit.
- Buffering: Collect elements until a batch size or time condition is met before emitting.
- Pattern matching: Store recent events to detect sequences or patterns.
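For the buffering scenario, a minimal keyed sketch, assuming the Flink 1.x API where RichFunction.open(Configuration) is available. The class name CountBatcher and its batchSize parameter are hypothetical. Unlike the threshold example, it appends every reading and keeps a per-key counter in a ValueState, so it only iterates over the ListState when a batch is full instead of on every event.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical keyed sketch: emits each key's readings in batches of batchSize (use after keyBy). */
public class CountBatcher
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    private final int batchSize;
    private transient ListState<Long> buffer;     // per-key pending readings
    private transient ValueState<Integer> count;  // per-key number of pending readings

    public CountBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", Long.class));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value,
                        Collector<Tuple2<String, List<Long>>> out) throws Exception {
        buffer.add(value.f1);
        Integer current = count.value();  // null the first time a key is seen
        int size = (current == null ? 0 : current) + 1;
        if (size < batchSize) {
            count.update(size);  // batch not full yet: no need to read the list
            return;
        }
        List<Long> batch = new ArrayList<>();
        for (Long v : buffer.get()) {
            batch.add(v);
        }
        out.collect(Tuple2.of(value.f0, batch));
        // Reset both pieces of state for this key
        buffer.clear();
        count.clear();
    }
}
```

It would be applied as stream.keyBy(t -> t.f0).flatMap(new CountBatcher(100)). A batch that never fills stays in state indefinitely; flushing on a time condition would need a KeyedProcessFunction with timers, which this sketch omits.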
Code Reference
Source Location
- Repository: BigData-Notes
- File (keyed state): code/Flink/flink-state-management/src/main/java/com/heibaiying/keyedstate/ThresholdWarning.java (lines 14-49)
- File (operator state): code/Flink/flink-state-management/src/main/java/com/heibaiying/operatorstate/ThresholdWarning.java (lines 17-72)
Signature
```java
// Keyed state: RuntimeContext method, typically called in open()
<T> ListState<T> getListState(ListStateDescriptor<T> descriptor)
// Operator state: OperatorStateStore method, reached via
// context.getOperatorStateStore() in initializeState()
<S> ListState<S> getListState(ListStateDescriptor<S> descriptor) throws Exception
// ListState<T> methods
void add(T value) throws Exception
Iterable<T> get() throws Exception
void clear()
void update(List<T> values) throws Exception
```
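The method contract can be made concrete with a hypothetical in-memory ListState. InMemoryListState is not part of Flink and says nothing about how Flink's state backends store data; it is a sketch that could back plain unit tests of code that receives a ListState as a parameter. Besides the four methods listed above, the ListState interface also declares addAll(List<T>), so an implementation must provide it.

```java
import org.apache.flink.api.common.state.ListState;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical test helper (not part of Flink): a ListState backed by an ArrayList. */
public class InMemoryListState<T> implements ListState<T> {

    private final List<T> values = new ArrayList<>();

    /** Appends one element to the end of the list. */
    @Override
    public void add(T value) {
        values.add(Objects.requireNonNull(value, "null elements are not allowed"));
    }

    /** Appends all elements; nothing is added if any of them is null. */
    @Override
    public void addAll(List<T> newValues) {
        values.addAll(nonNullCopy(newValues));
    }

    /** Returns a read-only snapshot in insertion order, so callers may add() while iterating. */
    @Override
    public Iterable<T> get() {
        return Collections.unmodifiableList(new ArrayList<>(values));
    }

    /** Replaces the whole list; the old contents survive if the new list contains a null. */
    @Override
    public void update(List<T> newValues) {
        List<T> checked = nonNullCopy(newValues);
        values.clear();
        values.addAll(checked);
    }

    /** Removes every element. */
    @Override
    public void clear() {
        values.clear();
    }

    private static <E> List<E> nonNullCopy(List<E> source) {
        Objects.requireNonNull(source, "list must not be null");
        List<E> copy = new ArrayList<>(source.size());
        for (E element : source) {
            copy.add(Objects.requireNonNull(element, "null elements are not allowed"));
        }
        return copy;
    }
}
```

Functions that fetch their state from getRuntimeContext() in open(), like the ThresholdWarning classes, cannot be handed this object directly; for those, Flink's operator test harnesses are the usual route.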
Import
```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| descriptor | ListStateDescriptor<T> | Yes | Describes the state name and type. Used to register and retrieve the ListState from the runtime context or operator state store. |
| value | T | Yes (for add) | An individual element to append to the state list. |
| threshold | Long | Yes (constructor) | Readings greater than or equal to this value count as violations. |
| numberOfTimes | Integer | Yes (constructor) | The number of threshold violations required before emitting an alert. |
Outputs
| Name | Type | Description |
|---|---|---|
| alert | Tuple2<String, List<Long>> (keyed variant) | A tuple whose first field is the key (sensor ID) with " alert" appended and whose second field holds the readings that met or exceeded the threshold. Emitted via the Collector when the violation count reaches numberOfTimes, after which the key's state is cleared. |
| state | ListState<Long> | Internal state, not emitted. When checkpointing is enabled, it is included in Flink checkpoints and restored after a failure. |
Usage Examples
Basic Usage
```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Keyed state example: emits an alert once the number of threshold
 * violations recorded for a key reaches the configured limit.
 * Keyed state is only available on a KeyedStream, so apply this
 * function after keyBy (for example, keyBy(t -> t.f0)).
 */
public class ThresholdWarning
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    // Per-key list of readings that met or exceeded the threshold
    private transient ListState<Long> abnormalData;
    // Readings >= threshold count as violations
    private final Long threshold;
    // Number of violations that triggers an alert
    private final Integer numberOfTimes;

    public ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register (or look up) the state; Flink scopes it to the current key
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>(
                "abnormalData",
                TypeInformation.of(Long.class)
        );
        abnormalData = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value,
                        Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }
        // Copy the state into a regular List so it can be sized and emitted
        List<Long> allData = new ArrayList<>();
        for (Long d : abnormalData.get()) {
            allData.add(d);
        }
        if (allData.size() >= numberOfTimes) {
            out.collect(new Tuple2<>(value.f0 + " alert", allData));
            // Start counting again for this key
            abnormalData.clear();
        }
    }
}
```