Implementation:Spotify Luigi SparkeyExportTask

From Leeroopedia


Overview

SparkeyExportTask is a Luigi task class in the luigi.contrib.sparkey module that exports separator-delimited input data (tab-separated by default) into the Sparkey key-value store format. Sparkey is a simple constant key-value store developed by Spotify that is optimized for bulk read workloads. The task reads lines from its input target, splits each line once on a configurable separator into a key and a value, and writes the resulting pairs to a Sparkey log file via the sparkey Python library.

Source Location

| Property | Value |
| --- | --- |
| Source File | luigi/contrib/sparkey.py |
| Lines of Code | 61 |
| Module | luigi.contrib.sparkey |
| Domain | Key_Value_Store, Data_Export |

Import Statement

from luigi.contrib.sparkey import SparkeyExportTask

Class: SparkeyExportTask

SparkeyExportTask(luigi.Task)

A Luigi task that reads tab-separated (or custom-separated) input and writes it to a local Sparkey log file. Subclasses must implement requires() and output().

Class Attributes

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| separator | str | '\t' (tab) | The delimiter used to split each input line into a key and value. The first field becomes the key; the remainder becomes the value. |
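Because separator is a class attribute, a subclass can change it without touching the constructor. The sketch below mimics that pattern with stand-in classes so it runs without Luigi or sparkey installed; LineSplitter and CommaSplitter are hypothetical names used only for illustration and are not part of Luigi.

```python
class LineSplitter:
    """Stand-in for the task: holds the separator as a class attribute."""

    separator = "\t"

    def split(self, line):
        # First field is the key; everything after the first separator is the value.
        key, value = line.strip().split(self.separator, 1)
        return key, value


class CommaSplitter(LineSplitter):
    # Overriding the class attribute changes how every line is split.
    separator = ","


tab_pair = LineSplitter().split("k1\tv1\n")
comma_pair = CommaSplitter().split("k1,v1,extra\n")
print(tab_pair)
print(comma_pair)
```

With a comma separator, only the first comma divides key from value, so any later commas stay inside the value.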

Constructor

SparkeyExportTask.__init__(self, *args, **kwargs)

Passes all arguments through to the parent luigi.Task.__init__.

Methods

| Method | Signature | Description |
| --- | --- | --- |
| run | run(self) | Entry point that delegates to _write_sparkey_file(). |
| _write_sparkey_file | _write_sparkey_file(self) | Performs the actual export. Opens the input target for reading, creates a temporary luigi.LocalTarget, writes key-value pairs to a sparkey.LogWriter, then moves the temporary file to the final output path. |
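Writing to a temporary file and moving it into place afterwards means the final path never holds a partially written file. The following is a minimal sketch of that idea using only the standard library; it is an analogue of the pattern, not Luigi's own implementation, and write_then_move is a hypothetical helper that writes plain tab-separated text rather than a Sparkey log.

```python
import os
import shutil
import tempfile


def write_then_move(pairs, final_path):
    """Write pairs to a temporary file, then move it to final_path."""
    fd, tmp_path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w") as tmp:
            for key, value in pairs:
                tmp.write("%s\t%s\n" % (key, value))
        # Only a fully written file is moved to the final location.
        shutil.move(tmp_path, final_path)
    finally:
        # Remove the temporary file if the move did not happen.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return final_path


demo_dir = tempfile.mkdtemp()
demo_path = write_then_move([("k1", "v1")], os.path.join(demo_dir, "out.tsv"))
print(demo_path)
```

If writing fails partway through, the exception propagates and the final path is left untouched.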

Abstract Methods (to be implemented by subclasses)

| Method | Description |
| --- | --- |
| requires(self) | Must return a single upstream Luigi task whose output target, when opened with open('r'), yields lines of text in the format key{separator}value. |
| output(self) | Must return a luigi.LocalTarget specifying the output path for the Sparkey log file. |

Execution Flow

  1. run() calls _write_sparkey_file().
  2. _write_sparkey_file() retrieves the input target via self.input().
  3. It validates that the output is a luigi.LocalTarget (raises TypeError otherwise).
  4. A temporary luigi.LocalTarget(is_tmp=True) is created to hold the intermediate Sparkey file.
  5. A sparkey.LogWriter is opened on the temporary path.
  6. Each line from the input is stripped, split on the separator (with maxsplit=1), and written as a key-value pair.
  7. The LogWriter is closed.
  8. The temporary file is moved to the final output path via temp_output.move(outfile.path).
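Steps 6 and 7 above can be sketched without the sparkey package by substituting an in-memory writer. InMemoryWriter and export_lines are hypothetical names, and the put/close interface is an assumption made for this illustration; it does not document the real sparkey.LogWriter API.

```python
class InMemoryWriter:
    """Hypothetical stand-in for a log writer: records pairs and tracks close()."""

    def __init__(self):
        self.entries = {}
        self.closed = False

    def put(self, key, value):
        self.entries[key] = value

    def close(self):
        self.closed = True


def export_lines(lines, writer, separator="\t"):
    """Strip each line, split it once on the separator, and write the pair."""
    for line in lines:
        key, value = line.strip().split(separator, 1)
        writer.put(key, value)
    # Close the writer once every line has been written.
    writer.close()
    return writer


writer = export_lines(["a\t1\n", "b\t2\t3\n"], InMemoryWriter())
print(writer.entries)
```

Keeping the loop separate from the writer makes the split logic easy to check on its own before running the real task.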

Usage Example

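import luigi
from luigi.contrib.sparkey import SparkeyExportTask


class BuildSparkeyIndex(SparkeyExportTask):
    date = luigi.DateParameter()

    # Optionally override the separator (shown here with its default value)
    separator = '\t'

    def requires(self):
        # GenerateKeyValuePairs is an upstream task defined elsewhere (not shown);
        # its output must yield lines of the form key<separator>value.
        return GenerateKeyValuePairs(date=self.date)

    def output(self):
        # Must be a luigi.LocalTarget; self.date is rendered as YYYY-MM-DD.
        return luigi.LocalTarget('/data/sparkey/index_%s.spl' % self.date)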

The input file should contain lines in the format:

key1	value1
key2	value2
key3	value3

Input Format

Each line of the input is expected to contain:

  • A key (everything before the first separator)
  • A value (everything after the first separator)

The split uses maxsplit=1, so values may contain the separator character. For example, with tab separation:

user_123	John	Doe	active

This produces key "user_123" and value "John\tDoe\tactive".
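The split behaviour described here is easy to check in plain Python. The sketch below uses a hypothetical helper, split_line, that strips and then splits once, as in the execution flow. Returning None for a line with no separator is a choice made for this illustration; how the task itself handles such lines is not stated on this page.

```python
def split_line(line, separator="\t"):
    """Return (key, value), or None when the stripped line has no separator."""
    parts = line.strip().split(separator, 1)
    if len(parts) != 2:
        return None
    return parts[0], parts[1]


ok = split_line("user_123\tJohn\tDoe\tactive\n")
missing = split_line("orphan_key\n")
# Stripping happens before splitting, so a trailing tab is removed
# and the line no longer contains a separator.
empty_value = split_line("key_only\t\n")
print(ok, missing, empty_value)
```

Because the line is stripped before it is split, a key followed only by a trailing tab ends up with no separator at all, so empty values written this way cannot be represented.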

External Dependencies

  • sparkey: The sparkey Python package providing sparkey.LogWriter. It is imported inside the _write_sparkey_file() method (a lazy import), so importing luigi.contrib.sparkey does not require the package; it is needed only when the task actually runs.
  • Luigi core: luigi.Task, luigi.LocalTarget
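Since sparkey is imported lazily, a missing package only surfaces once the task runs. One way to detect this earlier is to probe for the module without importing it, as in the sketch below. module_available is a hypothetical helper, and using it before scheduling is a suggestion rather than something Luigi does.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(name) is not None


sparkey_installed = module_available("sparkey")
print("sparkey installed:", sparkey_installed)
```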
