Implementation:Spotify Luigi SparkeyExportTask
Overview
SparkeyExportTask is a Luigi task class in the luigi.contrib.sparkey module that exports tab-separated input data into the Sparkey key-value store format. Sparkey is a simple constant key-value store developed by Spotify that is optimized for bulk read workloads. This task reads lines from its input target, splits them by a configurable separator, and writes the results to a Sparkey log file via the sparkey Python library.
Source Location
| Property | Value |
|---|---|
| Source File | luigi/contrib/sparkey.py |
| Lines of Code | 61 |
| Module | luigi.contrib.sparkey |
| Domain | Key_Value_Store, Data_Export |
Import Statement
from luigi.contrib.sparkey import SparkeyExportTask
Class: SparkeyExportTask
SparkeyExportTask(luigi.Task)
A Luigi task that reads tab-separated (or custom-separated) input and writes it to a local Sparkey log file. Subclasses must implement requires() and output().
Class Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| separator | str | '\t' (tab) | The delimiter used to split each input line into a key and value. The first field becomes the key; the remainder becomes the value. |
Constructor
SparkeyExportTask.__init__(self, *args, **kwargs)
Passes all arguments through to the parent luigi.Task.__init__.
Methods
| Method | Signature | Description |
|---|---|---|
| run | run(self) | Entry point that delegates to _write_sparkey_file(). |
| _write_sparkey_file | _write_sparkey_file(self) | Performs the actual export. Opens the input target for reading, creates a temporary luigi.LocalTarget, writes key-value pairs to a sparkey.LogWriter, then moves the temporary file to the final output path. |
Abstract Methods (to be implemented by subclasses)
| Method | Description |
|---|---|
| requires(self) | Must return a single upstream Luigi task whose output target, when opened with open('r'), yields lines of text in the format key{separator}value. |
| output(self) | Must return a luigi.LocalTarget specifying the output path for the Sparkey log file. |
Execution Flow
1. run() calls _write_sparkey_file().
2. _write_sparkey_file() retrieves the input target via self.input().
3. It validates that the output is a luigi.LocalTarget (raises TypeError otherwise).
4. A temporary luigi.LocalTarget(is_tmp=True) is created to hold the intermediate Sparkey file.
5. A sparkey.LogWriter is opened on the temporary path.
6. Each line from the input is stripped, split on the separator (with maxsplit=1), and written as a key-value pair.
7. The LogWriter is closed.
8. The temporary file is moved to the final output path via temp_output.move(outfile.path).
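The write-to-temporary-then-move pattern in the steps above can be sketched with the standard library alone. This is a minimal illustration, not the module's code: DictLogWriter is a hypothetical stand-in for sparkey.LogWriter (it is assumed that the real writer accepts item assignment and close()), and export_lines is an invented helper name.

```python
import os
import shutil
import tempfile


class DictLogWriter:
    """Hypothetical stand-in for sparkey.LogWriter (assumed: item assignment and close())."""

    def __init__(self, path):
        self.path = path
        self.entries = {}

    def __setitem__(self, key, value):
        self.entries[key] = value

    def close(self):
        # Plain-text dump for illustration; a real Sparkey log is a binary format.
        with open(self.path, "w") as f:
            for key, value in self.entries.items():
                f.write("%s=%s\n" % (key, value))


def export_lines(lines, final_path, separator="\t", writer_cls=DictLogWriter):
    """Write key-value pairs to a temporary file, then move it to final_path."""
    fd, tmp_path = tempfile.mkstemp()
    os.close(fd)
    writer = writer_cls(tmp_path)
    for line in lines:
        # Strip the line, then split once so the value may contain the separator.
        key, value = line.strip().split(separator, 1)
        writer[key] = value
    writer.close()
    # The final path only appears after the writer has closed successfully.
    shutil.move(tmp_path, final_path)
    return writer
```

Because the final path is only created by the move, a run that fails partway through never leaves a partial file at the output location, which matters for Luigi since the existence of the output marks the task as complete.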
Usage Example
    import luigi
    from luigi.contrib.sparkey import SparkeyExportTask


    class BuildSparkeyIndex(SparkeyExportTask):
        date = luigi.DateParameter()

        # Optionally override the separator
        separator = '\t'

        def requires(self):
            # GenerateKeyValuePairs is a placeholder for an upstream task defined elsewhere
            return GenerateKeyValuePairs(date=self.date)

        def output(self):
            return luigi.LocalTarget('/data/sparkey/index_%s.spl' % self.date)
The input file should contain one key-value pair per line, separated by a tab (shown here as <TAB>):

    key1<TAB>value1
    key2<TAB>value2
    key3<TAB>value3
Input Format
Each line of the input is expected to contain:
- A key (everything before the first separator)
- A value (everything after the first separator)
The split uses maxsplit=1, so values may contain the separator character. For example, with tab separation (<TAB> marks a tab character):

    user_123<TAB>John<TAB>Doe<TAB>active
This produces key "user_123" and value "John\tDoe\tactive".
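The parsing rule can be checked in isolation. The sketch below assumes only the behavior described above (strip the line, then split once on the separator); parse_line is a hypothetical helper name, not part of luigi.contrib.sparkey.

```python
def parse_line(line, separator="\t"):
    """Strip the line, then split on the first separator only."""
    key, value = line.strip().split(separator, 1)
    return key, value


print(parse_line("user_123\tJohn\tDoe\tactive\n"))
# ('user_123', 'John\tDoe\tactive')
```

Under this rule, a line that contains no separator raises ValueError when the result is unpacked. The same happens for a line such as "key\t" when the separator is whitespace, because stripping removes the trailing tab before the split.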
External Dependencies
- sparkey: The sparkey Python package, which provides sparkey.LogWriter. It is imported inside the _write_sparkey_file() method (lazy import), so the package is needed only when the task actually runs, not when luigi.contrib.sparkey is imported.
- Luigi core: luigi.Task, luigi.LocalTarget
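Since the lazy import defers any failure until the task runs, it can be useful to check for the package before scheduling. The following is one possible preflight check using only the standard library; module_available is a hypothetical helper and is not part of Luigi.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(name) is not None


if not module_available("sparkey"):
    print("sparkey is not installed; SparkeyExportTask would fail when run")
```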
See Also
- Spotify_Luigi_Task_Definition - Base task class
- luigi.LocalTarget - Required output target type