

Implementation:Spotify Luigi BeamDataflowTask

From Leeroopedia


Knowledge Sources
Domains: Data_Processing, Google_Cloud
Last Updated: 2026-02-10 08:00 GMT

Overview

Abstract Luigi task for running Apache Beam pipelines on Google Cloud Dataflow, providing automatic command-line construction, input/output formatting, and job lifecycle management.

Description

The beam_dataflow module provides a Luigi wrapper around Google Cloud Dataflow jobs built with the Apache Beam SDK. The central class is BeamDataflowJobTask, which is an abstract task (extending both MixinNaiveBulkComplete and luigi.Task) that must be subclassed for each specific Beam SDK (Java or Python).

Key components:

  • BeamDataflowJobTask (abstract): The main task class that orchestrates Dataflow job execution. Subclasses must implement dataflow_executable() to return the command that launches the Beam pipeline (e.g., ['java', 'com.example.MyPipeline']). The task builds command-line flags from its Dataflow properties, turns the targets from input() (the outputs of requires()) and output() into input/output flags, and drives the job lifecycle through hooks: before_run runs before launch; after a successful launch, on_successful_run and then validate_output run; cleanup_on_error runs if the job fails or validation fails.
  • DataflowParamKeys (abstract): Defines the flag names used for Dataflow execution parameters. The Java SDK expects lower-camel-case names (e.g., tempLocation) while the Python SDK expects snake-case names (e.g., temp_location), so this abstraction lets one task class support both.
  • _CmdLineRunner: Default subprocess runner that executes the constructed command line, streams its output to the logger, and raises CalledProcessError on a non-zero exit code. It can be replaced by assigning a different runner class to the task's cmd_line_runner attribute.
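To make the runner contract concrete, here is a minimal sketch of a class with the same behavior described for _CmdLineRunner: launch the command, stream merged stdout/stderr to a logger line by line, and raise CalledProcessError on a non-zero exit. SketchCmdLineRunner is a hypothetical name, not the module's class, and returning the captured output is an addition that only makes the sketch easy to test.

```python
import logging
import subprocess
import sys

logger = logging.getLogger("dataflow-sketch")


class SketchCmdLineRunner:
    """Hypothetical stand-in for _CmdLineRunner (not the luigi class)."""

    @staticmethod
    def run(cmd, task=None):
        # Merge stderr into stdout so all child output reaches the logger in order.
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output_lines = []
        for raw in process.stdout:  # yields each line as the child writes it
            line = raw.decode("utf-8", errors="replace")
            output_lines.append(line)
            logger.info(line.rstrip("\n"))
        process.stdout.close()
        exit_code = process.wait()
        output = "".join(output_lines)
        if exit_code:
            # Same exception type the page describes for a failed pipeline.
            raise subprocess.CalledProcessError(exit_code, cmd, output=output)
        return output


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    SketchCmdLineRunner.run([sys.executable, "-c", "print('hello from child')"])
```

A custom class with a static run(cmd, task=None) method like this one is the kind of object you would assign to cmd_line_runner if you wanted different monitoring or logging.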

The task supports extensive Dataflow configuration, including project, runner (only DirectRunner or DataflowRunner are accepted; any other value raises ValueError), zone, region, staging/temp locations, worker configuration (machine type, disk type, disk size), autoscaling, networking, service account, and custom labels.
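The way properties become flags can be sketched in plain Python. This is a simplified illustration, not the module's code: build_dataflow_args, JAVA_KEYS and PYTHON_KEYS are hypothetical names, only a handful of properties are modeled, and flag order simply follows the input dict. It shows the two ideas from the description: unset properties are omitted, and the same values render with Java-style or Python-style names depending on the key table (the role DataflowParamKeys plays). Serializing labels with json.dumps is our assumption about how a dict becomes a single flag value.

```python
import json

# Hypothetical key tables: Java SDK (lower camel case) vs Python SDK (snake case).
JAVA_KEYS = {
    "runner": "runner",
    "project": "project",
    "temp_location": "tempLocation",
    "num_workers": "numWorkers",
    "labels": "labels",
}
PYTHON_KEYS = {name: name for name in JAVA_KEYS}

SUPPORTED_RUNNERS = ("DirectRunner", "DataflowRunner")


def build_dataflow_args(settings, keys):
    """Render task settings as --key=value flags, skipping unset values."""
    runner = settings.get("runner") or "DirectRunner"  # unset runner falls back to DirectRunner
    if runner not in SUPPORTED_RUNNERS:
        raise ValueError("Runner %s is unsupported." % runner)
    args = ["--{}={}".format(keys["runner"], runner)]
    for name, value in settings.items():
        if name == "runner" or not value:
            continue  # None, 0 or an empty dict: the flag is omitted
        if name == "labels":
            value = json.dumps(value)  # assumption: labels travel as a JSON object
        args.append("--{}={}".format(keys[name], value))
    return args


settings = {
    "runner": "DataflowRunner",
    "project": "my-gcp-project",
    "temp_location": "gs://my-bucket/temp",
    "num_workers": None,
}
print(build_dataflow_args(settings, JAVA_KEYS))
```

Keeping the flag names in a separate table is what lets one task class target either SDK: only the key table changes, never the property values.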

Input formatting supports multiple patterns: a single Target, a dict of name-to-Target, a tuple of (name, Target), a list of Targets, and a list of (name, Target) tuples. Output formatting supports a single Target or a dict of name-to-Target. Unlabeled inputs and outputs are passed under the default keys input and output.
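The input and output shapes above can be normalized as in the following sketch. FakeTarget, format_input_args and format_output_args are hypothetical stand-ins (the real task works on luigi Targets). Two details are assumptions about the module's defaults: a part-* glob is appended to each input path, and several targets under one name are joined with commas into a single flag.

```python
class FakeTarget:
    """Hypothetical stand-in for a luigi Target that exposes a .path."""

    def __init__(self, path):
        self.path = path


def format_input_args(job_input, pattern="part-*"):
    """Normalize the supported input shapes into --name=glob[,glob...] flags."""
    if isinstance(job_input, FakeTarget):
        job_input = {"input": job_input}  # single Target -> default key
    elif isinstance(job_input, tuple):
        job_input = {job_input[0]: job_input[1]}  # (name, Target)
    elif isinstance(job_input, list):
        if all(isinstance(item, tuple) for item in job_input):
            job_input = dict(job_input)  # [(name, Target), ...]
        else:
            job_input = {"input": job_input}  # [Target, ...] share the default key
    elif not isinstance(job_input, dict):
        raise ValueError("unsupported input type: %r" % type(job_input))

    args = []
    for name, targets in sorted(job_input.items()):
        targets = targets if isinstance(targets, list) else [targets]
        globs = [t.path.rstrip("/") + "/" + pattern for t in targets]
        args.append("--%s=%s" % (name, ",".join(globs)))
    return args


def format_output_args(job_output):
    """A single Target maps to --output=...; a dict maps each name to its path."""
    if isinstance(job_output, FakeTarget):
        job_output = {"output": job_output}
    elif not isinstance(job_output, dict):
        raise ValueError("output must be a Target or a dict of name -> Target")
    return ["--%s=%s" % (name, t.path) for name, t in job_output.items()]


a, b = FakeTarget("gs://bkt/a/"), FakeTarget("gs://bkt/b")
print(format_input_args([a, b]))
print(format_output_args({"out": b}))
```

Normalizing every shape to a dict first keeps the flag-rendering loop identical for all five input patterns.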

Usage

Use this module when you need to orchestrate Apache Beam / Google Cloud Dataflow jobs within a Luigi pipeline. Subclass BeamDataflowJobTask, implement dataflow_executable(), set the required Dataflow properties (project, temp_location), assign dataflow_params an instance of a DataflowParamKeys subclass matching your SDK, and define requires()/output() to wire the task into your DAG.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/beam_dataflow.py
  • Lines: 1-498

Signature

class DataflowParamKeys(metaclass=abc.ABCMeta):
    # Abstract properties: runner, project, zone, region, staging_location,
    # temp_location, gcp_temp_location, num_workers, autoscaling_algorithm,
    # max_num_workers, disk_size_gb, worker_machine_type, worker_disk_type,
    # job_name, service_account, network, subnetwork, labels
    ...

class BeamDataflowJobTask(MixinNaiveBulkComplete, luigi.Task, metaclass=abc.ABCMeta):
    project = None
    runner = None
    temp_location = None
    staging_location = None
    gcp_temp_location = None
    num_workers = None
    autoscaling_algorithm = None
    max_num_workers = None
    network = None
    subnetwork = None
    disk_size_gb = None
    worker_machine_type = None
    job_name = None
    worker_disk_type = None
    service_account = None
    zone = None
    region = None
    labels = {}
    cmd_line_runner = _CmdLineRunner
    dataflow_params = None

    @abc.abstractmethod
    def dataflow_executable(self):
        ...

    def run(self):
        ...

Import

from luigi.contrib.beam_dataflow import BeamDataflowJobTask

I/O Contract

Inputs

Name | Type | Required | Description
project | str | Yes | GCP project ID for the Dataflow job
temp_location | str | Yes | Cloud Storage path for temporary files
runner | str | No | Pipeline runner: "DirectRunner" (default) or "DataflowRunner"; any other value raises ValueError
zone | str | No | GCE availability zone for worker instances (default: an available zone in the chosen region)
region | str | No | GCP region to deploy the Dataflow job to (default: us-central1)
staging_location | str | No | Cloud Storage path for staging binary files (default: temp_location)
gcp_temp_location | str | No | Cloud Storage path for Dataflow's temporary files (default: temp_location)
num_workers | int | No | Number of workers to start the job with
autoscaling_algorithm | str | No | Autoscaling mode (default: THROUGHPUT_BASED)
max_num_workers | int | No | Maximum number of workers when autoscaling is enabled
network | str | No | GCE network for launching workers (default: the network named "default")
subnetwork | str | No | GCE subnetwork for launching workers
disk_size_gb | int | No | Remote worker disk size in GB (minimum 30)
worker_machine_type | str | No | Machine type for Dataflow worker VMs
worker_disk_type | str | No | Worker disk type, given as the full URL of a disk-type resource, e.g. SSD (default: standard hard disk)
job_name | str | No | Custom job name; must be unique among the project's active jobs
service_account | str | No | Service account for Dataflow worker VMs
labels | dict | No | Custom GCP labels attached to the job
dataflow_params | DataflowParamKeys | Yes | Flag-naming convention for the Beam SDK in use; the task raises ValueError if this is not a DataflowParamKeys instance

Outputs

Name | Type | Description
Job execution | Dataflow job | The Beam pipeline runs on Google Cloud Dataflow (DataflowRunner) or locally (DirectRunner)
Target outputs | luigi.Target | Output targets as defined by the subclass's output() method

Usage Examples

Basic Usage

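import luigi
from luigi.contrib.beam_dataflow import BeamDataflowJobTask, DataflowParamKeys
from luigi.contrib.gcs import GCSTarget


class JavaDataflowParams(DataflowParamKeys):
    # Java SDK flag names use lower camel case. Plain class attributes satisfy
    # the abstract properties; every one must be defined, otherwise
    # instantiating this class raises TypeError.
    runner = 'runner'
    project = 'project'
    zone = 'zone'
    region = 'region'
    staging_location = 'stagingLocation'
    temp_location = 'tempLocation'
    gcp_temp_location = 'gcpTempLocation'
    num_workers = 'numWorkers'
    autoscaling_algorithm = 'autoscalingAlgorithm'
    max_num_workers = 'maxNumWorkers'
    disk_size_gb = 'diskSizeGb'
    worker_machine_type = 'workerMachineType'
    worker_disk_type = 'workerDiskType'
    job_name = 'jobName'
    service_account = 'serviceAccount'
    network = 'network'
    subnetwork = 'subnetwork'
    labels = 'labels'


class SomeUpstreamTask(luigi.ExternalTask):
    # Hypothetical upstream dependency whose output already exists in GCS.
    def output(self):
        return GCSTarget('gs://my-bucket/input/')


class MyDataflowJob(BeamDataflowJobTask):
    project = 'my-gcp-project'
    runner = 'DataflowRunner'
    temp_location = 'gs://my-bucket/temp'
    staging_location = 'gs://my-bucket/staging'
    dataflow_params = JavaDataflowParams()

    def dataflow_executable(self):
        # Base command; the task appends the Dataflow and input/output flags.
        return ['java', '-cp', 'my-pipeline.jar', 'com.example.MyPipeline']

    def requires(self):
        return SomeUpstreamTask()

    def output(self):
        return GCSTarget('gs://my-bucket/output/')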

With Lifecycle Hooks

class MyValidatedJob(MyDataflowJob):
    # Reuses the Dataflow settings, requires() and output() of MyDataflowJob.

    def before_run(self):
        """Set up temporary tables or files before the pipeline launches."""
        pass

    def on_successful_run(self):
        """Post-process after the pipeline exits successfully."""
        pass

    def validate_output(self):
        """Return False to fail the task instead of publishing the output."""
        return self.output().exists()

    def cleanup_on_error(self, error):
        """Remove partial output if the pipeline or validation failed."""
        target = self.output()
        if target.exists():
            target.fs.remove(target.path)
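The order in which these hooks fire can be modeled with a small stand-in class. HookOrderSketch is hypothetical and only records calls; launch() simulates the Beam subprocess. One simplification to note: the sketch re-raises the launch error after cleanup, while the real task's failure handling may differ in detail.

```python
class HookOrderSketch:
    """Hypothetical model of the hook order around one pipeline launch."""

    def __init__(self, launch_ok=True, output_ok=True):
        self.launch_ok = launch_ok
        self.output_ok = output_ok
        self.calls = []

    def before_run(self):
        self.calls.append("before_run")

    def on_successful_run(self):
        self.calls.append("on_successful_run")

    def validate_output(self):
        self.calls.append("validate_output")
        return self.output_ok

    def cleanup_on_error(self, error):
        self.calls.append("cleanup_on_error")

    def launch(self):
        # Stand-in for running the Beam command line in a subprocess.
        if not self.launch_ok:
            raise RuntimeError("pipeline exited with a non-zero code")

    def run(self):
        self.before_run()
        try:
            self.launch()
        except RuntimeError as error:
            self.cleanup_on_error(error)  # failed launch: clean up, then fail
            raise
        self.on_successful_run()
        if not self.validate_output():
            error = ValueError("Output validation failed")
            self.cleanup_on_error(error)  # failed validation also cleans up
            raise error


task = HookOrderSketch()
task.run()
print(task.calls)
```

Because a False from validate_output is routed through cleanup_on_error, that hook is the natural place to delete partial output, as MyValidatedJob does.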
