
Environment:Apache Beam Dataflow Streaming Runtime

From Leeroopedia


Knowledge Sources
Domains Infrastructure, Streaming, Google_Cloud
Last Updated 2026-02-09 04:00 GMT

Overview

The runtime for Google Cloud Dataflow streaming workers. It requires JVM 11+ and GCP credentials, connects to the Windmill backend, and includes memory monitoring with GC-thrashing protection.

Description

This environment defines the runtime context for the StreamingDataflowWorker, which processes streaming pipelines on Google Cloud Dataflow. The worker connects to the Windmill backend service via bidirectional gRPC streams, receives work items, processes them through user-defined DoFns, and commits results back to distributed state storage. It includes an integrated MemoryMonitor that detects GC thrashing and can trigger JVM shutdown to prevent cascading failures. The worker harness collects Java vendor, version, and OS information for reporting to the Dataflow service.

Usage

Use this environment when deploying or debugging streaming Dataflow pipelines. It is the mandatory runtime for the `StreamingDataflowWorker`, `FanOutStreamingEngineWorkerHarness`, and all Windmill-backed state and timer implementations.

System Requirements

| Category | Requirement | Notes |
| --- | --- | --- |
| Runtime | JVM 11+ (Eclipse Temurin) | Docker image uses `eclipse-temurin:11` |
| OS | Linux | Docker containers run on GCE VMs |
| Network | Connectivity to Windmill backend | gRPC streams for GetWork, GetData, CommitWork |
| Network | Connectivity to Dataflow service API | For config fetching and status reporting |
| Memory | Sufficient for worker cache + processing | Configurable via `workerCacheMb` option |

Dependencies

System Packages

  • `eclipse-temurin` JDK 11 (base Docker image)
  • `libltdl7` (required by container image)
  • `gRPC` libraries (bundled in worker JAR)
  • `protobuf` libraries (bundled in worker JAR)

Java Packages

  • `beam-runners-google-cloud-dataflow-java-worker` (main worker artifact)
  • `google-cloud-dataflow-java` (Dataflow service client)
  • Guava (vendored as `beam-vendor-guava`)
  • gRPC (for Windmill communication)

Credentials

The following credentials and configuration must be available at runtime:

  • GCP Service Account: Worker VM identity for Dataflow API access
  • `CLOUDSDK_COMPUTE_REGION`: Default GCP region (read by `DefaultGcpRegionFactory`)
  • Windmill connection properties:
    • `windmill.hostport`: Windmill backend host and port
    • `windmill.harness_update_reporting_period`: Status reporting interval (default: `PT10S`)
    • `windmill.global_get_config_refresh_period`: Config refresh interval (default: `PT120S`)
    • `windmill.periodic_status_page_directory`: Directory for status page output
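The duration-valued Windmill properties use ISO-8601 notation (`PT10S` is 10 seconds, `PT120S` is 120 seconds), which the JDK's `java.time.Duration` parses directly. The sketch below shows how such system properties can be read with their documented defaults; the class name, helper method, and hostport fallback are illustrative, not the worker's actual parsing code.

```java
import java.time.Duration;

// Sketch: reading Windmill connection settings from JVM system properties.
// Property names and defaults come from the list above; everything else
// (class name, helper, placeholder hostport) is illustrative.
public class WindmillSettings {
    static Duration durationProperty(String name, String defaultValue) {
        // ISO-8601 durations, e.g. "PT10S" (10 s) or "PT120S" (120 s)
        return Duration.parse(System.getProperty(name, defaultValue));
    }

    public static void main(String[] args) {
        // Placeholder default for illustration only
        String hostport = System.getProperty("windmill.hostport", "localhost:12345");
        Duration reporting =
            durationProperty("windmill.harness_update_reporting_period", "PT10S");
        Duration configRefresh =
            durationProperty("windmill.global_get_config_refresh_period", "PT120S");
        System.out.println(
            hostport + " " + reporting.getSeconds() + "s " + configRefresh.getSeconds() + "s");
    }
}
```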

Quick Install

# Dataflow streaming workers are deployed automatically by the Dataflow service.
# For local development/testing of the worker:
cd runners/google-cloud-dataflow-java/worker
../../../gradlew :runners:google-cloud-dataflow-java:worker:build

# Docker image for job servers:
# Base: eclipse-temurin:11
# Additional: apt-get install -y libltdl7

Code Evidence

System properties collection from `DataflowRunnerInfo.java:74-78`:

// Properties collected for reporting:
"java.vendor"
"java.version"
"os.arch"
"os.name"
"os.version"
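Gathering these properties requires nothing beyond the JDK, since each is a standard JVM system property. A minimal sketch of the collection step follows; the class and method names are illustrative, not Beam's `DataflowRunnerInfo` API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: collect the harness-info properties listed above into a map
// suitable for reporting. Illustrative names, not Beam's actual API.
public class RunnerInfo {
    static Map<String, String> collectProperties() {
        Map<String, String> info = new LinkedHashMap<>();
        for (String key :
            new String[] {"java.vendor", "java.version", "os.arch", "os.name", "os.version"}) {
            info.put(key, System.getProperty(key));
        }
        return info;
    }

    public static void main(String[] args) {
        collectProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```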

GCP region from environment in `DefaultGcpRegionFactory.java:68-69`:

public static String getRegionFromEnvironment() {
    return System.getenv("CLOUDSDK_COMPUTE_REGION");
}

Memory monitor initialization from `StreamingDataflowWorker.java:614`:

MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);

GC thrashing detection from `MemoryMonitor.java:101-130`:

public static final long DEFAULT_SLEEP_TIME_MILLIS = 15 * 1000; // 15 sec.
private static final int NUM_MONITORED_PERIODS = 4; // ie 1 min's worth.
private static final double GC_THRASHING_PERCENTAGE_PER_SERVER = 60.0;
private static final int HEAP_DUMP_RESERVED_BYTES = 10 << 20; // 10MB
private static final int DEFAULT_SHUT_DOWN_AFTER_NUM_GCTHRASHING = 8; // ie 2 min's worth.
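These constants imply a heuristic: sample the percentage of each 15-second period spent in GC, keep the last 4 samples (about a minute), and flag thrashing when the monitored window stays above the 60% threshold. The sketch below models that idea with a fixed-size window; it is a simplified illustration, not the `MemoryMonitor` implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of GC-thrashing detection: report thrashing only when
// all of the last NUM_MONITORED_PERIODS samples exceed the threshold.
// Constants mirror MemoryMonitor; the detection logic itself is a sketch.
public class ThrashingDetector {
    static final int NUM_MONITORED_PERIODS = 4;          // ~1 minute at 15 s/period
    static final double GC_THRASHING_PERCENTAGE = 60.0;  // percent of period in GC

    private final Deque<Double> recentGcPercentages = new ArrayDeque<>();

    // Record one period's GC percentage; returns true if thrashing is detected.
    boolean recordPeriod(double gcPercentage) {
        recentGcPercentages.addLast(gcPercentage);
        if (recentGcPercentages.size() > NUM_MONITORED_PERIODS) {
            recentGcPercentages.removeFirst(); // slide the window
        }
        return recentGcPercentages.size() == NUM_MONITORED_PERIODS
            && recentGcPercentages.stream().allMatch(p -> p > GC_THRASHING_PERCENTAGE);
    }
}
```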

Common Errors

| Error Message | Cause | Solution |
| --- | --- | --- |
| GC thrashing detected, JVM shutdown | Worker exceeds memory limits, causing excessive garbage collection | Increase worker memory or reduce `workerCacheMb`; check for memory leaks in user DoFns |
| Windmill connection timeout | Network connectivity to Windmill backend lost | Verify network configuration; check firewall rules; ensure Streaming Engine is enabled |
| `CLOUDSDK_COMPUTE_REGION` not set | No default GCP region configured | Set the `CLOUDSDK_COMPUTE_REGION` env var or configure via `gcloud config set compute/region` |
| `Unable to get gcloud compute region` | gcloud CLI not installed or not configured | Install the gcloud CLI and run `gcloud init` |

Compatibility Notes

  • Streaming Engine mode: Uses `FanOutStreamingEngineWorkerHarness` with separate Windmill endpoints per computation. Non-Streaming-Engine (appliance) mode uses a single Windmill connection.
  • Docker base image: `eclipse-temurin:11` is used for both Flink and Spark job server containers; the Dataflow worker uses the same JVM baseline.
  • Memory monitoring: The `MemoryMonitor` pre-allocates a 10MB reserve buffer that is released during GC thrashing to allow heap dumps to succeed.
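The reserve-buffer technique in the last note can be illustrated with plain Java: hold a reference to a pre-allocated array so the heap stays slightly below capacity, then drop the reference under memory pressure so a heap dump has headroom. This is a simplified sketch of the pattern, not the `MemoryMonitor` code.

```java
// Sketch of the reserved-buffer pattern: pre-allocate ~10 MB, then release
// it when GC thrashing is detected so a heap dump can succeed.
public class ReserveBuffer {
    static final int HEAP_DUMP_RESERVED_BYTES = 10 << 20; // 10 MB

    private byte[] reserved = new byte[HEAP_DUMP_RESERVED_BYTES];

    // Called when GC thrashing is detected, before attempting a heap dump.
    // Returns the number of bytes made collectible (0 if already released).
    int release() {
        int freed = (reserved == null) ? 0 : reserved.length;
        reserved = null; // the ~10 MB becomes reclaimable on the next GC
        return freed;
    }
}
```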
