Implementation:Spotify Luigi HiveTarget
| Knowledge Sources | |
|---|---|
| Domains | Data_Warehouse, Hadoop |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Luigi contrib module providing Apache Hive integration with multiple client backends, table/partition targets, a query task, and an external task wrapper for Hive-based data warehousing workflows.
Description
The hive module provides comprehensive integration with Apache Hive for Luigi pipelines operating in Hadoop ecosystems. It supports multiple Hive client implementations and various target types for tables and partitions.
Client Hierarchy:
- HiveClient (abstract base): Defines the interface for all Hive clients through the abstract methods table_location, table_schema, table_exists, and partition_spec.
- HiveCommandClient (extends HiveClient): Uses the hive CLI command to query metadata. Runs commands like SHOW TABLES, DESCRIBE FORMATTED, and SHOW PARTITIONS via subprocess. This is the default client for CDH4 syntax.
- ApacheHiveCommandClient (extends HiveCommandClient): A variant that ignores non-zero return codes from the hive command, needed because the Apache release of Hive returns non-zero exit codes for some metadata operations (e.g., DESCRIBE on non-existent tables).
- MetastoreClient (extends HiveClient): Connects directly to the Hive Metastore via the Thrift protocol. Uses HiveThriftContext as a context manager to manage the Thrift transport connection. Requires CDH Thrift bindings (hive_metastore and ThriftHiveMetastore).
- WarehouseHiveClient (extends HiveClient): Makes existence decisions based on the presence of directories in HDFS at the expected warehouse location. Supports configurable warehouse location and ignored file masks (via Luigi config).
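The shape of this hierarchy can be illustrated with a small stand-in that uses only the standard library. This is a sketch, not Luigi's code: SketchHiveClient and DirectoryBackedClient are hypothetical names, the method signatures are assumed from the abstract method names listed above, and the warehouse path layout (database + ".db" / table / key=value) is an assumption loosely modeled on the directory-based idea behind WarehouseHiveClient.

```python
import abc
import posixpath


class SketchHiveClient(abc.ABC):
    """Stand-in for the abstract client interface (not Luigi's class)."""

    @abc.abstractmethod
    def table_location(self, table, database="default", partition=None): ...

    @abc.abstractmethod
    def table_schema(self, table, database="default"): ...

    @abc.abstractmethod
    def table_exists(self, table, database="default", partition=None): ...

    @abc.abstractmethod
    def partition_spec(self, partition): ...


class DirectoryBackedClient(SketchHiveClient):
    """Toy client: a table or partition exists if its directory is known."""

    def __init__(self, known_dirs, warehouse="/user/hive/warehouse"):
        # known_dirs stands in for a listing of HDFS directories.
        self.known_dirs = set(known_dirs)
        self.warehouse = warehouse

    def partition_spec(self, partition):
        # Render {"dt": "2024-01-01"} as the path segment "dt=2024-01-01".
        return "/".join(f"{k}={v}" for k, v in (partition or {}).items())

    def table_location(self, table, database="default", partition=None):
        parts = [self.warehouse, database + ".db", table]
        spec = self.partition_spec(partition)
        if spec:
            parts.append(spec)
        return posixpath.join(*parts)

    def table_schema(self, table, database="default"):
        raise NotImplementedError("directories alone carry no schema")

    def table_exists(self, table, database="default", partition=None):
        return self.table_location(table, database, partition) in self.known_dirs


client = DirectoryBackedClient({
    "/user/hive/warehouse/analytics.db/user_events",
    "/user/hive/warehouse/analytics.db/user_events/dt=2024-01-01",
})
print(client.table_exists("user_events", "analytics", {"dt": "2024-01-01"}))
```

Because every backend answers the same four questions, a target can be handed any client without knowing whether the answer came from the CLI, the Metastore, or a directory listing.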
Targets:
- HivePartitionTarget (extends luigi.Target): Represents a Hive table partition. Constructed with a table name, a partition dict (e.g., {"date": "2024-01-01"}), a database name, and an optional fail_missing_table flag. The exists() method checks partition existence, and the path property returns the HDFS location.
- HiveTableTarget (extends HivePartitionTarget): Represents a non-partitioned Hive table. A convenience subclass that sets partition=None and fail_missing_table=False.
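One plausible reading of the fail_missing_table flag is sketched below with standard-library stand-ins. FakeClient, MissingTableError, and SketchPartitionTarget are hypothetical names, and the exact error handling in the real target may differ; the sketch only assumes that a partition lookup against a missing table raises, and that the flag decides whether the error propagates or is reported as "does not exist".

```python
class MissingTableError(Exception):
    """Hypothetical error raised by the fake client below."""


class FakeClient:
    def __init__(self, partitions):
        # Maps (database, table) to a set of partitions, each stored as a
        # sorted tuple of (column, value) pairs.
        self.partitions = partitions

    def table_exists(self, table, database="default", partition=None):
        key = (database, table)
        if key not in self.partitions:
            if partition is not None:
                # Assumed behavior: partition lookups on a missing table fail.
                raise MissingTableError(f"{database}.{table} not found")
            return False
        if partition is None:
            return True
        return tuple(sorted(partition.items())) in self.partitions[key]


class SketchPartitionTarget:
    """Stand-in for a partition target; the client is required here."""

    def __init__(self, table, partition, database="default",
                 fail_missing_table=True, client=None):
        self.table = table
        self.partition = partition
        self.database = database
        self.fail_missing_table = fail_missing_table
        self.client = client

    def exists(self):
        try:
            return self.client.table_exists(
                self.table, self.database, self.partition)
        except MissingTableError:
            if self.fail_missing_table:
                raise
            # Lenient mode: a missing table just means "not there yet".
            return False


fake = FakeClient({("analytics", "daily_summary"): {(("dt", "2024-01-01"),)}})
present = SketchPartitionTarget(
    "daily_summary", {"dt": "2024-01-01"}, "analytics", client=fake)
lenient = SketchPartitionTarget(
    "not_created_yet", {"dt": "2024-01-01"}, "analytics",
    fail_missing_table=False, client=fake)
print(present.exists(), lenient.exists())
```

The lenient mode suits tasks that create the table themselves, since the scheduler can then treat "no table yet" as an ordinary incomplete target.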
Tasks:
- HiveQueryTask (extends BaseHadoopJobTask): Abstract task for running Hive queries. Subclasses must implement the query() method. Supports hiverc() for loading RC files, hivevars() for script-local variables, and hiveconfs() for Hadoop/Hive configuration settings (job name, reduce tasks, scheduler pool, bytes per reducer, max reducers).
- HiveQueryRunner (extends JobRunner): Executes a HiveQueryTask by writing the query to a temporary file and invoking the hive CLI with the appropriate arguments.
- ExternalHiveTask (extends ExternalTask): An external task that outputs a HivePartitionTarget. Has Luigi parameters for database, table, and partition (as a DictParameter).
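To make the runner's role concrete, the sketch below shows how the values returned by hiverc(), hiveconfs(), and hivevars() could be turned into a hive command line. build_hive_arglist is a hypothetical helper, not a Luigi function; it relies only on the Hive CLI options -f, -i, --hiveconf, and --hivevar, and the file paths and variable names are placeholders.

```python
def build_hive_arglist(script_path, hiverc=None, hiveconfs=None, hivevars=None):
    """Assemble a hive CLI invocation for a query stored in script_path."""
    args = ["hive", "-f", script_path]

    # Accept a single RC file or a list of them; each is passed with -i.
    if isinstance(hiverc, str):
        hiverc = [hiverc]
    for rc_file in hiverc or []:
        args += ["-i", rc_file]

    # Hadoop/Hive settings such as the number of reduce tasks.
    for key, value in (hiveconfs or {}).items():
        args += ["--hiveconf", f"{key}={value}"]

    # Script-local variables referenced inside the query as ${name}.
    for key, value in (hivevars or {}).items():
        args += ["--hivevar", f"{key}={value}"]

    return args


args = build_hive_arglist(
    "/tmp/query.hql",
    hiverc="/etc/hive/hiverc",
    hiveconfs={"mapred.reduce.tasks": 8},
    hivevars={"run_date": "2024-01-01"},
)
print(" ".join(args))
```

Keeping the query in a file and the settings on the command line means the same query text can be reused across runs with different configuration.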
Utility Functions:
- run_hive(args): Runs the hive CLI with the given arguments.
- run_hive_cmd(hivecmd): Runs a Hive query string.
- run_hive_script(script): Runs a Hive script file.
- get_default_client(): Returns the appropriate client based on the hive.release config setting (cdh4, apache, metastore, or warehouse).
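The client selection described for get_default_client() can be pictured as a lookup keyed on the configured release. The sketch below uses only configparser and returns class names as strings; pick_client_name and CLIENT_BY_RELEASE are hypothetical, and it assumes that hive.release means a release key in a [hive] config section and that unset or unknown values fall back to the CDH4 command client.

```python
import configparser

# Release value -> client class name, following the four options listed above.
CLIENT_BY_RELEASE = {
    "cdh4": "HiveCommandClient",
    "apache": "ApacheHiveCommandClient",
    "metastore": "MetastoreClient",
    "warehouse": "WarehouseHiveClient",
}


def pick_client_name(cfg_text):
    """Return the client class name implied by a Luigi-style config string."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # Assumption: a missing section or key behaves like release = cdh4.
    release = parser.get("hive", "release", fallback="cdh4")
    return CLIENT_BY_RELEASE.get(release, "HiveCommandClient")


print(pick_client_name("[hive]\nrelease = apache\n"))
```

A single config value then switches every target and task in a pipeline to a different backend without code changes.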
Usage
Use this module when your Luigi pipeline operates in a Hadoop ecosystem and needs to interact with Apache Hive tables and partitions. It supports querying, checking table existence, and representing Hive objects as Luigi targets for dependency resolution.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/hive.py
- Lines: 1-558
Signature
class HiveTableTarget(HivePartitionTarget):
    def __init__(self, table, database='default', client=None):
        ...

class HivePartitionTarget(luigi.Target):
    def __init__(self, table, partition, database='default',
                 fail_missing_table=True, client=None):
        ...

class HiveQueryTask(luigi.contrib.hadoop.BaseHadoopJobTask):
    @abc.abstractmethod
    def query(self):
        ...

class ExternalHiveTask(luigi.ExternalTask):
    database = luigi.Parameter(default='default')
    table = luigi.Parameter()
    partition = luigi.DictParameter(default={})
    ...

class HiveCommandClient(HiveClient):
    ...

class ApacheHiveCommandClient(HiveCommandClient):
    ...

class MetastoreClient(HiveClient):
    ...
Import
from luigi.contrib.hive import (
    HiveTableTarget,
    HivePartitionTarget,
    HiveQueryTask,
    ExternalHiveTask,
    HiveCommandClient,
    ApacheHiveCommandClient,
    MetastoreClient,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Hive table name |
| database | str | No | Hive database name; defaults to 'default' |
| partition | dict | No (Target) | Partition specification as a dict of {"column": "value"}; None for non-partitioned tables |
| fail_missing_table | bool | No | If True, raise error when table doesn't exist; defaults to True for partitions |
| client | HiveClient | No | Pre-configured Hive client; defaults to get_default_client() |
| query | str | Yes (task) | The Hive SQL query text (abstract method on HiveQueryTask) |
| n_reduce_tasks | int | No | Number of reduce tasks for MapReduce configuration |
| pool | str | No | Scheduler pool name (fair or capacity scheduler) |
Outputs
| Name | Type | Description |
|---|---|---|
| exists | bool | exists() returns True if the Hive table or partition exists |
| path | str | The HDFS location of the Hive table or partition data |
| table_schema | list of tuples | List of (name, type) tuples for table columns |
| stdout | str | Output of Hive CLI commands for query tasks |
Usage Examples
Basic Usage
import luigi
from luigi.contrib.hive import HiveTableTarget, HivePartitionTarget

class CheckHiveTable(luigi.Task):
    def output(self):
        return HiveTableTarget(
            table='user_events',
            database='analytics'
        )

    def run(self):
        # Your processing logic that creates the Hive table
        pass
Running a Hive Query
import luigi
from luigi.contrib.hive import HiveQueryTask, HivePartitionTarget

class DailyAggregation(HiveQueryTask):
    date = luigi.DateParameter()

    def query(self):
        # self.date formats as an ISO date string (YYYY-MM-DD)
        return """
            INSERT OVERWRITE TABLE analytics.daily_summary
            PARTITION (dt='{date}')
            SELECT user_id, COUNT(*) as event_count
            FROM analytics.user_events
            WHERE dt = '{date}'
            GROUP BY user_id
        """.format(date=self.date)

    def output(self):
        # The task is complete once the written partition exists
        return HivePartitionTarget(
            table='daily_summary',
            partition={'dt': str(self.date)},
            database='analytics'
        )
Using ExternalHiveTask as a Dependency
import luigi
from luigi.contrib.hive import ExternalHiveTask

class ProcessData(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # Wait for a partition that is produced outside this pipeline
        return ExternalHiveTask(
            database='raw',
            table='incoming_events',
            partition={'date': str(self.date)}
        )

    def run(self):
        # Process data from the external Hive table
        pass