

Implementation:Spotify Luigi HiveTarget

From Leeroopedia
Revision as of 16:47, 16 February 2026 by Admin (auto-imported from implementations/Spotify_Luigi_HiveTarget.md)


Knowledge Sources
Domains Data_Warehouse, Hadoop
Last Updated 2026-02-10 08:00 GMT

Overview

Luigi contrib module providing Apache Hive integration with multiple client backends, table/partition targets, a query task, and an external task wrapper for Hive-based data warehousing workflows.

Description

The hive module connects Luigi pipelines running in Hadoop ecosystems to Apache Hive. It offers several interchangeable client implementations for querying Hive metadata, plus Luigi targets that represent tables and partitions.

Client Hierarchy:

  • HiveClient (abstract base): Defines the interface for all Hive clients with abstract methods table_location, table_schema, table_exists, and partition_spec.
  • HiveCommandClient (extends HiveClient): Queries metadata by running the hive CLI through subprocess, issuing statements such as SHOW TABLES, DESCRIBE FORMATTED, and SHOW PARTITIONS. It is the client used when hive.release is cdh4, which is the default.
  • ApacheHiveCommandClient (extends HiveCommandClient): A variant for the Apache release of Hive, whose CLI can exit with a non-zero code for some metadata operations (e.g., DESCRIBE on a non-existent table). Its table_schema ignores that return code and parses the command output instead.
  • MetastoreClient (extends HiveClient): Talks to the Hive Metastore directly over Thrift, using HiveThriftContext as a context manager to open and close the transport; the host and port come from hive.metastore_host and hive.metastore_port. It requires the CDH Thrift bindings (hive_metastore with ThriftHiveMetastore), so it does not work with the bindings-free Apache release.
  • WarehouseHiveClient (extends HiveClient): Decides existence from directories in HDFS under the warehouse location (hive.warehouse_location). A table or partition counts as existing when its directory exists and, if hive.ignored_file_masks is set, contains at least one file whose name does not match that mask. It does not implement table_schema.
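The warehouse rule can be tried out without a cluster. The following is a minimal sketch, not the module's implementation: a local directory tree stands in for HDFS, the helper names warehouse_location and warehouse_table_exists are hypothetical, and it assumes the <warehouse>/<database>.db/<table>/<col>=<value> layout with the ignore mask applied as a regular expression anchored at the start of each file name.

```python
import os
import re
import tempfile


def warehouse_location(root, database, table, partition=None):
    """Return <root>/<database>.db/<table>[/col1=val1/col2=val2] (assumed layout)."""
    parts = [root, database + ".db", table]
    if partition:
        # Keys are kept in the order given; Hive expects the table's DDL order.
        parts.append("/".join("{}={}".format(k, v) for k, v in partition.items()))
    return os.path.join(*parts)


def warehouse_table_exists(root, database, table, partition=None, ignored_file_masks=None):
    """Approximate the warehouse rule on a local directory tree standing in for HDFS."""
    path = warehouse_location(root, database, table, partition)
    if not os.path.isdir(path):
        return False
    if ignored_file_masks is None:
        return True  # no mask configured: the directory alone counts
    pattern = re.compile(ignored_file_masks)
    # At least one file must NOT match the mask (e.g. ignore _SUCCESS markers).
    return any(not pattern.match(name) for name in os.listdir(path))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        part = os.path.join(root, "analytics.db", "daily_summary", "dt=2024-01-01")
        os.makedirs(part)
        open(os.path.join(part, "_SUCCESS"), "w").close()
        print(warehouse_table_exists(root, "analytics", "daily_summary",
                                     {"dt": "2024-01-01"}, r"_SUCCESS$"))  # False
        open(os.path.join(part, "000000_0"), "w").close()
        print(warehouse_table_exists(root, "analytics", "daily_summary",
                                     {"dt": "2024-01-01"}, r"_SUCCESS$"))  # True
```

The mask makes marker-only directories (for example, one holding just _SUCCESS) count as absent, which is why the first check prints False.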

Targets:

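  • HivePartitionTarget (extends luigi.Target): Represents a Hive table partition. Constructed with a table name, a partition dict (e.g., {"date": "2024-01-01"}), a database name, and an optional fail_missing_table flag. exists() asks the client whether the partition exists; if that check fails with a Hive command error, the error is re-raised when fail_missing_table is True, while with False the target returns False if the table itself is missing. The path property returns the HDFS location.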
  • HiveTableTarget (extends HivePartitionTarget): Represents a non-partitioned Hive table. A convenience subclass that sets partition=None and fail_missing_table=False.
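The fail_missing_table behaviour can be pictured with an in-memory stand-in for a client. This sketch assumes that exists() delegates to the client's table_exists and, on a Hive command error, re-raises when fail_missing_table is True; otherwise it re-checks whether the bare table exists and returns False only when it does not. HiveCommandError, FakeClient, and target_exists are local, hypothetical names, not the module's classes.

```python
class HiveCommandError(RuntimeError):
    """Local stand-in for the error raised when a hive CLI call fails."""


class FakeClient:
    """Hypothetical in-memory client: (database, table) -> set of partition tuples."""

    def __init__(self, tables):
        self.tables = tables

    def table_exists(self, table, database="default", partition=None):
        key = (database, table)
        if partition is None:
            return key in self.tables
        if key not in self.tables:
            # Mimics the CLI failing on SHOW PARTITIONS for a missing table.
            raise HiveCommandError("Table not found: {}.{}".format(database, table))
        return tuple(sorted(partition.items())) in self.tables[key]


def target_exists(client, table, partition, database="default", fail_missing_table=True):
    """Sketch of the exists() decision described above."""
    try:
        return client.table_exists(table, database, partition)
    except HiveCommandError:
        if fail_missing_table:
            raise
        if client.table_exists(table, database):
            raise  # the table is there, so this was a genuine error
        return False  # the table is simply missing


if __name__ == "__main__":
    client = FakeClient({("raw", "incoming_events"): {(("date", "2024-01-01"),)}})
    print(target_exists(client, "incoming_events", {"date": "2024-01-01"}, "raw"))  # True
    print(target_exists(client, "other_table", {"date": "2024-01-01"}, "raw",
                        fail_missing_table=False))  # False
```

In this sketch, re-checking the bare table before returning False keeps a genuine failure on an existing table from being reported as a merely missing partition.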

Tasks:

  • HiveQueryTask (extends BaseHadoopJobTask): Abstract task for running Hive queries. Subclasses must implement query(), which returns the HiveQL text. Optional hooks: hiverc() names one or more initialization (RC) files to load before the query, hivevars() returns script-local variables passed with --hivevar, and hiveconfs() returns Hadoop/Hive settings passed with --hiveconf (job name, reduce tasks, scheduler pool, bytes per reducer, max reducers).
  • HiveQueryRunner (extends JobRunner): Executes a HiveQueryTask by writing the query to a temporary file and running hive -f on it, adding -i for each RC file plus the task's --hiveconf and --hivevar pairs. For outputs that are file-system targets, it first creates any missing parent directories.
  • ExternalHiveTask (extends ExternalTask): An external task that outputs a HivePartitionTarget. Has Luigi parameters for database, table, and partition (as a DictParameter).
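The command line that HiveQueryRunner assembles can be sketched in plain Python. This is a hypothetical reconstruction for illustration: default_hiveconfs and build_hive_arglist are not part of the module, the scheduler argument stands in for the configured Hadoop scheduler type, and the property names assume MapReduce-era Hadoop/Hive keys, so verify them against your cluster's versions.

```python
def default_hiveconfs(task_id, n_reduce_tasks=None, pool=None, scheduler="fair",
                      bytes_per_reducer=None, reducers_max=None):
    """Collect --hiveconf settings; only values that are not None are emitted."""
    confs = {"mapred.job.name": task_id}
    if n_reduce_tasks is not None:
        confs["mapred.reduce.tasks"] = n_reduce_tasks
    if pool is not None:
        # The same pool value maps to a different property per scheduler.
        if scheduler == "fair":
            confs["mapred.fairscheduler.pool"] = pool
        elif scheduler == "capacity":
            confs["mapred.job.queue.name"] = pool
    if bytes_per_reducer is not None:
        confs["hive.exec.reducers.bytes.per.reducer"] = bytes_per_reducer
    if reducers_max is not None:
        confs["hive.exec.reducers.max"] = reducers_max
    return confs


def build_hive_arglist(query_file, hiverc=None, hiveconfs=None, hivevars=None,
                       hive_cmd=("hive",)):
    """Assemble: hive -f <file> [-i rc]... [--hiveconf k=v]... [--hivevar k=v]..."""
    args = list(hive_cmd) + ["-f", query_file]
    if isinstance(hiverc, str):
        hiverc = [hiverc]  # a single RC file is accepted as well as a list
    for rc in hiverc or []:
        args += ["-i", rc]
    for key, value in (hiveconfs or {}).items():
        args += ["--hiveconf", "{}={}".format(key, value)]
    for key, value in (hivevars or {}).items():
        args += ["--hivevar", "{}={}".format(key, value)]
    return args


if __name__ == "__main__":
    confs = default_hiveconfs("DailyAggregation_2024_01_01", n_reduce_tasks=10, pool="etl")
    print(build_hive_arglist("/tmp/query.hql", hiverc="/etc/hive/init.hql",
                             hiveconfs=confs, hivevars={"date": "2024-01-01"}))
```

Leaving unset values out of the dict lets Hive apply its own defaults for reducers and pools.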

Utility Functions:

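  • run_hive(args, check_return_code=True): Runs the hive CLI with the given arguments and returns its stdout; raises HiveCommandError on a non-zero exit code unless check_return_code is False.
  • run_hive_cmd(hivecmd, check_return_code=True): Runs a HiveQL string via hive -e.
  • run_hive_script(script): Runs a Hive script file via hive -f; raises RuntimeError if the file does not exist.
  • get_default_client(): Returns a client chosen by the hive.release config setting: cdh4 (the default) selects HiveCommandClient, apache selects ApacheHiveCommandClient, metastore selects MetastoreClient, and warehouse selects WarehouseHiveClient.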
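The release-based client selection and the CLI wrappers can be approximated with the standard library. In this sketch the client is represented by its class name rather than an instance, the release value is read with configparser from a [hive] section, and the hive_cmd argument is a hypothetical hook for substituting the executable (the module is assumed to take it from configuration instead). HiveCommandError here is a local stand-in.

```python
import configparser
import os
import subprocess

# Assumed mapping from the [hive] release setting to the client class name.
CLIENT_BY_RELEASE = {
    "cdh4": "HiveCommandClient",
    "apache": "ApacheHiveCommandClient",
    "metastore": "MetastoreClient",
    "warehouse": "WarehouseHiveClient",
}


def client_name_for(config):
    """Pick a client class name; cdh4 is the default and unknown values fall back to it."""
    release = config.get("hive", "release", fallback="cdh4")
    return CLIENT_BY_RELEASE.get(release, "HiveCommandClient")


class HiveCommandError(RuntimeError):
    """Local stand-in for the error raised when the hive CLI exits non-zero."""


def run_hive(args, check_return_code=True, hive_cmd=("hive",)):
    """Run the hive CLI with extra args and return its stdout as text."""
    cmd = list(hive_cmd) + list(args)
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if check_return_code and proc.returncode != 0:
        raise HiveCommandError("{} failed with exit code {}".format(" ".join(cmd), proc.returncode))
    return proc.stdout.decode("utf-8")


def run_hive_cmd(hivecmd, check_return_code=True, hive_cmd=("hive",)):
    """Run a HiveQL string via `hive -e`."""
    return run_hive(["-e", hivecmd], check_return_code, hive_cmd)


def run_hive_script(script, hive_cmd=("hive",)):
    """Run a script file via `hive -f`, refusing paths that do not exist."""
    if not os.path.isfile(script):
        raise RuntimeError("Hive script {} does not exist".format(script))
    return run_hive(["-f", script], hive_cmd=hive_cmd)


if __name__ == "__main__":
    cfg = configparser.ConfigParser()
    cfg.read_string("[hive]\nrelease = apache\n")
    print(client_name_for(cfg))  # ApacheHiveCommandClient
```

Passing a harmless stand-in such as the Python interpreter as hive_cmd makes the wrappers testable on a machine without Hive installed.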

Usage

Use this module when your Luigi pipeline operates in a Hadoop ecosystem and needs to interact with Apache Hive tables and partitions. It supports querying, checking table existence, and representing Hive objects as Luigi targets for dependency resolution.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/hive.py
  • Lines: 1-558

Signature

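import abc

import luigi
import luigi.contrib.hadoop

class HiveClient(metaclass=abc.ABCMeta):
    # Abstract interface shared by all client backends
    @abc.abstractmethod
    def table_location(self, table, database='default', partition=None): ...
    @abc.abstractmethod
    def table_schema(self, table, database='default'): ...
    @abc.abstractmethod
    def table_exists(self, table, database='default', partition=None): ...
    @abc.abstractmethod
    def partition_spec(self, partition): ...

class HiveCommandClient(HiveClient):
    ...

class ApacheHiveCommandClient(HiveCommandClient):
    ...

class MetastoreClient(HiveClient):
    ...

class HivePartitionTarget(luigi.Target):
    def __init__(self, table, partition, database='default',
                 fail_missing_table=True, client=None):
        ...

class HiveTableTarget(HivePartitionTarget):
    # Non-partitioned table: passes partition=None, fail_missing_table=False
    def __init__(self, table, database='default', client=None):
        ...

class HiveQueryTask(luigi.contrib.hadoop.BaseHadoopJobTask):
    @abc.abstractmethod
    def query(self):
        ...

class ExternalHiveTask(luigi.ExternalTask):
    database = luigi.Parameter(default='default')
    table = luigi.Parameter()
    partition = luigi.DictParameter(default={})
    ...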

Import

from luigi.contrib.hive import (
    HiveTableTarget,
    HivePartitionTarget,
    HiveQueryTask,
    ExternalHiveTask,
    HiveCommandClient,
    ApacheHiveCommandClient,
    MetastoreClient,
)

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Hive table name |
| database | str | No | Hive database name; defaults to 'default' |
| partition | dict | Yes for HivePartitionTarget; defaults to {} on ExternalHiveTask | Partition specification as a dict of {"column": "value"}; None for non-partitioned tables |
| fail_missing_table | bool | No | If True, errors from the existence check are re-raised; if False, exists() returns False when the table itself is missing. Defaults to True on HivePartitionTarget; HiveTableTarget always uses False |
| client | HiveClient | No | Pre-configured Hive client; defaults to get_default_client() |
| query | str | Yes (HiveQueryTask) | The HiveQL text returned by the abstract query() method |
| n_reduce_tasks | int | No | Number of reduce tasks for the MapReduce job; left to Hive when None |
| pool | str | No | Scheduler pool or queue name (fair or capacity scheduler) |

Outputs

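| Name | Type | Description |
|---|---|---|
| exists() | bool | True if the Hive table or partition exists |
| path | str | HDFS location of the table or partition data; raises an exception if no location is found |
| table_schema | list of tuples | Client method returning (name, type) tuples for the table's columns; the command-line clients return None if the table does not exist |
| stdout | str | Standard output of the hive CLI, as returned by run_hive, run_hive_cmd, and run_hive_script |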
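To make the shape of table_schema concrete, here is a sketch of how a command-line client could turn DESCRIBE output into tuples. It assumes tab-separated output with name, type, and comment columns, and that a missing table shows up as an error message in the output ("does not exist" and "Table not found" are the assumed markers for the CDH and Apache CLIs). parse_describe and its missing_marker argument are hypothetical.

```python
def parse_describe(stdout, missing_marker="does not exist"):
    """Turn tab-separated DESCRIBE output into a list of column tuples, or None."""
    if not stdout or missing_marker in stdout:
        return None  # empty output or an error message: treat the table as absent
    # Each line is "name<TAB>type<TAB>comment"; trailing blanks are stripped first,
    # so a column without a comment yields (name, type).
    return [tuple(field.strip() for field in line.strip().split("\t"))
            for line in stdout.strip().split("\n")]


if __name__ == "__main__":
    sample = ("user_id             \tbigint              \t                    \n"
              "event_count         \tbigint              \t                    \n")
    print(parse_describe(sample))  # [('user_id', 'bigint'), ('event_count', 'bigint')]
```

A column that does carry a comment would produce a three-element tuple under this parsing.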

Usage Examples

Basic Usage

import luigi
from luigi.contrib.hive import HiveTableTarget, HivePartitionTarget

class CheckHiveTable(luigi.Task):
    def output(self):
        return HiveTableTarget(
            table='user_events',
            database='analytics'
        )

    def run(self):
        # Your processing logic that creates the Hive table
        pass

Running a Hive Query

import luigi
from luigi.contrib.hive import HiveQueryTask, HivePartitionTarget

class DailyAggregation(HiveQueryTask):
    date = luigi.DateParameter()

    def query(self):
        return """
            INSERT OVERWRITE TABLE analytics.daily_summary
            PARTITION (dt='{date}')
            SELECT user_id, COUNT(*) as event_count
            FROM analytics.user_events
            WHERE dt = '{date}'
            GROUP BY user_id
        """.format(date=self.date)

    def output(self):
        return HivePartitionTarget(
            table='daily_summary',
            partition={'dt': str(self.date)},
            database='analytics'
        )

Using ExternalHiveTask as a Dependency

import luigi
from luigi.contrib.hive import ExternalHiveTask

class ProcessData(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ExternalHiveTask(
            database='raw',
            table='incoming_events',
            partition={'date': str(self.date)}
        )

    def run(self):
        # Process data from the external Hive table
        pass
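For the ExternalHiveTask dependency above, the default command-line client appears to decide completeness by asking Hive for matching partitions, treating non-empty output as "exists". The sketch below rebuilds the strings involved, assuming backquoted, comma-joined `col`='value' pairs sorted by column for the CLI and a slash-joined col=value name for the metastore lookup. The function names are hypothetical, and the exact query text in luigi/contrib/hive.py may differ in whitespace.

```python
def cli_partition_spec(partition):
    """Format {'col': 'val'} as `col`='val' pairs, sorted by column name."""
    return ",".join("`{}`='{}'".format(k, v) for k, v in sorted(partition.items()))


def metastore_partition_name(partition):
    """Format {'col': 'val'} as col=val segments joined by '/', sorted by column name."""
    return "/".join("{}={}".format(k, v) for k, v in sorted(partition.items()))


def show_partitions_cmd(database, table, partition):
    """Build the HiveQL that an existence check for one partition could run."""
    return "use {}; show partitions {} partition ({})".format(
        database, table, cli_partition_spec(partition))


if __name__ == "__main__":
    spec = {"date": "2024-01-01"}
    print(show_partitions_cmd("raw", "incoming_events", spec))
    # use raw; show partitions incoming_events partition (`date`='2024-01-01')
    print(metastore_partition_name({"hour": "05", "date": "2024-01-01"}))
    # date=2024-01-01/hour=05
```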
