

Implementation:Heibaiying BigData Notes Flink ListState Usage

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Big_Data
Last Updated 2026-02-10 10:00 GMT

Overview

Concrete usage of ListState from the Apache Flink State API for stateful stream processing, demonstrated through a threshold-warning pattern in both keyed-state and operator-state variants.

Description

The ThresholdWarning classes in the BigData-Notes repository demonstrate two approaches to stateful stream processing using ListState:

Keyed State variant (com.heibaiying.keyedstate.ThresholdWarning): Extends RichFlatMapFunction and uses getRuntimeContext().getListState() to obtain a ListState scoped to the current key, so the function must run on a keyed stream (after keyBy). Each incoming sensor reading that meets or exceeds the configured threshold is appended to the list. When the number of recorded violations reaches the configured limit, the function emits an alert tuple containing the key and all offending values, then clears the state.
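To make the per-key scoping concrete, the following plain-Java model (no Flink dependency) mimics what keyed ListState gives the function: one independent list per key. The class name KeyedListStateModel and the HashMap are illustrative assumptions only; Flink keeps keyed state in its configured state backend, not in a user-visible map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical plain-Java model of keyed ListState scoping: each key gets its
 * own independent list. The HashMap only stands in for Flink's state backend.
 */
public class KeyedListStateModel {
    private final Map<String, List<Long>> stateByKey = new HashMap<>();
    private final long threshold;
    private final int numberOfTimes;

    public KeyedListStateModel(long threshold, int numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    /** Processes one (key, value) reading and returns the alert list if one fires. */
    public Optional<List<Long>> process(String key, long value) {
        // Comparable to getListState(...) resolving to the current key's list.
        List<Long> abnormal = stateByKey.computeIfAbsent(key, k -> new ArrayList<>());
        if (value >= threshold) {
            abnormal.add(value);                 // like ListState.add(value)
        }
        if (abnormal.size() >= numberOfTimes) {
            List<Long> alert = new ArrayList<>(abnormal);
            abnormal.clear();                    // like ListState.clear()
            return Optional.of(alert);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        KeyedListStateModel model = new KeyedListStateModel(100L, 2);
        System.out.println(model.process("sensor-1", 120L)); // Optional.empty
        System.out.println(model.process("sensor-2", 150L)); // Optional.empty (separate list)
        System.out.println(model.process("sensor-1", 130L)); // Optional[[120, 130]]
    }
}
```

The violation recorded for sensor-2 never counts toward sensor-1's limit, which is exactly the isolation keyed state provides after keyBy.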

Operator State variant (com.heibaiying.operatorstate.ThresholdWarning): Also extends RichFlatMapFunction but additionally implements CheckpointedFunction to manage operator-level state. It keeps a local ArrayList for in-memory processing and synchronizes it with a ListState when a checkpoint snapshot is taken (snapshotState) and when state is restored (initializeState). Because operator state is not partitioned by key, this list is shared by all records processed by one parallel instance.
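A minimal sketch of the operator-state pattern, assuming Flink 1.x APIs. The class name OperatorThresholdWarning and the "threshold alert" label are hypothetical, and the sketch calls ListState.update() in snapshotState() where a clear()-then-add() loop would work equally well; it is not a copy of the repository file. Note that getListState() on the OperatorStateStore uses even-split redistribution, so after rescaling the restored elements may be divided differently across parallel instances.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: buffer in a local list, mirror it into operator ListState on checkpoints. */
public class OperatorThresholdWarning
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Tuple2<String, Long>>>>
        implements CheckpointedFunction {

    private final long threshold;
    private final int numberOfTimes;
    // In-memory buffer used on the hot path; only persisted via checkpointedState.
    private final List<Tuple2<String, Long>> bufferedData = new ArrayList<>();
    // Operator (non-keyed) ListState that mirrors bufferedData at checkpoint time.
    private transient ListState<Tuple2<String, Long>> checkpointedState;

    public OperatorThresholdWarning(long threshold, int numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Long>> descriptor = new ListStateDescriptor<>(
                "abnormalData",
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        // On recovery, reload the buffer from the last completed checkpoint.
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkpointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, Long> value,
                        Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
        if (value.f1 >= threshold) {
            bufferedData.add(value);
        }
        if (bufferedData.size() >= numberOfTimes) {
            // Emit a copy so clearing the buffer cannot mutate the emitted list.
            List<Tuple2<String, Long>> alert = new ArrayList<>(bufferedData);
            out.collect(Tuple2.of("threshold alert", alert));
            bufferedData.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the stored list with the current buffer in a single call.
        checkpointedState.update(bufferedData);
    }
}
```

Unlike the keyed variant, violations from different sensors accumulate in the same buffer, so an alert can mix readings from several keys.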

Both variants illustrate the pattern of accumulating state across events and emitting results when a condition is met, which is fundamental to alerting, anomaly detection, and batch-within-stream processing.

Usage

Use ListState when your application needs to maintain an ordered collection of values per key (keyed state) or per operator instance (operator state). Common scenarios include:

  • Threshold-based alerting: Accumulate values that exceed a threshold and trigger an alert when the count reaches a limit.
  • Buffering: Collect elements until a batch size or time condition is met before emitting.
  • Pattern matching: Store recent events to detect sequences or patterns.
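For the pattern-matching case, here is a sketch under stated assumptions: keep the last N readings per key and emit when they are strictly increasing. RisingTrendDetector, windowSize, appendBounded, and isStrictlyIncreasing are hypothetical names, and the code assumes Flink 1.x, where open(Configuration) is available as in the example on this page. It uses ListState.update() to overwrite the stored list with the trimmed window, and like any keyed-state function it must run after keyBy(...).

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: emits "<key> rising" when the last windowSize readings strictly increase. */
public class RisingTrendDetector extends RichFlatMapFunction<Tuple2<String, Long>, String> {

    private final int windowSize;
    private transient ListState<Long> recent;

    public RisingTrendDetector(int windowSize) {
        this.windowSize = windowSize;
    }

    @Override
    public void open(Configuration parameters) {
        recent = getRuntimeContext().getListState(new ListStateDescriptor<>("recent", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<String> out) throws Exception {
        List<Long> window = appendBounded(recent.get(), value.f1, windowSize);
        recent.update(window); // overwrite the stored list with the trimmed window
        if (window.size() == windowSize && isStrictlyIncreasing(window)) {
            out.collect(value.f0 + " rising");
        }
    }

    /** Copies the current state, appends the new value, keeps only the newest maxSize items. */
    static List<Long> appendBounded(Iterable<Long> current, Long value, int maxSize) {
        List<Long> result = new ArrayList<>();
        if (current != null) { // defensive: treat a missing list as empty
            for (Long v : current) {
                result.add(v);
            }
        }
        result.add(value);
        while (result.size() > maxSize) {
            result.remove(0); // drop the oldest reading
        }
        return result;
    }

    /** True if every element is greater than the one before it. */
    static boolean isStrictlyIncreasing(List<Long> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) <= values.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

For example, stream.keyBy(t -> t.f0).flatMap(new RisingTrendDetector(3)) would emit "sensor-1 rising" whenever the three most recent readings for sensor-1 are strictly increasing. Keeping the pure helpers static makes the windowing logic testable without a Flink runtime.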

Code Reference

Source Location

  • Repository: BigData-Notes
  • File (keyed state): code/Flink/flink-state-management/src/main/java/com/heibaiying/keyedstate/ThresholdWarning.java (lines 14-49)
  • File (operator state): code/Flink/flink-state-management/src/main/java/com/heibaiying/operatorstate/ThresholdWarning.java (lines 17-72)

Signature

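// Keyed state: RuntimeContext method, usually called in open()
// e.g. getRuntimeContext().getListState(descriptor)
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties)

// Operator state: OperatorStateStore method, usually called in
// CheckpointedFunction.initializeState(...) via context.getOperatorStateStore()
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception

// ListState<T> methods
void add(T value) throws Exception
Iterable<T> get() throws Exception
void update(List<T> values) throws Exception
void addAll(List<T> values) throws Exception
void clear()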

Import

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

I/O Contract

Inputs

Name | Type | Required | Description
descriptor | ListStateDescriptor<T> | Yes | Describes the state name and element type. Used to register and retrieve the ListState from the runtime context (keyed state) or the operator state store (operator state).
value | T | Yes (for add) | A single element to append to the state list.
threshold | Long | Yes (constructor) | Readings greater than or equal to this value count as violations.
numberOfTimes | Integer | Yes (constructor) | The number of recorded violations required before an alert is emitted.

Outputs

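Name | Type | Description
alert | Tuple2<String, List<T>> | A tuple whose first field is a label built from the key (sensor ID), such as value.f0 + " alert" in the keyed example, and whose second field lists the values that met or exceeded the threshold. Emitted via the Collector when the violation count reaches the configured limit.
state | ListState<T> | Not emitted downstream; the ListState is included in Flink checkpoints (when checkpointing is enabled) so it can be restored after a failure.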

Usage Examples

Basic Usage

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

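/**
 * Keyed state example: emits an alert when the number of threshold violations
 * recorded for a key reaches the configured limit. Must be applied to a keyed
 * stream, e.g. stream.keyBy(t -> t.f0).flatMap(new ThresholdWarning(100L, 3)).
 */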
public class ThresholdWarning
    extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    private transient ListState<Long> abnormalData;
    private Long threshold;
    private Integer numberOfTimes;

    public ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>(
            "abnormalData",
            TypeInformation.of(Long.class)
        );
        abnormalData = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value,
                        Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }
        List<Long> allData = new ArrayList<>();
        for (Long d : abnormalData.get()) {
            allData.add(d);
        }
        if (allData.size() >= numberOfTimes) {
            out.collect(new Tuple2<>(value.f0 + " alert", allData));
            abnormalData.clear();
        }
    }
}

Related Pages

Implements Principle

Requires Environment
