Implementation:Spotify Luigi PrestoTask
Overview
PrestoTask is a Luigi task class in the luigi.contrib.presto module that enables execution of Presto SQL queries within Luigi pipelines. The module provides a complete Presto integration stack including the PrestoClient for query execution and progress tracking, PrestoTarget for verifying table existence and row counts, the WithPrestoClient metaclass for automatic client injection, and the presto configuration class for connection settings.
Source Location
| Property | Value |
|---|---|
| Source File | luigi/contrib/presto.py |
| Lines of Code | 283 |
| Module | luigi.contrib.presto |
| Domain | Database, SQL_Analytics |
Import Statement
from luigi.contrib.presto import PrestoTask, PrestoTarget, PrestoClient
Classes
presto (Config)
presto(luigi.Config)
Configuration class for Presto connection defaults. Values are read from the [presto] section in luigi.cfg.
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | Parameter | 'localhost' | Presto server hostname. |
| port | IntParameter | 8090 | Presto server port number. |
| user | Parameter | 'anonymous' | Presto connection user. |
| catalog | Parameter | 'hive' | Default Presto catalog. |
| password | Parameter | None | User password for authentication. |
| protocol | Parameter | 'https' | Connection protocol (http or https). |
| poll_interval | FloatParameter | 1.0 | How often (in seconds) to poll the Presto REST interface for progress updates. |
PrestoClient
PrestoClient
Helper class wrapping pyhive.presto.Connection for executing Presto queries and tracking progress.
Constructor
PrestoClient.__init__(self, connection, sleep_time=1)
| Parameter | Type | Default | Description |
|---|---|---|---|
| connection | pyhive.presto.Connection | (required) | An active Presto connection object. |
| sleep_time | int | 1 | Seconds to sleep between poll cycles. |
Properties
| Property | Return Type | Description |
|---|---|---|
| percentage_progress | float | Returns the percentage of overall query progress from the status stats, defaulting to 0.1. |
| info_uri | str | Returns the Presto query UI link from the current status. |
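The fallback behaviour of these two properties can be illustrated with a small stand-alone sketch. It assumes the polled status is a plain dict that carries the progress under `stats.progressPercentage` and the UI link under `infoUri`; the helper functions and the sample payloads are hypothetical and are not part of the module.

```python
def percentage_progress(status):
    """Return overall progress, falling back to 0.1 when stats are absent."""
    return status.get("stats", {}).get("progressPercentage", 0.1)


def info_uri(status):
    """Return the query UI link, or None if the status has none yet."""
    return status.get("infoUri")


# Hypothetical status payloads, for illustration only.
empty_status = {}
running_status = {
    "infoUri": "http://presto.example.com/ui/query.html?abc",
    "stats": {"progressPercentage": 42.5},
}

print(percentage_progress(empty_status), info_uri(empty_status))
print(percentage_progress(running_status), info_uri(running_status))
```

The non-zero default means a progress bar shows a small amount of progress as soon as the query is submitted, before the first status arrives.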
Methods
| Method | Signature | Description |
|---|---|---|
| execute | execute(self, query, parameters=None, mode=None) | Executes a Presto query. The mode parameter accepts 'watch' (yields status updates as dicts) or 'fetch' (yields result rows). Defaults to 'watch' mode. |
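The difference between the two modes is easiest to see with a simplified polling loop. The sketch below is not the module's code: `FakeCursor` is a hypothetical stand-in for a cursor that exposes `execute`, `poll`, `fetchall`, and `close`, and the free function `execute` only mirrors the behaviour described above (status dicts in 'watch' mode, rows in 'fetch' mode).

```python
from contextlib import closing
from time import sleep


class FakeCursor:
    """Hypothetical stand-in for a cursor that can be polled for status."""

    def __init__(self, statuses, rows):
        self._statuses = list(statuses)
        self._rows = list(rows)

    def execute(self, query, parameters=None):
        pass  # a real cursor would submit the query here

    def poll(self):
        # Return the next status dict, or None once the query has finished.
        return self._statuses.pop(0) if self._statuses else None

    def fetchall(self):
        return self._rows

    def close(self):
        pass


def execute(cursor, query, parameters=None, mode="watch", sleep_time=0):
    """Yield status dicts in 'watch' mode or result rows in 'fetch' mode."""
    if mode not in ("watch", "fetch"):
        raise ValueError("mode must be 'watch' or 'fetch'")
    with closing(cursor):
        cursor.execute(query, parameters)
        while True:
            sleep(sleep_time)
            status = cursor.poll()
            if not status:
                break  # no more status updates: the query is done
            if mode == "watch":
                yield status
        if mode == "fetch":
            for row in cursor.fetchall():
                yield row


statuses = [
    {"stats": {"progressPercentage": 50.0}},
    {"stats": {"progressPercentage": 100.0}},
]
watched = list(execute(FakeCursor(statuses, [(3,)]), "SELECT 1"))
fetched = list(execute(FakeCursor(statuses, [(3,)]), "SELECT 1", mode="fetch"))
```

Because the function is a generator, nothing is sent to the server until the caller starts iterating; a caller that ignores the return value never runs the query.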
WithPrestoClient (Metaclass)
WithPrestoClient(Register)
A metaclass that injects a _client property into classes that use it. The property constructs a PrestoClient by introspecting pyhive.presto.Cursor.__init__ and matching its parameters against instance attributes. This enables automatic connection setup for any class whose fields match Presto connection parameters.
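The injection technique can be sketched without Luigi or pyhive. In the example below, `FakeConnection`, `WithClient`, and `Job` are hypothetical names; the real metaclass derives from Luigi's `Register` and wraps the resulting connection in a `PrestoClient`, which this sketch leaves out.

```python
import inspect


class FakeConnection:
    """Hypothetical stand-in for a connection class with keyword options."""

    def __init__(self, host, port=8080, username=None):
        self.host, self.port, self.username = host, port, username


class WithClient(type):
    """Metaclass that adds a `_client` property to every class it creates."""

    def __new__(mcs, name, bases, attrs):
        def _client(self):
            # Drop `self`, then keep each parameter the instance sets truthily.
            names = list(inspect.signature(FakeConnection.__init__).parameters)[1:]
            kwargs = {n: getattr(self, n) for n in names if getattr(self, n, None)}
            return FakeConnection(**kwargs)

        attrs["_client"] = property(_client)
        return super().__new__(mcs, name, bases, attrs)


class Job(metaclass=WithClient):
    host = "presto.example.com"
    port = 8090
    username = None  # falsy values are not passed, so the default applies


client = Job()._client
print(client.host, client.port, client.username)
```

One consequence of this design is that attribute names on the class must match the constructor's parameter names exactly, which is why PrestoTask exposes aliases such as username and schema.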
PrestoTarget
PrestoTarget(luigi.Target)
Target representing a Presto-accessible table, optionally scoped to a partition.
Constructor
PrestoTarget.__init__(self, client, catalog, database, table, partition=None)
| Parameter | Type | Default | Description |
|---|---|---|---|
| client | PrestoClient | (required) | A PrestoClient instance for query execution. |
| catalog | str | (required) | Presto catalog name. |
| database | str | (required) | Presto database/schema name. |
| table | str | (required) | Table name. |
| partition | dict or None | None | Optional partition specification as an OrderedDict of column-value pairs. |
Methods
| Method | Signature | Description |
|---|---|---|
| count | count(self) | Returns the row count for the specified table and partition. Executes a SELECT COUNT(*) query via the client in 'fetch' mode. Result is cached after first call. |
| exists | exists(self) | Returns True if the table exists and has rows in the specified partition. Returns False if the count is zero or the table does not exist (catches DatabaseError). |
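A partition-scoped count query of the kind described above could be assembled as in the following sketch. The function names are hypothetical and the exact SQL text emitted by the module may differ; the point is that partition values travel as query parameters rather than being interpolated into the SQL string.

```python
from collections import OrderedDict


def build_count_query(catalog, database, table, partition=None):
    """Return (sql, params) counting rows, optionally within one partition."""
    # With no partition, fall back to an always-true `1 = 1` predicate.
    spec = OrderedDict(partition or {1: 1})
    clauses = " AND ".join("{} = %s".format(column) for column in spec)
    sql = "SELECT COUNT(*) AS cnt FROM {}.{}.{} WHERE {}".format(
        catalog, database, table, clauses
    )
    return sql, list(spec.values())


def exists(row_count):
    """A target counts as existing only when the partition has rows."""
    return row_count > 0


sql, params = build_count_query(
    "hive", "analytics", "daily_revenue",
    OrderedDict([("event_date", "2024-01-01")]),
)
print(sql, params)
```

Using an ordered mapping keeps the placeholders and the parameter list in the same order when a partition has several columns.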
PrestoTask
PrestoTask(rdbms.Query, metaclass=WithPrestoClient)
The primary task class for executing Presto queries in a Luigi pipeline. Inherits from rdbms.Query and uses the WithPrestoClient metaclass for automatic client injection.
Properties
| Property | Return Type | Source | Description |
|---|---|---|---|
| host | str | presto().host | Presto server host. |
| port | int | presto().port | Presto server port. |
| user | str | presto().user | Presto user. |
| username | str | self.user | Alias for user. |
| schema | str | self.database | Alias for database. |
| password | str | presto().password | Connection password. |
| catalog | str | presto().catalog | Presto catalog. |
| poll_interval | float | presto().poll_interval | Polling interval for progress. |
| source | str | fixed | Always returns 'pyhive'. |
| partition | None | fixed | Returns None by default. Override to specify partitions. |
| protocol | str | computed | Returns 'https' if password is set, otherwise uses config value. |
| session_props | None | fixed | Returns None by default. |
| requests_kwargs | dict | fixed | Returns {'verify': False}. |
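The computed protocol property follows a simple rule, sketched here as a stand-alone function. `resolve_protocol` is a hypothetical helper name; in the task itself the two inputs come from the password property and the [presto] configuration.

```python
def resolve_protocol(password, configured_protocol):
    """Force HTTPS whenever a password is set; otherwise honor the config."""
    return "https" if password else configured_protocol


print(resolve_protocol("secret", "http"))  # password set: HTTPS is forced
print(resolve_protocol(None, "http"))      # no password: config value wins
```

This means a configured protocol of http only takes effect for unauthenticated connections.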
Task Attribute
| Attribute | Value | Description |
|---|---|---|
| query | None | Must be set by subclasses. The Presto SQL query string to execute. |
Methods
| Method | Signature | Description |
|---|---|---|
| run | run(self) | Executes self.query via the injected _client in watch mode. Sets the tracking URL and progress percentage during execution. |
| output | output(self) | Returns a PrestoTarget constructed from the task's catalog, database, table, and partition properties. |
Configuration
[presto]
host: my-presto-host.example.com
port: 8090
user: analytics_user
catalog: hive
password: secret
protocol: https
poll_interval: 1.0
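As a quick sanity check, a section like this can be parsed with the standard library's configparser, as in the sketch below. This is only an illustration of how the keys map to typed values; Luigi reads luigi.cfg through its own configuration layer, and the variable names here are hypothetical.

```python
import configparser

# Same keys and values as the [presto] section shown above.
CONFIG_TEXT = """
[presto]
host: my-presto-host.example.com
port: 8090
user: analytics_user
catalog: hive
password: secret
protocol: https
poll_interval: 1.0
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)
section = parser["presto"]

host = section["host"]
port = section.getint("port")                      # IntParameter in Luigi
poll_interval = section.getfloat("poll_interval")  # FloatParameter in Luigi

print(host, port, poll_interval)
```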
Usage Example
import luigi
from luigi.contrib.presto import PrestoTask


class AggregateRevenue(PrestoTask):
    date = luigi.DateParameter()

    # database and table identify the output table checked by output().
    @property
    def database(self):
        return 'analytics'

    @property
    def table(self):
        return 'daily_revenue'

    @property
    def query(self):
        # Interpolate the task's date as an ISO 8601 DATE literal.
        return """
            INSERT INTO analytics.daily_revenue
            SELECT date, SUM(amount) as total
            FROM transactions.raw_events
            WHERE event_date = DATE '%s'
            GROUP BY date
        """ % self.date.isoformat()
External Dependencies
- pyhive[presto]: Required for pyhive.presto.Connection, pyhive.presto.Cursor, and pyhive.exc.DatabaseError. A warning is logged if not installed.
- Luigi core: luigi, luigi.contrib.rdbms, luigi.task_register.Register
See Also
- Spotify_Luigi_Task_Definition - Base task class
- Spotify_Luigi_RedisTarget - Another database target implementation
- luigi.contrib.rdbms - Parent RDBMS query abstraction