Implementation:Spotify Luigi SalesforceAPI
| Knowledge Sources | |
|---|---|
| Domains | CRM, Data_Integration |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
SalesforceAPI and QuerySalesforce provide a Luigi integration for querying and extracting data from Salesforce using both the Bulk API and REST API, with support for production and sandbox environments.
Description
The Salesforce contrib module implements a client for the Salesforce SOAP, REST, and Bulk APIs, together with a Luigi Task that runs SOQL queries as part of a data pipeline. It contains three primary components:
- SalesforceAPI -- A client class that wraps the Salesforce SOAP login, the REST query API, and the Bulk API. It manages the session, authenticates through a SOAP XML login against either a production or a sandbox instance, and provides methods for creating Bulk API jobs, submitting batches, polling for batch completion, and retrieving results. create_operation_job() starts a session on demand, whereas the REST query methods expect start_session() to have been called first. The client targets API version 34.0, uses XML for Bulk API job and batch control messages and JSON for REST queries, paginates automatically in query_all() by following nextRecordsUrl links, and supports the CSV, XML, ZIP_CSV, and ZIP_XML content types.
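- QuerySalesforce (extends luigi.Task) -- An abstract task that runs a SOQL query against Salesforce and writes the result to the path of its output() target, which the subclass must define. It reads credentials through the salesforce configuration class and first submits the query through the Bulk API. If the bulk batch fails because the query uses foreign key relationships, which the Bulk API does not support, the task closes the job and reruns the query through the REST API; any other batch failure raises an exception. When a bulk batch returns several result files, each one is downloaded separately and then merged; merging is implemented only for the CSV content type. The SOQL can be given as a raw string or, with is_soql_file set to True, as the path to a file containing it. Setting use_sandbox to True without a sandbox_name raises an exception.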
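- salesforce (extends luigi.Config) -- A configuration class that reads the Salesforce credentials (username, password, security_token, and sb_security_token for sandboxes) from the [salesforce] section of the Luigi configuration file. The sandbox name is not part of this configuration; it is set per task through the sandbox_name property.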
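The bulk-first, REST-fallback flow described for QuerySalesforce can be sketched as a plain function. Here sf is assumed to be any object with the SalesforceAPI method names and signatures listed under Signature; run_query_with_fallback is a hypothetical name, and returning the payloads instead of writing them to an output file is a simplification of this sketch. The fallback test mirrors the substring the module looks for in the failed batch's state message.

```python
from typing import Any, List

# Substring QuerySalesforce checks for in the failed batch's state message.
FK_UNSUPPORTED = "foreign key relationships not supported"


def run_query_with_fallback(sf: Any, object_name: str, soql: str,
                            content_type: str = "CSV") -> List[bytes]:
    """Hypothetical helper: try the Bulk API first, then fall back to REST.

    Returns one payload per bulk result id, or a single-element list
    holding the CSV produced by the REST fallback (query_all).
    """
    job_id = sf.create_operation_job("query", object_name, content_type=content_type)
    try:
        batch_id = sf.create_batch(job_id, soql, content_type)
        status = sf.block_on_batch(job_id, batch_id)
        if status["state"].lower() != "failed":
            return [sf.get_batch_result(job_id, batch_id, result_id)
                    for result_id in sf.get_batch_result_ids(job_id, batch_id)]
        message = status.get("state_message") or ""
        if FK_UNSUPPORTED not in message.lower():
            raise RuntimeError("Bulk query failed: %s" % message)
    finally:
        sf.close_job(job_id)  # runs on success, fallback, and error alike
    # The Bulk API rejected a relationship query: rerun it through REST.
    return [sf.query_all(soql).read()]
```

Closing the job in a finally clause keeps the bulk job from being left open when the batch fails for an unexpected reason.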
Helper functions include get_soql_fields() for parsing field names from SOQL queries, parse_results() and _traverse_results() for flattening nested Salesforce JSON responses into tabular rows.
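The flattening these helpers perform can be sketched as below. This is an independent re-implementation under stated assumptions, not the module's code: soql_fields and flatten_records are hypothetical names, only a plain SELECT ... FROM clause is parsed (no subqueries), nested relationship objects map to dotted names such as Owner.Name, and the attributes metadata key that Salesforce adds to each record is skipped.

```python
import re
from typing import Any, Dict, List, Optional

Row = List[Optional[Any]]


def soql_fields(soql: str) -> List[str]:
    """Return the selected field names, e.g. ['Id', 'Owner.Name']."""
    match = re.search(r"\bselect\b(.*?)\bfrom\b", soql, re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("expected a SELECT ... FROM query")
    return [field.strip() for field in match.group(1).split(",") if field.strip()]


def flatten_records(fields: List[str], response: Dict[str, Any]) -> List[Row]:
    """Turn REST query records into rows ordered like `fields`."""
    rows = []
    for record in response["records"]:
        row: Row = [None] * len(fields)  # unselected or null values stay None
        _fill(record, "", fields, row)
        rows.append(row)
    return rows


def _fill(obj: Dict[str, Any], prefix: str, fields: List[str], row: Row) -> None:
    for key, value in obj.items():
        if key == "attributes":  # type/url metadata, not a column
            continue
        name = prefix + "." + key if prefix else key
        if isinstance(value, dict):  # related object such as Owner: recurse
            _fill(value, name, fields, row)
        elif not isinstance(value, list) and name in fields:
            row[fields.index(name)] = value
```

Field lookup here is case-sensitive, so fields should appear in the SOQL with the same capitalization that Salesforce uses in its JSON response.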
The module requires the requests library.
Usage
Use QuerySalesforce when you need to extract data from Salesforce as part of a Luigi data pipeline, for example to pull CRM records into a data warehouse or analytics system. Use SalesforceAPI directly when you need lower-level control over Salesforce interactions, such as performing insert, update, upsert, or delete operations through the Bulk API. This module supports both production and sandbox Salesforce environments, making it suitable for development, testing, and production data integration workflows.
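Write operations go through the same job and batch calls used for bulk querying. The sketch below assumes sf behaves like SalesforceAPI with the signatures listed under Signature; bulk_upsert_csv and the External_Id__c field are hypothetical, and the 600-second wait limit is an arbitrary choice for illustration.

```python
from typing import Any, Dict


def bulk_upsert_csv(sf: Any, object_name: str, external_id_field: str,
                    csv_payload: str) -> Dict[str, str]:
    """Hypothetical helper: upsert CSV rows through a Bulk API job.

    Returns the final batch status dict from block_on_batch().
    """
    job_id = sf.create_operation_job("upsert", object_name,
                                     external_id_field_name=external_id_field,
                                     content_type="CSV")
    try:
        batch_id = sf.create_batch(job_id, csv_payload, "CSV")
        # Give up after 10 minutes instead of waiting indefinitely.
        status = sf.block_on_batch(job_id, batch_id, max_wait_time_seconds=600)
    finally:
        sf.close_job(job_id)  # tell Salesforce no more batches are coming
    if status["state"].lower() == "failed":
        raise RuntimeError("Upsert batch failed: %s" % status.get("state_message"))
    return status
```

For an upsert, the external ID field must exist on the object and be marked as an External ID in Salesforce; the CSV header row names the fields being written.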
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/salesforce.py
- Lines: 1-706
Signature
```python
class salesforce(luigi.Config):
    username = luigi.Parameter(default='')
    password = luigi.Parameter(default='')
    security_token = luigi.Parameter(default='')
    sb_security_token = luigi.Parameter(default='')


class SalesforceAPI:
    API_VERSION = 34.0
    SOAP_NS = "{urn:partner.soap.sforce.com}"
    API_NS = "{http://www.force.com/2009/06/asyncapi/dataload}"

    def __init__(self, username, password, security_token,
                 sb_token=None, sandbox_name=None): ...
    def start_session(self): ...
    def has_active_session(self): ...
    def query(self, query, **kwargs): ...
    def query_more(self, next_records_identifier,
                   identifier_is_url=False, **kwargs): ...
    def query_all(self, query, **kwargs): ...
    def restful(self, path, params): ...
    def create_operation_job(self, operation, obj,
                             external_id_field_name=None,
                             content_type=None): ...
    def get_job_details(self, job_id): ...
    def abort_job(self, job_id): ...
    def close_job(self, job_id): ...
    def create_batch(self, job_id, data, file_type): ...
    def block_on_batch(self, job_id, batch_id,
                       sleep_time_seconds=5,
                       max_wait_time_seconds=-1): ...
    def get_batch_results(self, job_id, batch_id): ...  # DEPRECATED
    def get_batch_result_ids(self, job_id, batch_id): ...
    def get_batch_result(self, job_id, batch_id, result_id): ...


class QuerySalesforce(luigi.Task):
    @property
    def object_name(self): ...   # str: SF object name (abstract)
    @property
    def soql(self): ...          # str: SOQL query string or file path (abstract)
    @property
    def use_sandbox(self): ...   # bool: use sandbox environment (default: False)
    @property
    def sandbox_name(self): ...  # str or None: sandbox name
    @property
    def is_soql_file(self): ...  # bool: treat soql as file path (default: False)
    @property
    def content_type(self): ...  # str: CSV, XML, ZIP_CSV, ZIP_XML (default: CSV)
    def run(self): ...
    def merge_batch_results(self, result_ids): ...
```
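For CSV results, merge_batch_results() stitches the per-result files together so that the header row appears only once. The module downloads each part to a numbered file next to the output path to keep memory use low; the sketch below works on byte strings for brevity. merge_csv_parts is a hypothetical name, and every part is assumed to start with an identical header row.

```python
from typing import Iterable, List


def merge_csv_parts(parts: Iterable[bytes]) -> bytes:
    """Concatenate CSV result parts, keeping only the first header row."""
    merged: List[bytes] = []
    header_written = False
    for part in parts:
        lines = part.splitlines(keepends=True)
        if not lines:
            continue  # skip empty parts without losing the header
        if header_written:
            lines = lines[1:]  # each later part repeats the header row
        if merged and not merged[-1].endswith(b"\n"):
            merged[-1] += b"\n"  # keep parts from running together
        merged.extend(lines)
        header_written = True
    return b"".join(merged)
```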
Import
```python
from luigi.contrib.salesforce import SalesforceAPI, QuerySalesforce, salesforce
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
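| username | str | Yes | Salesforce username; `username` key in the [salesforce] config (QuerySalesforce) or SalesforceAPI constructor argument |
| password | str | Yes | Salesforce password; `password` config key or constructor argument |
| security_token | str | Yes | Security token for the production org; `security_token` config key or constructor argument |
| sb_token | str | No | Security token for the sandbox; SalesforceAPI constructor argument, filled from the `sb_security_token` config key by QuerySalesforce |
| sandbox_name | str | No | Name of the Salesforce sandbox to connect to; SalesforceAPI constructor argument or QuerySalesforce property (required when use_sandbox is True) |
| object_name | str | Yes | QuerySalesforce property: the Salesforce object to query (e.g., Lead, Account, or a custom object such as MyObj__c) |
| soql | str | Yes | QuerySalesforce property: SOQL query string, or path to a file containing the query |
| is_soql_file | bool | No | QuerySalesforce property: if True, soql is treated as a file path; defaults to False |
| content_type | str | No | QuerySalesforce property: Bulk API result format, one of CSV, XML, ZIP_CSV, or ZIP_XML; defaults to CSV |
| use_sandbox | bool | No | QuerySalesforce property: whether to connect to a sandbox; defaults to False |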
Outputs
| Name | Type | Description |
|---|---|---|
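| output | luigi.LocalTarget | Target returned by the subclass's output(); run() writes the query results to its path in the content_type format (default: CSV) |
| query result (SalesforceAPI.query) | dict | JSON-decoded response from the Salesforce REST API (first page of results only) |
| query result (SalesforceAPI.query_all) | file object | Temporary file, rewound to the start, containing all paginated results as CSV with the SOQL field names as the header row |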
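query_all() issues an initial query() call and then, while the response's done flag is false, fetches the page named by nextRecordsUrl through query_more(..., identifier_is_url=True). The sketch below shows that loop with the HTTP call replaced by an injected function; collect_all_pages and pages_to_csv are hypothetical names, and pages_to_csv writes only top-level fields, whereas the module flattens nested relationship fields with parse_results().

```python
import csv
import io
from typing import Any, Callable, Dict, List, Sequence

Page = Dict[str, Any]


def collect_all_pages(first_page: Page,
                      fetch_next: Callable[[str], Page]) -> List[Dict[str, Any]]:
    """Follow nextRecordsUrl links until Salesforce reports done=True."""
    page = first_page
    records = list(page["records"])
    while not page["done"]:
        # fetch_next stands in for query_more(url, identifier_is_url=True)
        page = fetch_next(page["nextRecordsUrl"])
        records.extend(page["records"])
    return records


def pages_to_csv(fields: Sequence[str], first_page: Page,
                 fetch_next: Callable[[str], Page]) -> str:
    """Write a header row plus one row per record (top-level fields only)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(fields)
    for record in collect_all_pages(first_page, fetch_next):
        writer.writerow([record.get(field) for field in fields])
    return buffer.getvalue()
```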
Usage Examples
Basic Usage
```python
import luigi
from luigi.contrib.salesforce import QuerySalesforce


class ExtractLeads(QuerySalesforce):
    # Credentials are read from the [salesforce] section of the Luigi config.

    @property
    def object_name(self):
        return 'Lead'

    @property
    def soql(self):
        return "SELECT Id, Email, FirstName, LastName, Company FROM Lead WHERE CreatedDate = TODAY"

    @property
    def content_type(self):
        return 'CSV'  # also the default

    def output(self):
        # run() opens this path directly, so /data/salesforce/ should already exist.
        return luigi.LocalTarget('/data/salesforce/leads_today.csv')


if __name__ == '__main__':
    luigi.build([ExtractLeads()], local_scheduler=True)
```
Direct API Usage
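```python
from luigi.contrib.salesforce import SalesforceAPI

sf = SalesforceAPI(
    username='user@example.com',
    password='mypassword',
    security_token='ABCDEF123456',
)
# Required before REST calls such as query(); raises if a session is already active.
sf.start_session()

# Simple REST query: returns the first page of results as a dict
result = sf.query("SELECT Id, Name FROM Account LIMIT 10")
for record in result['records']:
    print(record['Name'])

# Bulk query (reuses the session opened above)
job_id = sf.create_operation_job('query', 'Contact', content_type='CSV')
try:
    batch_id = sf.create_batch(job_id, 'SELECT Id, Email FROM Contact', 'CSV')
    status = sf.block_on_batch(job_id, batch_id)
    if status['state'].lower() == 'failed':
        raise RuntimeError(status.get('state_message'))
    # Large result sets can be split across several result ids.
    for i, result_id in enumerate(sf.get_batch_result_ids(job_id, batch_id)):
        with open('contacts_%d.csv' % i, 'wb') as f:
            f.write(sf.get_batch_result(job_id, batch_id, result_id))  # raw bytes
finally:
    sf.close_job(job_id)
```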
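block_on_batch() waits by polling the batch status every sleep_time_seconds until its state is Completed or Failed; with the default max_wait_time_seconds of -1 it waits indefinitely, and with a positive limit it raises once the limit is exceeded. The loop can be sketched as follows, with the clock and sleep injected so it can be exercised without Salesforce; wait_for_batch and the use of TimeoutError are illustrative choices, not part of the module.

```python
import time
from typing import Callable, Dict


def wait_for_batch(get_status: Callable[[], Dict[str, str]],
                   sleep_time_seconds: float = 5,
                   max_wait_time_seconds: float = -1,
                   clock: Callable[[], float] = time.monotonic,
                   sleep: Callable[[float], None] = time.sleep) -> Dict[str, str]:
    """Poll until the batch is Completed or Failed (hypothetical helper)."""
    start = clock()
    status: Dict[str, str] = {}
    # A negative limit means "no limit", matching block_on_batch's default of -1.
    while max_wait_time_seconds < 0 or clock() - start < max_wait_time_seconds:
        status = get_status()
        if status["state"].lower() in ("completed", "failed"):
            return status
        sleep(sleep_time_seconds)
    raise TimeoutError("Batch not finished after %s s; last status: %s"
                       % (max_wait_time_seconds, status))
```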
Sandbox Usage
```python
import luigi
from luigi.contrib.salesforce import QuerySalesforce


class SandboxExtract(QuerySalesforce):
    # Logs in with sb_security_token from the [salesforce] config section.

    @property
    def object_name(self):
        return 'Opportunity'

    @property
    def soql(self):
        return "SELECT Id, Name, Amount, StageName FROM Opportunity"

    @property
    def use_sandbox(self):
        return True

    @property
    def sandbox_name(self):
        return 'dev_sandbox'  # required whenever use_sandbox is True

    def output(self):
        return luigi.LocalTarget('/data/salesforce/sandbox_opportunities.csv')
```
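As the module's source reads, setting a sandbox name changes three things in the SOAP login: the username gets a `.<sandbox_name>` suffix, the request goes to test.salesforce.com instead of login.salesforce.com, and the sandbox token (sb_token) is appended to the password in place of the production token. The sketch below reproduces that logic as a pure function; build_login is a hypothetical name, and XML-escaping the credentials is an addition of this sketch.

```python
from typing import Optional, Tuple
from xml.sax.saxutils import escape

API_VERSION = "34.0"  # matches SalesforceAPI.API_VERSION


def build_login(username: str, password: str, security_token: str,
                sb_token: Optional[str] = None,
                sandbox_name: Optional[str] = None) -> Tuple[str, str]:
    """Return (login URL, SOAP envelope) for the partner login call."""
    if sandbox_name:
        username = "%s.%s" % (username, sandbox_name)  # sandbox usernames carry the suffix
        host, token = "test.salesforce.com", sb_token or ""
    else:
        host, token = "login.salesforce.com", security_token
    url = "https://%s/services/Soap/u/%s" % (host, API_VERSION)
    envelope = (
        '<?xml version="1.0" encoding="utf-8" ?>'
        '<env:Envelope xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">'
        '<env:Body><n1:login xmlns:n1="urn:partner.soap.sforce.com">'
        "<n1:username>%s</n1:username>"
        "<n1:password>%s</n1:password>"  # password and token are sent concatenated
        "</n1:login></env:Body></env:Envelope>"
    ) % (escape(username), escape(password + token))
    return url, envelope
```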