Implementation:Spotify Luigi BeamDataflowTask
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Google_Cloud |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Abstract Luigi task for running Apache Beam pipelines on Google Cloud Dataflow, providing automatic command-line construction, input/output formatting, and job lifecycle management.
Description
The beam_dataflow module provides a Luigi wrapper around Google Cloud Dataflow jobs built with the Apache Beam SDK. The central class is BeamDataflowJobTask, which is an abstract task (extending both MixinNaiveBulkComplete and luigi.Task) that must be subclassed for each specific Beam SDK (Java or Python).
Key components:
- BeamDataflowJobTask (abstract): The main task class that orchestrates Dataflow job execution. Subclasses must implement dataflow_executable() to return the command for launching the Beam pipeline (e.g., ['java', 'com.example.MyPipeline']). The task automatically constructs command-line arguments from its properties, formats input/output arguments from Luigi's requires()/output(), and manages the job lifecycle through the before_run, on_successful_run, validate_output, and cleanup_on_error hooks.
- DataflowParamKeys (abstract): Defines the naming conventions for Dataflow execution parameters. This abstraction supports both the Java API (lower camel case) and the Python API (snake case) parameter naming conventions.
- _CmdLineRunner: Default subprocess runner that executes the constructed command line, streams output to the logger, and raises CalledProcessError on non-zero exit codes.
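The behaviour described for _CmdLineRunner can be sketched with the standard library alone. This is a minimal illustration rather than the module's own code: the function name `run_and_stream` and the logger name are hypothetical.

```python
import logging
import subprocess
import sys

logger = logging.getLogger("beam_dataflow_sketch")  # hypothetical logger name


def run_and_stream(cmd):
    """Run cmd, log each output line as it arrives, raise on non-zero exit."""
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout
    )
    output_lines = []
    for raw_line in process.stdout:
        line = raw_line.decode("utf-8", errors="replace")
        output_lines.append(line)
        logger.info(line.rstrip("\n"))
    process.stdout.close()
    exit_code = process.wait()
    if exit_code:
        # Attach the captured output so callers can inspect the failure.
        raise subprocess.CalledProcessError(
            exit_code, cmd, output="".join(output_lines)
        )
    return output_lines


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run_and_stream([sys.executable, "-c", "print('pipeline finished')"])
```

Because cmd_line_runner is a class attribute, a runner with different logging or monitoring can presumably be swapped in on a subclass; check the module source for the exact method signature it expects before doing so.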
The task supports extensive Dataflow configuration including project, runner (DirectRunner or DataflowRunner), zone, region, staging/temp locations, worker configuration (machine type, disk type, disk size), autoscaling, networking, service account, and custom labels.
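To illustrate how such properties might become command-line flags, here is a minimal sketch that does not import Luigi. The `--key=value` flag shape, the JSON encoding of labels, the helper `build_flags`, and the two key maps are assumptions made for illustration; the actual construction logic lives in luigi/contrib/beam_dataflow.py.

```python
import json

# Hypothetical key maps standing in for DataflowParamKeys subclasses.
JAVA_KEYS = {
    "runner": "runner",
    "project": "project",
    "temp_location": "tempLocation",
    "num_workers": "numWorkers",
    "labels": "labels",
}
PYTHON_KEYS = {
    "runner": "runner",
    "project": "project",
    "temp_location": "temp_location",
    "num_workers": "num_workers",
    "labels": "labels",
}


def build_flags(settings, keys):
    """Turn the settings that are set into --key=value flags named via keys."""
    flags = []
    for name, value in settings.items():
        if value is None or value == {}:
            continue  # unset properties produce no flag
        if isinstance(value, dict):
            # Assumption: dict-valued settings such as labels are sent as JSON.
            value = json.dumps(value, sort_keys=True)
        flags.append("--{}={}".format(keys[name], value))
    return flags


settings = {
    "runner": "DataflowRunner",
    "project": "my-gcp-project",
    "temp_location": "gs://my-bucket/temp",
    "num_workers": None,  # left unset, so no flag is emitted
    "labels": {},         # empty, so no flag is emitted
}

print(build_flags(settings, JAVA_KEYS))
print(build_flags(settings, PYTHON_KEYS))
```

Keeping the naming convention in a separate object means the same task properties can drive either SDK; only the key map changes.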
Input formatting supports multiple patterns: single Target, dict of name-to-Target, tuple of (name, Target), list of Targets, and list of (name, Target) tuples. Output formatting supports single Target or dict of name-to-Target.
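The five input shapes can be normalized into named flags roughly as follows. This is a sketch under stated assumptions, not the module's implementation: `FakeTarget` stands in for a luigi.Target, `format_input_flags` is a hypothetical helper, and the default name "input" for unlabeled targets and the comma-joined path format are assumed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeTarget:
    """Hypothetical stand-in for a luigi.Target that exposes a path."""
    path: str


DEFAULT_NAME = "input"  # assumed label for targets given without a name


def format_input_flags(inputs):
    """Map any of the supported input shapes to a list of --name=paths flags."""
    if isinstance(inputs, FakeTarget):
        named = {DEFAULT_NAME: [inputs]}                 # single Target
    elif isinstance(inputs, dict):
        named = {name: [target] for name, target in inputs.items()}
    elif isinstance(inputs, tuple):
        name, target = inputs                            # one (name, Target) pair
        named = {name: [target]}
    elif isinstance(inputs, list):
        if inputs and all(isinstance(item, tuple) for item in inputs):
            named = {name: [target] for name, target in inputs}
        else:
            named = {DEFAULT_NAME: list(inputs)}         # list of bare Targets
    else:
        raise ValueError("unsupported input shape: {!r}".format(inputs))
    return [
        "--{}={}".format(name, ",".join(t.path for t in targets))
        for name, targets in named.items()
    ]


a = FakeTarget("gs://my-bucket/a")
b = FakeTarget("gs://my-bucket/b")
print(format_input_flags(a))
print(format_input_flags({"left": a, "right": b}))
print(format_input_flags([a, b]))
```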
Usage
Use this module when you need to orchestrate Apache Beam / Google Cloud Dataflow jobs within a Luigi pipeline. Subclass BeamDataflowJobTask, implement dataflow_executable(), set the required Dataflow properties (project, temp_location), and define requires()/output() to wire the task into your DAG.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/beam_dataflow.py
- Lines: 1-498
Signature
class DataflowParamKeys(metaclass=abc.ABCMeta):
    # Abstract properties: runner, project, zone, region, staging_location,
    # temp_location, gcp_temp_location, num_workers, autoscaling_algorithm,
    # max_num_workers, disk_size_gb, worker_machine_type, worker_disk_type,
    # job_name, service_account, network, subnetwork, labels
    ...

class BeamDataflowJobTask(MixinNaiveBulkComplete, luigi.Task, metaclass=abc.ABCMeta):
    project = None
    runner = None
    temp_location = None
    staging_location = None
    gcp_temp_location = None
    num_workers = None
    autoscaling_algorithm = None
    max_num_workers = None
    network = None
    subnetwork = None
    disk_size_gb = None
    worker_machine_type = None
    job_name = None
    worker_disk_type = None
    service_account = None
    zone = None
    region = None
    labels = {}
    cmd_line_runner = _CmdLineRunner
    dataflow_params = None

    @abc.abstractmethod
    def dataflow_executable(self):
        ...

    def run(self):
        ...
Import
from luigi.contrib.beam_dataflow import BeamDataflowJobTask
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| project | str | Yes | GCP project ID for the Dataflow job |
| temp_location | str | Yes | Cloud Storage path for temporary files |
| runner | str | No | PipelineRunner: "DirectRunner" (default) or "DataflowRunner" |
| zone | str | No | GCE availability zone for worker instances |
| region | str | No | GCP region for Dataflow job deployment (default: us-central1) |
| staging_location | str | No | Cloud Storage path for staging binary files |
| gcp_temp_location | str | No | Cloud Storage path for GCP temporary files |
| num_workers | int | No | Number of workers to start the task with |
| autoscaling_algorithm | str | No | Autoscaling mode (default: THROUGHPUT_BASED) |
| max_num_workers | int | No | Maximum number of workers when autoscaling |
| network | str | No | GCE network for launching workers (default: "default") |
| subnetwork | str | No | GCE subnetwork for launching workers |
| disk_size_gb | int | No | Remote worker disk size in GB (minimum 30) |
| worker_machine_type | str | No | Machine type for Dataflow worker VMs |
| worker_disk_type | str | No | Disk type for workers (SSD or default hard disk) |
| job_name | str | No | Custom job name, must be unique across active jobs |
| service_account | str | No | Service account for Dataflow VMs |
| labels | dict | No | Custom GCP labels attached to the job |
| dataflow_params | DataflowParamKeys | Yes | Parameter naming convention for the Beam SDK being used |
Outputs
| Name | Type | Description |
|---|---|---|
| Job execution | Dataflow job | The Beam pipeline is executed on Google Cloud Dataflow or locally (DirectRunner) |
| Target outputs | luigi.Target | Output targets as defined by the subclass's output() method |
Usage Examples
Basic Usage
from luigi.contrib.beam_dataflow import BeamDataflowJobTask, DataflowParamKeys

class JavaDataflowParams(DataflowParamKeys):
    @property
    def runner(self):
        return 'runner'

    @property
    def project(self):
        return 'project'

    @property
    def zone(self):
        return 'zone'

    @property
    def region(self):
        return 'region'

    # ... implement all abstract properties with Java camelCase naming

class MyDataflowJob(BeamDataflowJobTask):
    project = 'my-gcp-project'
    runner = 'DataflowRunner'
    temp_location = 'gs://my-bucket/temp'
    staging_location = 'gs://my-bucket/staging'
    dataflow_params = JavaDataflowParams()

    def dataflow_executable(self):
        return ['java', '-cp', 'my-pipeline.jar', 'com.example.MyPipeline']

    def requires(self):
        # SomeUpstreamTask is a placeholder for any upstream luigi.Task
        return SomeUpstreamTask()

    def output(self):
        from luigi.contrib.gcs import GCSTarget
        return GCSTarget('gs://my-bucket/output/')
With Lifecycle Hooks
class MyValidatedJob(BeamDataflowJobTask):
    # ... properties ...

    def before_run(self):
        """Setup temporary tables before pipeline runs."""
        pass

    def on_successful_run(self):
        """Post-processing after successful pipeline execution."""
        pass

    def validate_output(self):
        """Validate output before publishing."""
        return self.output().exists()

    def cleanup_on_error(self, error):
        """Clean up partial output on failure."""
        if self.output().exists():
            self.output().fs.remove(self.output().path)
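To make the timing of these hooks concrete, the following stand-alone sketch records the order in which they might fire around a job launch. The ordering shown is an assumption based on the hook names, `HookOrderSketch` and its `launch` callable are hypothetical, and the sketch raises exceptions where the real task may handle failure differently (for example by terminating the process).

```python
class HookOrderSketch:
    """Records the order in which lifecycle hooks fire around a job launch."""

    def __init__(self, launch, output_is_valid=True):
        self.launch = launch  # callable standing in for the subprocess launch
        self.output_is_valid = output_is_valid
        self.calls = []

    def before_run(self):
        self.calls.append("before_run")

    def on_successful_run(self):
        self.calls.append("on_successful_run")

    def validate_output(self):
        self.calls.append("validate_output")
        return self.output_is_valid

    def cleanup_on_error(self, error):
        self.calls.append("cleanup_on_error")

    def run(self):
        self.before_run()
        try:
            self.launch()
        except Exception as error:
            # A failed launch triggers cleanup and skips the success hooks.
            self.cleanup_on_error(error)
            raise
        self.on_successful_run()
        if not self.validate_output():
            # Invalid output is treated like a failure and also cleaned up.
            error = ValueError("Output validation failed")
            self.cleanup_on_error(error)
            raise error


job = HookOrderSketch(launch=lambda: None)
job.run()
print(job.calls)
```

Under these assumptions, cleanup_on_error can receive either the launch failure or a validation error, so an override should not assume a particular exception type.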