
Implementation:Spotify Luigi PrestoTask

From Leeroopedia


Overview

PrestoTask is a Luigi task class in the luigi.contrib.presto module that runs Presto SQL queries inside Luigi pipelines. The module provides a complete Presto integration: PrestoClient executes queries and tracks their progress, PrestoTarget checks table existence and row counts, the WithPrestoClient metaclass injects a client automatically, and the presto configuration class holds the connection settings.

Source Location

| Property | Value |
|---|---|
| Source File | luigi/contrib/presto.py |
| Lines of Code | 283 |
| Module | luigi.contrib.presto |
| Domain | Database, SQL_Analytics |

Import Statement

from luigi.contrib.presto import PrestoTask, PrestoTarget, PrestoClient

Classes

presto (Config)

presto(luigi.Config)

Configuration class for Presto connection defaults. Values are read from the [presto] section in luigi.cfg.

| Parameter | Type | Default | Description |
|---|---|---|---|
| host | Parameter | 'localhost' | Presto server hostname. |
| port | IntParameter | 8090 | Presto server port number. |
| user | Parameter | 'anonymous' | Presto connection user. |
| catalog | Parameter | 'hive' | Default Presto catalog. |
| password | Parameter | None | User password for authentication. |
| protocol | Parameter | 'https' | Connection protocol (http or https). |
| poll_interval | FloatParameter | 1.0 | How often, in seconds, to poll the Presto REST interface for progress updates. |

PrestoClient

PrestoClient

Helper class wrapping pyhive.presto.Connection for executing Presto queries and tracking progress.

Constructor

PrestoClient.__init__(self, connection, sleep_time=1)
| Parameter | Type | Default | Description |
|---|---|---|---|
| connection | pyhive.presto.Connection | (required) | An active Presto connection object. |
| sleep_time | int | 1 | Seconds to sleep between poll cycles. |

Properties

| Property | Return Type | Description |
|---|---|---|
| percentage_progress | float | Percentage of overall query progress, read from the stats of the current status; defaults to 0.1. |
| info_uri | str | Presto query UI link from the current status. |

Methods

| Method | Signature | Description |
|---|---|---|
| execute | execute(self, query, parameters=None, mode=None) | Executes a Presto query. mode accepts 'watch' (yields status updates as dicts) or 'fetch' (yields result rows). Defaults to 'watch'. |
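To make the watch/fetch contract concrete, here is a stdlib-only sketch that runs without pyhive or a Presto server. It is not luigi's code: FakeCursor is a hypothetical stand-in that assumes the cursor's poll() returns a status dict while the query runs and None once it finishes, and MiniClient reproduces the polling loop together with the percentage_progress and info_uri properties described above.

```python
import time
from enum import Enum


class Mode(Enum):
    WATCH = "watch"
    FETCH = "fetch"


class FakeCursor:
    """Hypothetical stand-in for a pyhive cursor: replays canned statuses."""

    def __init__(self, statuses, rows):
        self._statuses = list(statuses)
        self._rows = rows
        self.executed = None

    def execute(self, query, parameters=None):
        self.executed = (query, parameters)

    def poll(self):
        # Next status dict while "running", None once the query is done.
        return self._statuses.pop(0) if self._statuses else None

    def fetchall(self):
        return list(self._rows)


class MiniClient:
    """Sketch of the watch/fetch polling loop; not luigi's implementation."""

    def __init__(self, cursor, sleep_time=0):
        self._cursor = cursor
        self.sleep_time = sleep_time
        self._status = {}

    @property
    def percentage_progress(self):
        # Falls back to 0.1 until a status reports progressPercentage.
        return self._status.get("stats", {}).get("progressPercentage", 0.1)

    @property
    def info_uri(self):
        return self._status.get("infoUri")

    def execute(self, query, parameters=None, mode=None):
        mode = Mode(mode) if mode else Mode.WATCH
        self._cursor.execute(query, parameters)
        while True:
            time.sleep(self.sleep_time)
            status = self._cursor.poll()
            if not status:
                break  # query finished
            self._status = status  # keep the latest status for the properties
            if mode is Mode.WATCH:
                yield status
        if mode is Mode.FETCH:
            yield from self._cursor.fetchall()
```

In watch mode the caller sees every status dict and no rows; in fetch mode the statuses are consumed silently and only the rows are yielded, which is the mode PrestoTarget.count() uses.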

WithPrestoClient (Metaclass)

WithPrestoClient(Register)

A metaclass that injects a _client property into every class created with it. The property builds a PrestoClient by introspecting the parameter names of pyhive.presto.Cursor.__init__ and passing the instance attributes with matching names to the connection. Any class whose attributes are named after the Presto connection parameters therefore gets its connection set up automatically.
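The injection mechanism can be sketched with the standard library alone. Assumptions: plain type stands in for luigi's Register, FakeCursor and FakeConnection are hypothetical stand-ins for the pyhive classes, the property returns the connection directly instead of wrapping it in a PrestoClient, and attributes that are missing or falsy are simply skipped.

```python
import inspect


class FakeConnection:
    """Hypothetical stand-in for pyhive.presto.Connection: records its kwargs."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


class FakeCursor:
    """Hypothetical stand-in whose __init__ defines the accepted option names."""

    def __init__(self, host, port=8080, username=None, catalog="hive", schema="default"):
        pass


class WithClient(type):
    """Sketch: inject a `_client` property built from matching instance attributes."""

    def __new__(mcs, name, bases, attrs):
        def _client(self):
            # Option names come from the cursor constructor, minus `self`.
            names = list(inspect.signature(FakeCursor.__init__).parameters)[1:]
            kwargs = {}
            for option in names:
                value = getattr(self, option, None)
                if value:  # skip unset/falsy options so the cursor default applies
                    kwargs[option] = value
            return FakeConnection(**kwargs)

        attrs["_client"] = property(_client)
        return super().__new__(mcs, name, bases, attrs)


class MyTask(metaclass=WithClient):
    # Attribute names deliberately match FakeCursor's parameters.
    host = "presto.example.com"
    port = 8090
    username = "analytics_user"
    schema = "analytics"
```

Because the option list comes from the cursor constructor's signature, a task only has to expose attributes with those names, which is why PrestoTask defines aliases such as username and schema.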

PrestoTarget

PrestoTarget(luigi.Target)

Target representing a Presto-accessible table, optionally scoped to a partition.

Constructor

PrestoTarget.__init__(self, client, catalog, database, table, partition=None)
| Parameter | Type | Default | Description |
|---|---|---|---|
| client | PrestoClient | (required) | A PrestoClient instance for query execution. |
| catalog | str | (required) | Presto catalog name. |
| database | str | (required) | Presto database/schema name. |
| table | str | (required) | Table name. |
| partition | dict or None | None | Optional partition specification as an OrderedDict of column-value pairs. |

Methods

| Method | Signature | Description |
|---|---|---|
| count | count(self) | Returns the row count for the specified table and partition. Executes a SELECT COUNT(*) query via the client in 'fetch' mode. The result is cached after the first call. |
| exists | exists(self) | Returns True if the table exists and has rows in the specified partition. Returns False if the count is zero or the table does not exist (catches DatabaseError). |
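The partition-scoped count and the exists() rule can be sketched as plain functions. This is an illustration under assumptions, not luigi's exact SQL: build_count_query and exists are hypothetical helpers, the %s placeholders follow the pyformat style that pyhive uses for bound parameters, and a local DatabaseError class stands in for pyhive.exc.DatabaseError.

```python
from collections import OrderedDict


class DatabaseError(Exception):
    """Local stand-in for pyhive.exc.DatabaseError."""


def build_count_query(catalog, database, table, partition=None):
    """Sketch: a COUNT(*) over catalog.database.table, optionally scoped to a partition."""
    parts = OrderedDict(partition or {})
    query = "SELECT COUNT(*) AS cnt FROM {}.{}.{}".format(catalog, database, table)
    if parts:
        # One equality clause per partition column, values bound separately.
        query += " WHERE " + " AND ".join("{} = %s".format(col) for col in parts)
    return query, list(parts.values())


def exists(count_fn):
    """Sketch of exists(): rows present -> True; zero rows or a DB error -> False."""
    try:
        return count_fn() > 0
    except DatabaseError:
        return False
```

Keeping the values out of the SQL string and passing them as parameters lets the driver handle quoting of partition values such as dates.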

PrestoTask

PrestoTask(rdbms.Query, metaclass=WithPrestoClient)

The primary task class for executing Presto queries in a Luigi pipeline. Inherits from rdbms.Query and uses the WithPrestoClient metaclass for automatic client injection.

Properties

| Property | Return Type | Source | Description |
|---|---|---|---|
| host | str | presto().host | Presto server host. |
| port | int | presto().port | Presto server port. |
| user | str | presto().user | Presto user. |
| username | str | self.user | Alias for user. |
| schema | str | self.database | Alias for database. |
| password | str | presto().password | Connection password. |
| catalog | str | presto().catalog | Presto catalog. |
| poll_interval | float | presto().poll_interval | Polling interval for progress. |
| source | str | fixed | Always returns 'pyhive'. |
| partition | None | fixed | Returns None by default. Override to specify partitions. |
| protocol | str | computed | Returns 'https' if password is set, otherwise the config value. |
| session_props | None | fixed | Returns None by default. |
| requests_kwargs | dict | fixed | Returns {'verify': False}. |
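The property table boils down to a small amount of derivation logic. The sketch below models it with hypothetical classes (PrestoDefaults stands in for the [presto] config, TaskSettings for a PrestoTask subclass) and uses 'http' as the configured protocol so that the password override is visible.

```python
class PrestoDefaults:
    """Hypothetical stand-in for the [presto] config values."""
    host = "localhost"
    port = 8090
    user = "anonymous"
    catalog = "hive"
    password = None
    protocol = "http"  # assumed override of the 'https' default, for illustration


class TaskSettings:
    """Sketch of how PrestoTask-style properties derive from config."""

    def __init__(self, config, database):
        self._config = config
        self.database = database

    @property
    def user(self):
        return self._config.user

    @property
    def username(self):  # alias matching the cursor's option name
        return self.user

    @property
    def schema(self):  # alias: the Presto schema is the task's database
        return self.database

    @property
    def password(self):
        return self._config.password

    @property
    def protocol(self):
        # A password forces https; otherwise use the configured protocol.
        return "https" if self.password else self._config.protocol

    @property
    def requests_kwargs(self):
        return {"verify": False}
```

One consequence worth knowing: requests_kwargs is handed through to the requests library, where verify=False disables TLS certificate verification for the connection.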

Task Attribute

| Attribute | Value | Description |
|---|---|---|
| query | None | Must be set by subclasses. The Presto SQL query string to execute. |

Methods

| Method | Signature | Description |
|---|---|---|
| run | run(self) | Executes self.query via the injected _client in 'watch' mode, setting the task's tracking URL and progress percentage during execution. |
| output | output(self) | Returns a PrestoTarget constructed from the task's catalog, database, table, and partition properties. |
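The run() behaviour can be sketched with a scripted client. RecordingTask and ScriptedClient are hypothetical stand-ins: the task records what it would pass to luigi's Task.set_tracking_url and Task.set_progress_percentage, and this sketch sets the tracking URL only on the first status update, on the assumption that a query's UI link does not change between polls.

```python
class ScriptedClient:
    """Hypothetical client that replays status dicts like a watch-mode execute()."""

    def __init__(self, statuses):
        self._statuses = statuses
        self._status = {}

    @property
    def info_uri(self):
        return self._status.get("infoUri")

    @property
    def percentage_progress(self):
        return self._status.get("stats", {}).get("progressPercentage", 0.1)

    def execute(self, query, parameters=None, mode=None):
        for status in self._statuses:
            self._status = status
            yield status


class RecordingTask:
    """Sketch of the run() loop; records tracking and progress calls."""

    def __init__(self, client, query):
        self._client = client
        self.query = query
        self.tracking_urls = []
        self.progress = []
        self._tracking_url_set = False

    # In luigi these correspond to Task.set_tracking_url / set_progress_percentage.
    def set_tracking_url(self, url):
        self.tracking_urls.append(url)

    def set_progress_percentage(self, pct):
        self.progress.append(pct)

    def run(self):
        for _status in self._client.execute(self.query):  # default 'watch' mode
            if not self._tracking_url_set:
                self.set_tracking_url(self._client.info_uri)
                self._tracking_url_set = True
            self.set_progress_percentage(self._client.percentage_progress)
```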

Configuration

[presto]
host: my-presto-host.example.com
port: 8090
user: analytics_user
catalog: hive
password: secret
protocol: https
poll_interval: 1.0
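Luigi resolves these options itself through luigi.Config. The stdlib-only sketch below, which uses configparser rather than luigi's configuration loader, only illustrates how values in the [presto] section override the documented defaults and how port and poll_interval become numbers. load_presto_settings is a hypothetical helper.

```python
import configparser

# Defaults from the presto config table; password has no default value.
PRESTO_DEFAULTS = {
    "host": "localhost",
    "port": "8090",
    "user": "anonymous",
    "catalog": "hive",
    "protocol": "https",
    "poll_interval": "1.0",
}


def load_presto_settings(text):
    """Sketch: resolve [presto] options against defaults with the stdlib parser."""
    parser = configparser.ConfigParser()  # accepts both "key: value" and "key = value"
    parser.read_string(text)
    section = parser["presto"] if parser.has_section("presto") else {}
    merged = dict(PRESTO_DEFAULTS)
    merged.update(section)  # values from the file win over defaults
    return {
        "host": merged["host"],
        "port": int(merged["port"]),
        "user": merged["user"],
        "catalog": merged["catalog"],
        "password": merged.get("password"),  # None when absent
        "protocol": merged["protocol"],
        "poll_interval": float(merged["poll_interval"]),
    }
```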

Usage Example

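from collections import OrderedDict

import luigi
from luigi.contrib.presto import PrestoTask


class AggregateRevenue(PrestoTask):
    date = luigi.DateParameter()

    @property
    def database(self):
        return 'analytics'

    @property
    def table(self):
        return 'daily_revenue'

    @property
    def partition(self):
        # Scope output() to this date. With the default partition of None,
        # PrestoTarget counts the whole table, so one loaded day would mark
        # every date complete. Assumes a varchar partition column named dt.
        return OrderedDict([('dt', self.date.isoformat())])

    @property
    def query(self):
        # Table and column names are illustrative.
        return """
            INSERT INTO analytics.daily_revenue
            SELECT SUM(amount) AS total, '{dt}' AS dt
            FROM transactions.raw_events
            WHERE event_date = DATE '{dt}'
        """.format(dt=self.date.isoformat())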

External Dependencies

  • pyhive[presto]: Required for pyhive.presto.Connection, pyhive.presto.Cursor, and pyhive.exc.DatabaseError. A warning is logged if not installed.
  • Luigi core: luigi, luigi.contrib.rdbms, luigi.task_register.Register
