Implementation:Spotify Luigi DataprocTask

From Leeroopedia


Knowledge Sources
Domains: Data_Processing, Google_Cloud
Last Updated: 2026-02-10 08:00 GMT

Overview

The Dataproc contrib module provides Luigi task bindings for Google Cloud Dataproc, enabling submission and management of Spark and PySpark jobs on managed Dataproc clusters, as well as creation and deletion of clusters themselves.

Description

The module defines a hierarchy of task classes:

  • _DataprocBaseTask (extends luigi.Task): Internal base class holding shared parameters (gcloud_project_id, dataproc_cluster_name, dataproc_region) and the Dataproc API client.
  • DataprocBaseTask (extends _DataprocBaseTask): Public base class providing methods to submit Spark/PySpark jobs and poll for completion. Extend this when you need fine-grained control over job configuration.
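  • DataprocSparkTask (extends DataprocBaseTask): Convenience task for running Spark jobs. It takes main_class, jars, and job_args, splits the comma-separated strings into lists, submits the job, and waits for it to finish.
  • DataprocPysparkTask (extends DataprocBaseTask): Convenience task for running PySpark jobs. It takes job_file, extra_files, and job_args, splits the comma-separated strings into lists, submits the job, and waits for it to finish.
  • CreateDataprocClusterTask (extends _DataprocBaseTask): Creates a Dataproc cluster with configurable zone, network, machine types, disk sizes, worker counts (regular and preemptible), and image version, then polls until the cluster reaches the RUNNING state. The task counts as complete as soon as a cluster with the given name exists.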
  • DeleteDataprocClusterTask (extends _DataprocBaseTask): Deletes a Dataproc cluster and polls until it no longer exists.
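The CreateDataprocClusterTask parameters are ultimately turned into a Dataproc cluster definition. The following is a minimal sketch of that mapping. It assumes the public Dataproc v1 clusters.create body layout (gceClusterConfig, masterConfig, workerConfig, secondaryWorkerConfig, softwareConfig). build_cluster_body is a hypothetical helper, not part of luigi.contrib.dataproc. The real module's request may include more fields or keep the values as strings.

```python
from typing import Any, Dict

# Base of Compute Engine resource URIs; used for zone, network and machine types.
COMPUTE_BASE = "https://www.googleapis.com/compute/v1/projects/{}"


def build_cluster_body(
    project_id: str,
    cluster_name: str,
    zone: str = "europe-west1-c",
    network: str = "default",
    master_node_type: str = "n1-standard-2",
    master_disk_size: str = "100",
    worker_node_type: str = "n1-standard-2",
    worker_disk_size: str = "100",
    worker_normal_count: str = "2",
    worker_preemptible_count: str = "0",
    image_version: str = "",
) -> Dict[str, Any]:
    """Hypothetical helper: map the task's string parameters to a cluster body.

    Defaults mirror the CreateDataprocClusterTask parameter defaults.
    """
    base = COMPUTE_BASE.format(project_id)
    machine_prefix = base + "/zones/" + zone + "/machineTypes/"
    body: Dict[str, Any] = {
        "projectId": project_id,
        "clusterName": cluster_name,
        "config": {
            "gceClusterConfig": {
                "zoneUri": base + "/zones/" + zone,
                "networkUri": base + "/global/networks/" + network,
            },
            # A single master node.
            "masterConfig": {
                "numInstances": 1,
                "machineTypeUri": machine_prefix + master_node_type,
                "diskConfig": {"bootDiskSizeGb": int(master_disk_size)},
            },
            # Regular (non-preemptible) workers.
            "workerConfig": {
                "numInstances": int(worker_normal_count),
                "machineTypeUri": machine_prefix + worker_node_type,
                "diskConfig": {"bootDiskSizeGb": int(worker_disk_size)},
            },
            # Preemptible workers live in the secondary worker group.
            "secondaryWorkerConfig": {
                "numInstances": int(worker_preemptible_count),
                "isPreemptible": True,
            },
        },
    }
    # An empty image_version leaves softwareConfig out, so Dataproc
    # falls back to its default image.
    if image_version:
        body["config"]["softwareConfig"] = {"imageVersion": image_version}
    return body
```

For example, build_cluster_body("my-gcp-project", "analytics-cluster", worker_normal_count="4") describes a cluster with one master and four regular workers. Preemptible capacity is kept separate in secondaryWorkerConfig rather than being added to workerConfig.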

The module uses the Google API Python client (googleapiclient) with default credentials via google.auth.

Usage

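For running jobs, subclass DataprocSparkTask or DataprocPysparkTask and set the required parameters. Neither class defines output(), so override output() (or complete()) as well. Otherwise Luigi treats the task as incomplete and reruns it whenever it is scheduled. For ephemeral clusters, chain the tasks through requires(): have your job task require CreateDataprocClusterTask, and have a subclass of DeleteDataprocClusterTask require your job task. Running the delete task then creates the cluster, runs the job, and removes the cluster. Keep job inputs and outputs in Cloud Storage (gs://) rather than HDFS, because HDFS data is deleted along with the cluster.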

Code Reference

Source Location

luigi/contrib/dataproc.py (260 lines)

Signature

class _DataprocBaseTask(luigi.Task):
    gcloud_project_id = luigi.Parameter()
    dataproc_cluster_name = luigi.Parameter()
    dataproc_region = luigi.Parameter(default="global")

class DataprocBaseTask(_DataprocBaseTask):
    def submit_job(self, job_config):
        """Submits a raw job config dict to the Dataproc API."""

    def submit_spark_job(self, jars, main_class, job_args=None):
        """Builds and submits a Spark job configuration."""

    def submit_pyspark_job(self, job_file, extra_files=list(), job_args=None):
        """Builds and submits a PySpark job configuration."""

    def wait_for_job(self):
        """Polls the Dataproc API until the job reaches DONE or ERROR."""

class DataprocSparkTask(DataprocBaseTask):
    main_class = luigi.Parameter()
    jars = luigi.Parameter(default="")
    job_args = luigi.Parameter(default="")

class DataprocPysparkTask(DataprocBaseTask):
    job_file = luigi.Parameter()
    extra_files = luigi.Parameter(default="")
    job_args = luigi.Parameter(default="")

class CreateDataprocClusterTask(_DataprocBaseTask):
    gcloud_zone = luigi.Parameter(default="europe-west1-c")
    gcloud_network = luigi.Parameter(default="default")
    master_node_type = luigi.Parameter(default="n1-standard-2")
    master_disk_size = luigi.Parameter(default="100")
    worker_node_type = luigi.Parameter(default="n1-standard-2")
    worker_disk_size = luigi.Parameter(default="100")
    worker_normal_count = luigi.Parameter(default="2")
    worker_preemptible_count = luigi.Parameter(default="0")
    image_version = luigi.Parameter(default="")

class DeleteDataprocClusterTask(_DataprocBaseTask):
    pass
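The Signature above describes wait_for_job only by its docstring. Its polling behavior can be sketched in plain Python as follows. This is a minimal sketch, not the module's code. fetch_status is a hypothetical stand-in for the Dataproc jobs.get call and is assumed to return the job's status dict, containing a state and, on failure, details. The poll interval, the RuntimeError type, and the injectable sleep are choices made for this sketch. The injectable sleep lets the loop be tested without real delays.

```python
import time
from typing import Callable, Dict


def wait_for_state(
    fetch_status: Callable[[], Dict[str, str]],
    poll_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Poll until the job reports DONE; raise if it reports ERROR.

    fetch_status is a hypothetical callable standing in for the Dataproc
    jobs.get request; it returns a dict such as {"state": "RUNNING"}.
    """
    while True:
        status = fetch_status()
        state = status["state"]
        if state == "DONE":
            return status
        if state == "ERROR":
            # Surface the service-provided error details, if any.
            raise RuntimeError(status.get("details", "job failed"))
        # Any other state (e.g. PENDING, RUNNING): wait and ask again.
        sleep(poll_seconds)
```

Note that a loop like this only stops on DONE or ERROR, matching the docstring above. Any other terminal state would keep it polling.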

Import

from luigi.contrib.dataproc import DataprocSparkTask, DataprocPysparkTask
from luigi.contrib.dataproc import CreateDataprocClusterTask, DeleteDataprocClusterTask

I/O Contract

Inputs

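| Input | Type | Description |
|---|---|---|
| gcloud_project_id | Parameter | Google Cloud project ID for the Dataproc cluster and jobs. |
| dataproc_cluster_name | Parameter | Name of the Dataproc cluster to target. |
| dataproc_region | Parameter | Dataproc region of the cluster (default "global"). |
| main_class / job_file | Parameter | Spark main class (DataprocSparkTask) or URI of the main PySpark Python file (DataprocPysparkTask). |
| jars / extra_files | Parameter | Comma-separated list of JAR URIs or of additional Python file URIs. |
| job_args | Parameter | Comma-separated job arguments, passed to the job as a list. |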
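jars, extra_files, and job_args arrive as single comma-separated strings, so they must be turned into lists before a Dataproc job request can be built. The following is a minimal sketch of that translation. It assumes the Dataproc v1 jobs.submit body layout: placement.clusterName plus a sparkJob or pysparkJob section. split_csv, spark_job_body, and pyspark_job_body are hypothetical helpers, not functions exported by luigi.contrib.dataproc.

```python
from typing import Any, Dict, List


def split_csv(value: str) -> List[str]:
    """Turn a comma-separated parameter string into a list; '' becomes []."""
    return value.split(",") if value else []


def spark_job_body(cluster_name: str, main_class: str,
                   jars: str = "", job_args: str = "") -> Dict[str, Any]:
    """Hypothetical helper: request body for submitting a Spark job."""
    return {
        "job": {
            "placement": {"clusterName": cluster_name},
            "sparkJob": {
                "mainClass": main_class,
                "jarFileUris": split_csv(jars),
                "args": split_csv(job_args),
            },
        }
    }


def pyspark_job_body(cluster_name: str, job_file: str,
                     extra_files: str = "", job_args: str = "") -> Dict[str, Any]:
    """Hypothetical helper: PySpark counterpart of spark_job_body."""
    return {
        "job": {
            "placement": {"clusterName": cluster_name},
            "pysparkJob": {
                "mainPythonFileUri": job_file,
                "pythonFileUris": split_csv(extra_files),
                "args": split_csv(job_args),
            },
        }
    }
```

Because every comma is treated as a separator, a single argument cannot itself contain a comma under this scheme.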

Outputs

| Output | Type | Description |
|---|---|---|
| Job completion | Side effect | The Spark/PySpark job runs to completion on the Dataproc cluster. An exception is raised if the job ends in the ERROR state. |
| Cluster state | Side effect | CreateDataprocClusterTask leaves a running cluster; DeleteDataprocClusterTask removes the cluster. |

Usage Examples

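import luigi
from luigi.contrib.dataproc import DataprocSparkTask, CreateDataprocClusterTask
from luigi.contrib.gcs import GCSTarget

class MySparkJob(DataprocSparkTask):
    # Plain class attributes replace the inherited Parameters with constants.
    gcloud_project_id = 'my-gcp-project'
    dataproc_cluster_name = 'analytics-cluster'
    dataproc_region = 'us-central1'
    main_class = 'com.example.MySparkApp'
    jars = 'gs://my-bucket/jars/app.jar'
    # Split on commas into ['--input', 'gs://my-bucket/data', '--output', 'gs://my-bucket/results']
    job_args = '--input,gs://my-bucket/data,--output,gs://my-bucket/results'

    def requires(self):
        # Create the cluster first (skipped if a cluster with this name already exists).
        return CreateDataprocClusterTask(
            gcloud_project_id=self.gcloud_project_id,
            dataproc_cluster_name=self.dataproc_cluster_name,
            dataproc_region=self.dataproc_region,
            worker_normal_count='4'
        )

    def output(self):
        # Assumes the Spark app writes a _SUCCESS marker next to its results.
        # Without an output(), Luigi would rerun this task every time it is scheduled.
        return GCSTarget('gs://my-bucket/results/_SUCCESS')

if __name__ == '__main__':
    luigi.build([MySparkJob()], local_scheduler=True)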

Related Pages

  • Spotify_Luigi_Distributed_Processing -- Principle governing distributed processing task abstractions
  • luigi.contrib.gcp -- Google Cloud Platform authentication utilities used by the Dataproc client
