Implementation:Spotify Luigi DataprocTask
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Google_Cloud |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
The Dataproc contrib module provides Luigi task bindings for Google Cloud Dataproc, enabling submission and management of Spark and PySpark jobs on managed Dataproc clusters, as well as creation and deletion of clusters themselves.
Description
The module defines a hierarchy of task classes:
- _DataprocBaseTask (extends luigi.Task): Internal base class holding the shared parameters (gcloud_project_id, dataproc_cluster_name, dataproc_region) and the Dataproc API client.
- DataprocBaseTask (extends _DataprocBaseTask): Public base class providing methods to submit Spark and PySpark jobs and to poll for their completion. Extend this when you need fine-grained control over job configuration.
- DataprocSparkTask (extends DataprocBaseTask): Convenience task for running Spark jobs, with parameters for main_class, jars, and job_args.
- DataprocPysparkTask (extends DataprocBaseTask): Convenience task for running PySpark jobs, with parameters for job_file, extra_files, and job_args.
- CreateDataprocClusterTask (extends _DataprocBaseTask): Creates a Dataproc cluster with configurable machine types, disk sizes, worker counts, and image version, then polls until the cluster reaches the RUNNING state.
- DeleteDataprocClusterTask (extends _DataprocBaseTask): Deletes a Dataproc cluster and polls until it no longer exists.
The module uses the Google API Python client (googleapiclient) with default credentials via google.auth.
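The job and cluster tasks share one control pattern: fetch the resource state, stop on a success state, raise on ERROR, and otherwise sleep and retry. The following is a minimal, dependency-free sketch of that loop. The helper poll_until, the fetch_state callable, and the default interval are hypothetical stand-ins for the Dataproc API calls the module actually makes.

```python
import time
from typing import Callable, Optional


def poll_until(fetch_state: Callable[[], Optional[str]],
               done_state: Optional[str] = "DONE",
               error_state: str = "ERROR",
               interval_s: float = 5.0,
               sleep: Callable[[float], None] = time.sleep) -> Optional[str]:
    """Poll fetch_state() until it returns done_state; raise on error_state.

    Hypothetical helper: in the real module the fetch would be a Dataproc
    jobs().get() or clusters().get() request.
    """
    while True:
        state = fetch_state()
        if state == done_state:
            return state
        if state == error_state:
            raise RuntimeError("resource ended in {} state".format(error_state))
        # Not terminal yet (e.g. PENDING or RUNNING): wait, then ask again.
        sleep(interval_s)


# Drive the loop with a scripted sequence of states instead of a live API.
states = iter(["PENDING", "RUNNING", "DONE"])
print(poll_until(lambda: next(states), sleep=lambda _: None))  # DONE
```

Cluster creation corresponds to done_state="RUNNING"; cluster deletion, which waits for the cluster to disappear, corresponds to a fetch that returns None once the cluster is gone, with done_state=None.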
Usage
For running jobs, subclass DataprocSparkTask or DataprocPysparkTask and set the required parameters. For cluster lifecycle management, chain the tasks as Luigi dependencies to get an ephemeral cluster: have your job task require CreateDataprocClusterTask, and have a DeleteDataprocClusterTask subclass require your job task.
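Luigi runs a task only after all of its requirements have completed, so a chain in which the delete task requires the job and the job requires the create task executes in create, job, delete order. The toy depth-first resolver below only illustrates that ordering; it is not Luigi's scheduler, and the task names are placeholders.

```python
from typing import Dict, List, Set


def run_order(requires: Dict[str, List[str]], target: str) -> List[str]:
    """Return tasks in the order a depth-first resolver would run them.

    Toy model only: each task is emitted after all of its requirements.
    """
    order: List[str] = []
    seen: Set[str] = set()

    def visit(task: str) -> None:
        if task in seen:
            return
        seen.add(task)
        for dep in requires.get(task, []):
            visit(dep)  # requirements run first
        order.append(task)

    visit(target)
    return order


# Placeholder names for the three tasks of an ephemeral-cluster chain.
chain = {
    "DeleteCluster": ["MySparkJob"],
    "MySparkJob": ["CreateCluster"],
    "CreateCluster": [],
}
print(run_order(chain, "DeleteCluster"))
# ['CreateCluster', 'MySparkJob', 'DeleteCluster']
```

One general Luigi behaviour to keep in mind: a task whose complete() already returns True is treated as done and its requirements are not scheduled, so whichever task you schedule as the root must report itself incomplete until the job has actually run. Because the cluster is removed at the end of the chain, keep job inputs and outputs in gs:// paths rather than on the cluster's HDFS.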
Code Reference
Source Location
luigi/contrib/dataproc.py (260 lines)
Signature
```python
class _DataprocBaseTask(luigi.Task):
    gcloud_project_id = luigi.Parameter()
    dataproc_cluster_name = luigi.Parameter()
    dataproc_region = luigi.Parameter(default="global")


class DataprocBaseTask(_DataprocBaseTask):
    def submit_job(self, job_config):
        """Submits a raw job config dict to the Dataproc API."""

    def submit_spark_job(self, jars, main_class, job_args=None):
        """Builds and submits a Spark job configuration."""

    def submit_pyspark_job(self, job_file, extra_files=list(), job_args=None):
        """Builds and submits a PySpark job configuration."""

    def wait_for_job(self):
        """Polls the Dataproc API until the job reaches DONE or ERROR."""


class DataprocSparkTask(DataprocBaseTask):
    main_class = luigi.Parameter()
    jars = luigi.Parameter(default="")
    job_args = luigi.Parameter(default="")


class DataprocPysparkTask(DataprocBaseTask):
    job_file = luigi.Parameter()
    extra_files = luigi.Parameter(default="")
    job_args = luigi.Parameter(default="")


class CreateDataprocClusterTask(_DataprocBaseTask):
    gcloud_zone = luigi.Parameter(default="europe-west1-c")
    gcloud_network = luigi.Parameter(default="default")
    master_node_type = luigi.Parameter(default="n1-standard-2")
    master_disk_size = luigi.Parameter(default="100")
    worker_node_type = luigi.Parameter(default="n1-standard-2")
    worker_disk_size = luigi.Parameter(default="100")
    worker_normal_count = luigi.Parameter(default="2")
    worker_preemptible_count = luigi.Parameter(default="0")
    image_version = luigi.Parameter(default="")


class DeleteDataprocClusterTask(_DataprocBaseTask):
    pass
```
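submit_spark_job and submit_pyspark_job wrap their arguments in a Dataproc v1 jobs.submit request body, and submit_job accepts such a body directly when you need fields the convenience methods do not expose. The sketch below builds bodies of that shape as plain dictionaries, using the public API field names (placement.clusterName, sparkJob.mainClass, sparkJob.jarFileUris, pysparkJob.mainPythonFileUri, pysparkJob.pythonFileUris, args). The helper names are hypothetical, and the dictionaries approximate rather than reproduce what the module sends.

```python
from typing import Dict, List, Optional


def spark_job_body(cluster_name: str, main_class: str, jars: List[str],
                   job_args: Optional[List[str]] = None) -> Dict:
    """Request body for a Spark job (hypothetical helper)."""
    return {"job": {
        "placement": {"clusterName": cluster_name},
        "sparkJob": {
            "mainClass": main_class,
            "jarFileUris": jars,     # JARs added to the driver and executor classpath
            "args": job_args or [],  # passed to main_class's main method
        },
    }}


def pyspark_job_body(cluster_name: str, job_file: str,
                     extra_files: Optional[List[str]] = None,
                     job_args: Optional[List[str]] = None) -> Dict:
    """Request body for a PySpark job (hypothetical helper)."""
    return {"job": {
        "placement": {"clusterName": cluster_name},
        "pysparkJob": {
            "mainPythonFileUri": job_file,
            "pythonFileUris": extra_files or [],  # extra .py, .zip or .egg files
            "args": job_args or [],
        },
    }}


body = pyspark_job_body("analytics-cluster", "gs://my-bucket/jobs/main.py",
                        job_args=["--date", "2024-01-01"])
print(body["job"]["pysparkJob"]["args"])  # ['--date', '2024-01-01']
```

The sketch uses None defaults instead of list() so that no mutable default list is shared between calls.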
Import
```python
from luigi.contrib.dataproc import DataprocSparkTask, DataprocPysparkTask
from luigi.contrib.dataproc import CreateDataprocClusterTask, DeleteDataprocClusterTask
```
I/O Contract
Inputs
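| Input | Type | Description |
|---|---|---|
| gcloud_project_id | Parameter | Google Cloud project ID for the Dataproc cluster and jobs. |
| dataproc_cluster_name | Parameter | Name of the Dataproc cluster to target. |
| dataproc_region | Parameter | Dataproc region of the cluster; defaults to "global". |
| main_class / job_file | Parameter | The Spark main class (DataprocSparkTask) or the URI of the main Python file (DataprocPysparkTask). Required. |
| jars / extra_files | Parameter | Comma-separated list of JAR URIs (Spark) or additional Python file URIs (PySpark). Optional; defaults to an empty string. |
| job_args | Parameter | Comma-separated arguments passed to the job. Optional; defaults to an empty string. |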
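Because jars, extra_files, and job_args are single string parameters, the convenience tasks have to turn them into lists before building the request. A minimal sketch of that conversion, assuming a plain split on commas in which an empty string maps to an empty list (the helper name is hypothetical):

```python
from typing import List


def split_csv_param(value: str) -> List[str]:
    """Turn a comma-separated parameter into a list; '' becomes []."""
    return value.split(",") if value else []


job_args = "--input,gs://my-bucket/data,--output,gs://my-bucket/results"
print(split_csv_param(job_args))
# ['--input', 'gs://my-bucket/data', '--output', 'gs://my-bucket/results']
print(split_csv_param(""))  # []
```

Under this scheme an individual argument cannot itself contain a comma, and whitespace around commas is kept as part of the neighbouring item. If that is a problem, override run() and call submit_spark_job or submit_pyspark_job with a list, followed by wait_for_job().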
Outputs
| Output | Type | Description |
|---|---|---|
| Job completion | Side effect | The Spark/PySpark job runs to completion on the Dataproc cluster. Raises an exception if the job ends in ERROR state. |
| Cluster state | Side effect | CreateDataprocClusterTask leaves a cluster in the RUNNING state; DeleteDataprocClusterTask removes the cluster. |
Usage Examples
```python
from luigi.contrib.dataproc import DataprocSparkTask, CreateDataprocClusterTask
from luigi.contrib.gcs import GCSTarget


class MySparkJob(DataprocSparkTask):
    # Plain class attributes replace the inherited Parameter definitions.
    gcloud_project_id = 'my-gcp-project'
    dataproc_cluster_name = 'analytics-cluster'
    dataproc_region = 'us-central1'
    main_class = 'com.example.MySparkApp'
    jars = 'gs://my-bucket/jars/app.jar'
    job_args = '--input,gs://my-bucket/data,--output,gs://my-bucket/results'

    def requires(self):
        # Ensure the cluster exists before the job is submitted.
        return CreateDataprocClusterTask(
            gcloud_project_id=self.gcloud_project_id,
            dataproc_cluster_name=self.dataproc_cluster_name,
            dataproc_region=self.dataproc_region,
            worker_normal_count='4'  # string, like the other cluster parameters
        )

    def output(self):
        # Assumes the Spark app writes a Hadoop-style _SUCCESS marker to its output directory.
        return GCSTarget('gs://my-bucket/results/_SUCCESS')
```
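The example passes worker_normal_count='4' to CreateDataprocClusterTask. To show where such parameters end up, here is a rough sketch of a Dataproc v1 clusters.create body assembled from the task's parameters. Field names follow the public ClusterConfig resource; the helper, the full-URI construction, the single master node, and the int() conversions are assumptions of this sketch, not a copy of the module's code.

```python
from typing import Dict


def cluster_body(project: str, cluster_name: str, zone: str = "europe-west1-c",
                 network: str = "default", master_type: str = "n1-standard-2",
                 master_disk_gb: str = "100", worker_type: str = "n1-standard-2",
                 worker_disk_gb: str = "100", workers: str = "2",
                 preemptible_workers: str = "0", image_version: str = "") -> Dict:
    """Approximate clusters.create body (hypothetical helper; defaults mirror the task's)."""
    base = "https://www.googleapis.com/compute/v1/projects/" + project
    machine = base + "/zones/" + zone + "/machineTypes/"
    return {
        "projectId": project,
        "clusterName": cluster_name,
        "config": {
            "gceClusterConfig": {
                "zoneUri": base + "/zones/" + zone,
                "networkUri": base + "/global/networks/" + network,
            },
            "masterConfig": {
                "numInstances": 1,
                "machineTypeUri": machine + master_type,
                "diskConfig": {"bootDiskSizeGb": int(master_disk_gb)},
            },
            "workerConfig": {
                "numInstances": int(workers),
                "machineTypeUri": machine + worker_type,
                "diskConfig": {"bootDiskSizeGb": int(worker_disk_gb)},
            },
            "secondaryWorkerConfig": {
                "numInstances": int(preemptible_workers),
                "isPreemptible": True,
            },
            # An empty image_version leaves the image choice to Dataproc's default.
            "softwareConfig": {"imageVersion": image_version} if image_version else {},
        },
    }


body = cluster_body("my-gcp-project", "analytics-cluster", workers="4")
print(body["config"]["workerConfig"]["numInstances"])  # 4
```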
Related Pages
- Spotify_Luigi_Distributed_Processing -- Principle governing distributed processing task abstractions
- luigi.contrib.gcp -- Google Cloud Platform authentication utilities used by the Dataproc client