Environment:Apache Beam Dataflow Streaming Runtime
| Knowledge Sources | |
|---|---|
| Domains | Infrastructure, Streaming, Google_Cloud |
| Last Updated | 2026-02-09 04:00 GMT |
Overview
Google Cloud Dataflow streaming worker runtime requiring JVM 11+, GCP credentials, Windmill backend connectivity, and memory monitoring with GC thrashing protection.
Description
This environment defines the runtime context for the StreamingDataflowWorker, which processes streaming pipelines on Google Cloud Dataflow. The worker connects to the Windmill backend service via bidirectional gRPC streams, receives work items, processes them through user-defined DoFns, and commits results back to distributed state storage. It includes an integrated MemoryMonitor that detects GC thrashing and can trigger JVM shutdown to prevent cascading failures. The worker harness collects Java vendor, version, and OS information for reporting to the Dataflow service.
Usage
Use this environment when deploying or debugging streaming Dataflow pipelines. It is the mandatory runtime for the `StreamingDataflowWorker`, `FanOutStreamingEngineWorkerHarness`, and all Windmill-backed state and timer implementations.
System Requirements
| Category | Requirement | Notes |
|---|---|---|
| Runtime | JVM 11+ (Eclipse Temurin) | Docker image uses `eclipse-temurin:11` |
| OS | Linux | Docker containers run on GCE VMs |
| Network | Connectivity to Windmill backend | gRPC streams for GetWork, GetData, CommitWork |
| Network | Connectivity to Dataflow service API | For config fetching and status reporting |
| Memory | Sufficient for worker cache + processing | Configurable via `workerCacheMb` option |
Dependencies
System Packages
- `eclipse-temurin` JDK 11 (base Docker image)
- `libltdl7` (required by container image)
- `gRPC` libraries (bundled in worker JAR)
- `protobuf` libraries (bundled in worker JAR)
Java Packages
- `beam-runners-google-cloud-dataflow-java-worker` (main worker artifact)
- `google-cloud-dataflow-java` (Dataflow service client)
- Guava (vendored as `beam-vendor-guava`)
- gRPC (for Windmill communication)
Credentials
The following credentials and configuration must be available at runtime:
- GCP Service Account: Worker VM identity for Dataflow API access
- `CLOUDSDK_COMPUTE_REGION`: Default GCP region (read by `DefaultGcpRegionFactory`)
- Windmill connection properties:
- `windmill.hostport`: Windmill backend host and port
- `windmill.harness_update_reporting_period`: Status reporting interval (default: `PT10S`)
- `windmill.global_get_config_refresh_period`: Config refresh interval (default: `PT120S`)
- `windmill.periodic_status_page_directory`: Directory for status page output
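The duration-valued properties above use ISO-8601 notation (`PT10S`, `PT120S`), which maps directly onto `java.time.Duration`. A minimal sketch of how such properties might be read, assuming they arrive as JVM system properties; the `WindmillConfig` class and `durationProperty` helper are illustrative, not Beam's API:

```java
import java.time.Duration;

/** Illustrative sketch: reading the windmill.* duration properties listed above. */
public class WindmillConfig {
  /** Parse an ISO-8601 duration system property, falling back to a default. */
  static Duration durationProperty(String name, String defaultIso) {
    return Duration.parse(System.getProperty(name, defaultIso));
  }

  public static void main(String[] args) {
    Duration reporting =
        durationProperty("windmill.harness_update_reporting_period", "PT10S");
    Duration refresh =
        durationProperty("windmill.global_get_config_refresh_period", "PT120S");
    System.out.println(reporting.getSeconds() + "s / " + refresh.getSeconds() + "s");
  }
}
```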
Quick Install
```sh
# Dataflow streaming workers are deployed automatically by the Dataflow service.
# For local development/testing of the worker:
cd runners/google-cloud-dataflow-java/worker
../../../gradlew :runners:google-cloud-dataflow-java:worker:build

# Docker image for job servers:
#   Base: eclipse-temurin:11
#   Additional: apt-get install -y libltdl7
```
Code Evidence
System properties collection from `DataflowRunnerInfo.java:74-78`:
```java
// Properties collected for reporting:
"java.vendor"
"java.version"
"os.arch"
"os.name"
"os.version"
```
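These keys are standard JVM system properties, so collecting them is straightforward. A hedged sketch of what that collection might look like; the `RuntimeInfo` class name and map shape are assumptions, not the actual `DataflowRunnerInfo` implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch: gathering the JVM/OS properties listed above for reporting. */
public class RuntimeInfo {
  static Map<String, String> collectRuntimeInfo() {
    Map<String, String> info = new LinkedHashMap<>();
    for (String key :
        new String[] {"java.vendor", "java.version", "os.arch", "os.name", "os.version"}) {
      // System.getProperty returns the JVM-provided value; default to "" if absent.
      info.put(key, System.getProperty(key, ""));
    }
    return info;
  }
}
```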
GCP region from environment in `DefaultGcpRegionFactory.java:68-69`:
```java
public static String getRegionFromEnvironment() {
  return System.getenv("CLOUDSDK_COMPUTE_REGION");
}
```
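Note that `System.getenv` returns `null` when the variable is unset, which is what produces the "not set" errors listed below. A small testable sketch of the fallback pattern a caller might apply around this lookup; `resolveRegion` and its fallback argument are illustrative, not part of `DefaultGcpRegionFactory`:

```java
import java.util.Map;

/** Illustrative sketch: env-var lookup with an explicit fallback region. */
public class RegionResolver {
  /** Return the CLOUDSDK_COMPUTE_REGION entry from the given env map, or the fallback. */
  static String resolveRegion(Map<String, String> env, String fallback) {
    String region = env.get("CLOUDSDK_COMPUTE_REGION");
    return (region != null && !region.isEmpty()) ? region : fallback;
  }
}
```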
Memory monitor initialization from `StreamingDataflowWorker.java:614`:
```java
MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
```
GC thrashing detection from `MemoryMonitor.java:101-130`:
```java
public static final long DEFAULT_SLEEP_TIME_MILLIS = 15 * 1000; // 15 sec.
private static final int NUM_MONITORED_PERIODS = 4; // ie 1 min's worth.
private static final double GC_THRASHING_PERCENTAGE_PER_SERVER = 60.0;
private static final int HEAP_DUMP_RESERVED_BYTES = 10 << 20; // 10MB
private static final int DEFAULT_SHUT_DOWN_AFTER_NUM_GCTHRASHING = 8; // ie 2 min's worth.
```
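Reading these constants together: the monitor samples every 15 seconds, treats a period as thrashing when GC consumes more than 60% of wall time, and shuts the JVM down after 8 such periods (about two minutes). The following is a deliberately simplified sketch of that heuristic using a consecutive-period counter; the real `MemoryMonitor` tracks a sliding window of `NUM_MONITORED_PERIODS`, so this class and its method are assumptions for illustration only:

```java
/** Simplified sketch of the GC-thrashing shutdown heuristic implied by the constants above. */
public class ThrashingDetector {
  static final double THRASHING_PCT = 60.0; // GC share of wall time that counts as thrashing
  static final int SHUTDOWN_AFTER = 8;      // periods of thrashing before shutdown

  private int consecutive = 0;

  /**
   * Record one monitoring period.
   *
   * @param gcMillis time spent in GC during the period
   * @param periodMillis total wall time of the period
   * @return true when a shutdown should be triggered
   */
  public boolean observePeriod(long gcMillis, long periodMillis) {
    double pct = 100.0 * gcMillis / periodMillis;
    // A healthy period resets the counter; a thrashing period extends the streak.
    consecutive = (pct > THRASHING_PCT) ? consecutive + 1 : 0;
    return consecutive >= SHUTDOWN_AFTER;
  }
}
```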
Common Errors
| Error Message | Cause | Solution |
|---|---|---|
| GC thrashing detected, JVM shutdown | Worker exceeds memory limits causing excessive garbage collection | Increase worker memory or reduce `workerCacheMb`; check for memory leaks in user DoFns |
| Windmill connection timeout | Network connectivity to Windmill backend lost | Verify network configuration; check firewall rules; ensure Streaming Engine is enabled |
| `CLOUDSDK_COMPUTE_REGION` not set | No default GCP region configured | Set `CLOUDSDK_COMPUTE_REGION` env var or configure via `gcloud config set compute/region` |
| `Unable to get gcloud compute region` | gcloud CLI not installed or not configured | Install gcloud CLI and run `gcloud init` |
Compatibility Notes
- Streaming Engine mode: Uses `FanOutStreamingEngineWorkerHarness` with separate Windmill endpoints per computation. Non-Streaming-Engine (appliance) mode uses a single Windmill connection.
- Docker base image: `eclipse-temurin:11` is used for both Flink and Spark job server containers; the Dataflow worker uses the same JVM baseline.
- Memory monitoring: The `MemoryMonitor` pre-allocates a 10MB reserve buffer that is released during GC thrashing to allow heap dumps to succeed.
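The reserve-buffer technique in the last note is a common JVM pattern: allocate a block at startup and drop the reference under memory pressure so a subsequent heap dump has headroom. A minimal sketch of the idea, using the 10MB figure from this page; the `HeapReserve` class and `release` method are illustrative names, not Beam's:

```java
/** Illustrative sketch of the heap-dump reserve described in the note above. */
public class HeapReserve {
  private static final int RESERVED_BYTES = 10 << 20; // 10MB, matching HEAP_DUMP_RESERVED_BYTES

  private byte[] reserve = new byte[RESERVED_BYTES];

  /** Drop the reserve so the 10MB becomes reclaimable; returns true if it was still held. */
  public synchronized boolean release() {
    boolean held = reserve != null;
    reserve = null;
    return held;
  }
}
```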
Related Pages
- Implementation:Apache_Beam_StreamingDataflowWorker_FromOptions
- Implementation:Apache_Beam_FanOutStreamingEngineWorkerHarness
- Implementation:Apache_Beam_StreamingEngineComputationConfigFetcher
- Implementation:Apache_Beam_WindmillTimerInternals
- Implementation:Apache_Beam_WindmillStateInternals_Persist
- Implementation:Apache_Beam_StreamingDataflowWorker_Start