Implementation:Spotify Luigi MongoTarget
| Knowledge Sources | |
|---|---|
| Domains | Database, NoSQL |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
The MongoTarget module provides Luigi target classes for MongoDB resources. It defines a hierarchy of targets that represent different levels of granularity within a MongoDB database: a single field in one document, one field across a set of documents, an entire collection, and a document count.
Description
The module defines five classes:
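MongoTarget (extends Target): Abstract base class for MongoDB targets. Holds a MongoClient reference, a database name (the index argument), and a collection name. Provides the get_collection() and get_index() helper methods, which return the pymongo Collection and Database objects respectively.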
MongoCellTarget (extends MongoTarget): Targets a specific field within a specific document. Nested fields are addressed with MongoDB's dot notation (for example, scores.engagement).
- exists(): Returns True if the field has a non-None value.
- read(): Reads the field value with a $project aggregation stage, which supports nested objects.
- write(value): Calls update_one with $set and upsert=True, so the document is created if it does not exist.
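The sketch below models MongoCellTarget's behavior in plain Python so it runs without a MongoDB server. The pipeline and update documents follow the description above ($project for reads, $set with upsert for writes); the helper names cell_read_pipeline, cell_write_update, resolve_path and set_path are hypothetical, and the in-memory lookup only walks embedded documents (arrays are not modeled).

```python
from typing import Any, Dict, List, Optional


def cell_read_pipeline(document_id: str, path: str) -> List[Dict[str, Any]]:
    # Match the one document, then project the (possibly nested) field into a
    # flat '_value' key; '$' + path is MongoDB's field-path expression syntax.
    return [
        {'$match': {'_id': document_id}},
        {'$project': {'_value': '$' + path, '_id': False}},
    ]


def cell_write_update(path: str, value: Any) -> Dict[str, Any]:
    # Update document for update_one(filter, update, upsert=True); a dotted
    # key under $set creates missing sub-documents on the way down.
    return {'$set': {path: value}}


def resolve_path(doc: Dict[str, Any], path: str) -> Optional[Any]:
    # In-memory model of dot-notation lookup through embedded documents.
    # A missing segment yields None, which a non-None exists() check treats
    # as "not done yet".
    current: Any = doc
    for key in path.split('.'):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def set_path(doc: Dict[str, Any], path: str, value: Any) -> None:
    # In-memory model of {'$set': {path: value}}: create intermediate
    # sub-documents as needed (assumes existing intermediates are dicts).
    *parents, leaf = path.split('.')
    current = doc
    for key in parents:
        current = current.setdefault(key, {})
    current[leaf] = value


doc = {'_id': 'user_42'}
set_path(doc, 'scores.engagement', 0.8)
print(resolve_path(doc, 'scores.engagement'))  # 0.8
print(resolve_path(doc, 'scores.retention'))   # None
```

Against a live server, the same dictionaries would be passed to collection.aggregate(...) and to collection.update_one({'_id': document_id}, update, upsert=True).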
MongoRangeTarget (extends MongoTarget): Targets a single top-level field across a set of documents identified by their IDs.
- exists(): Returns True only if the field exists in ALL specified documents.
- read(): Returns a dict mapping document IDs to field values.
- write(values): Accepts a dict of {document_id: value} and applies the updates as ordered bulk operations with upsert.
- get_empty_ids(): Returns the set of targeted document IDs that are missing the field.
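A minimal in-memory model of the MongoRangeTarget checks, assuming documents are plain dicts keyed by '_id'. present_filter shows how one could select the documents that already have the field in MongoDB ($in plus $exists); whether luigi builds exactly this query is an assumption, and all function names here are hypothetical.

```python
from typing import Any, Dict, List, Set

Doc = Dict[str, Any]


def present_filter(document_ids: List[str], field: str) -> Dict[str, Any]:
    # MongoDB filter selecting targeted documents that already have the field.
    return {'_id': {'$in': document_ids}, field: {'$exists': True}}


def _matches(doc: Doc, document_ids: List[str], field: str) -> bool:
    # In-memory equivalent of present_filter for a single document.
    return doc.get('_id') in document_ids and field in doc


def get_empty_ids(docs: List[Doc], document_ids: List[str], field: str) -> Set[str]:
    # Targeted IDs with no matching document: either the document is missing
    # or it lacks the top-level field.
    present = {d['_id'] for d in docs if _matches(d, document_ids, field)}
    return set(document_ids) - present


def range_exists(docs: List[Doc], document_ids: List[str], field: str) -> bool:
    # Complete only when no targeted ID is missing the field.
    return not get_empty_ids(docs, document_ids, field)


def range_read(docs: List[Doc], document_ids: List[str], field: str) -> Dict[str, Any]:
    # {document_id: value}; documents without the field are left out (assumption).
    return {d['_id']: d[field] for d in docs if _matches(d, document_ids, field)}


docs = [{'_id': 'item_1', 'processed': True}, {'_id': 'item_2'}]
ids = ['item_1', 'item_2', 'item_3']
print(sorted(get_empty_ids(docs, ids, 'processed')))  # ['item_2', 'item_3']
print(range_exists(docs, ids, 'processed'))           # False
print(range_read(docs, ids, 'processed'))             # {'item_1': True}
```

For the write side, current PyMongo expresses ordered upserts as collection.bulk_write([UpdateOne({'_id': i}, {'$set': {field: v}}, upsert=True) for i, v in values.items()], ordered=True).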
MongoCollectionTarget (extends MongoTarget): Targets the existence of a collection within the database.
- exists(): Returns True if the collection name appears in the database's collection_names() listing.
- read(): Returns the same boolean.
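The collection check reduces to a membership test on the database's collection listing. This sketch uses list_collection_names(), the PyMongo 3.7+ name for that call (collection_names() was deprecated in 3.7 and removed in PyMongo 4.0); FakeDatabase is a hypothetical stand-in so the example runs without a server.

```python
from typing import List


class FakeDatabase:
    """Hypothetical in-memory stand-in for pymongo.database.Database."""

    def __init__(self, names: List[str]) -> None:
        self._names = list(names)

    def list_collection_names(self) -> List[str]:
        return list(self._names)


def collection_exists(db, name: str) -> bool:
    # True when the collection name appears in the database's listing.
    return name in db.list_collection_names()


print(collection_exists(FakeDatabase(['users', 'items']), 'items'))  # True
print(collection_exists(FakeDatabase([]), 'items'))                  # False
```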
MongoCountTarget (extends MongoTarget): Targets a specific document count within a collection.
- exists(): Returns True when the actual count equals target_count.
- read(): Counts documents with a $group aggregation stage, which gives an accurate count even on sharded clusters.
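The $group stage described for MongoCountTarget.read() collapses every document into a single group and sums 1 per document. The sketch shows that pipeline as data plus a hypothetical in-memory evaluator; it assumes the count is taken from the single result document and that an empty collection (for which $group emits no document) is reported as None.

```python
from typing import Any, Dict, List, Optional

# Put every document in one group (_id: None) and add 1 per document.
COUNT_PIPELINE: List[Dict[str, Any]] = [
    {'$group': {'_id': None, 'count': {'$sum': 1}}},
]


def group_count(docs: List[Dict[str, Any]]) -> Optional[int]:
    # In-memory model of COUNT_PIPELINE's result: a single
    # {'_id': None, 'count': n} document, or no document at all for an empty
    # collection, modeled here as None.
    results = [{'_id': None, 'count': len(docs)}] if docs else []
    return results[0]['count'] if results else None


def count_exists(docs: List[Dict[str, Any]], target_count: int) -> bool:
    # Mirrors exists(): done only when the count equals target_count.
    return group_count(docs) == target_count


print(group_count([{'_id': i} for i in range(3)]))             # 3
print(group_count([]))                                          # None
print(count_exists([{'_id': i} for i in range(1000)], 1000))    # True
```

Note that under this model an empty collection yields None rather than 0.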
Usage
Use these targets as return values from output() in Luigi tasks to track completion via MongoDB state rather than file-based markers.
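Luigi decides whether a task still needs to run by calling its complete() method, which by default flattens output() into a list of targets and returns True only if every one of them exists() (and False when there are no outputs). The sketch below is a simplified model of that rule; luigi's own flatten handles more container types, and FakeTarget is a hypothetical stand-in for any of the MongoDB targets.

```python
from typing import Any, List


class FakeTarget:
    """Hypothetical stand-in for a MongoDB target; only exists() matters here."""

    def __init__(self, done: bool) -> None:
        self.done = done

    def exists(self) -> bool:
        return self.done


def flatten(outputs: Any) -> List[Any]:
    # output() may return None, a single target, or a list/tuple/dict of them.
    if outputs is None:
        return []
    if isinstance(outputs, dict):
        outputs = list(outputs.values())
    if isinstance(outputs, (list, tuple)):
        return [t for o in outputs for t in flatten(o)]
    return [outputs]


def is_complete(outputs: Any) -> bool:
    # Simplified default Task.complete(): False when there are no outputs,
    # otherwise True only if every output target exists().
    targets = flatten(outputs)
    if not targets:
        return False
    return all(t.exists() for t in targets)


print(is_complete(FakeTarget(True)))                       # True
print(is_complete([FakeTarget(True), FakeTarget(False)]))  # False
```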
Code Reference
Source Location
luigi/contrib/mongodb.py (223 lines)
Signature
```python
from luigi.target import Target


class MongoTarget(Target):
    def __init__(self, mongo_client, index, collection):
        """Base MongoDB target."""

    def get_collection(self):
        """Returns the pymongo Collection object."""

    def get_index(self):
        """Returns the pymongo Database object."""


class MongoCellTarget(MongoTarget):
    def __init__(self, mongo_client, index, collection, document_id, path):
        """Target a specific field in a specific document."""

    def exists(self): ...
    def read(self): ...
    def write(self, value): ...


class MongoRangeTarget(MongoTarget):
    def __init__(self, mongo_client, index, collection, document_ids, field):
        """Target a field across a range of documents."""

    def exists(self): ...
    def read(self): ...
    def write(self, values): ...
    def get_empty_ids(self): ...


class MongoCollectionTarget(MongoTarget):
    def __init__(self, mongo_client, index, collection):
        """Target the existence of a collection."""

    def exists(self): ...
    def read(self): ...


class MongoCountTarget(MongoTarget):
    def __init__(self, mongo_client, index, collection, target_count):
        """Target a specific document count in a collection."""

    def exists(self): ...
    def read(self): ...
```
Import
```python
from luigi.contrib.mongodb import MongoCellTarget, MongoRangeTarget, MongoCollectionTarget, MongoCountTarget
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| mongo_client | MongoClient | A pymongo.MongoClient instance connected to the MongoDB server. |
| index | str | The MongoDB database name. |
| collection | str | The MongoDB collection name. |
| document_id | str | (MongoCellTarget) The _id of the target document. |
| path | str | (MongoCellTarget) Dot-notation path to the target field within the document. |
| document_ids | list of str | (MongoRangeTarget) List of document _id values to target. |
| field | str | (MongoRangeTarget) Top-level field name to check across documents. |
| target_count | int | (MongoCountTarget) Expected number of documents in the collection. |
Outputs
| Output | Type | Description |
|---|---|---|
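| exists() | bool | Whether the target condition is satisfied: the field is set (MongoCellTarget), every targeted document has the field (MongoRangeTarget), the collection exists (MongoCollectionTarget), or the count matches (MongoCountTarget). |
| read() | varies | The current value: the field value (MongoCellTarget), a dict of values (MongoRangeTarget), a bool (MongoCollectionTarget), or an int (MongoCountTarget). |
| write() | None (side effect) | Writes value(s) to the MongoDB collection using upsert operations. Available on MongoCellTarget and MongoRangeTarget only. |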
Usage Examples
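```python
import luigi
from pymongo import MongoClient
from luigi.contrib.mongodb import MongoCellTarget, MongoRangeTarget, MongoCountTarget

# One shared client; pymongo connects lazily on the first operation.
client = MongoClient('mongodb://localhost:27017')


class ComputeUserScore(luigi.Task):
    """Stores one user's engagement score at users.<user_id>.scores.engagement."""
    user_id = luigi.Parameter()

    def output(self):
        return MongoCellTarget(
            mongo_client=client,
            index='analytics',         # database name
            collection='users',
            document_id=self.user_id,
            path='scores.engagement',  # dot notation for a nested field
        )

    def run(self):
        score = self._compute_score()
        # Upserts {'$set': {'scores.engagement': score}} on the user document.
        self.output().write(score)

    def _compute_score(self):
        # Placeholder: replace with the real scoring logic.
        return 0.0


class BatchUpdateScores(luigi.Task):
    """Complete once analytics.daily_scores holds exactly 1000 documents."""

    def output(self):
        return MongoCountTarget(
            mongo_client=client,
            index='analytics',
            collection='daily_scores',
            target_count=1000,
        )

    # run() omitted: it should insert the documents into daily_scores.


class PopulateRangeField(luigi.Task):
    """Sets 'processed' on each listed item that does not have it yet."""

    def output(self):
        return MongoRangeTarget(
            mongo_client=client,
            index='analytics',
            collection='items',
            document_ids=['item_1', 'item_2', 'item_3'],
            field='processed',
        )

    def run(self):
        target = self.output()
        # Only fill the documents that are still missing the field.
        empty = target.get_empty_ids()
        target.write({doc_id: True for doc_id in empty})
```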
Related Pages
- Spotify_Luigi_NoSQL_Data_Targets -- Principle governing NoSQL data target abstractions
- luigi.target.Target -- Base class for all Luigi targets
- pymongo.MongoClient -- MongoDB Python driver client used for connectivity