

Implementation:Spotify Luigi SalesforceAPI

From Leeroopedia
Revision as of 16:47, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Spotify_Luigi_SalesforceAPI.md)


Knowledge Sources
Domains CRM, Data_Integration
Last Updated 2026-02-10 08:00 GMT

Overview

SalesforceAPI and QuerySalesforce provide a Luigi integration for querying and extracting data from Salesforce using both the Bulk API and REST API, with support for production and sandbox environments.

Description

The Salesforce contrib module implements a client for the Salesforce API alongside a Luigi Task that runs SOQL queries as part of a data pipeline. The module contains three primary components:

  • SalesforceAPI -- A client class that wraps the Salesforce SOAP login, REST query API, and Bulk API. It handles session management, authentication via SOAP XML login (supporting both production and sandbox instances), and provides methods for creating bulk operation jobs, submitting batches, polling for batch completion, and retrieving results. The client uses API version 34.0 and communicates with Salesforce using XML for bulk operations and JSON for REST queries. Key features include automatic pagination through query_all() which follows nextRecordsUrl links, and support for multiple content types (CSV, XML, ZIP_CSV, ZIP_XML).
  • QuerySalesforce (extends luigi.Task) -- An abstract task that runs a SOQL query against Salesforce. It first submits the query through the Bulk API. If the bulk batch fails because the query includes foreign key relationships, which bulk queries do not support, the task retries the query through the REST API. When a batch returns multiple result files, the task downloads each file separately and merges them into the output. The SOQL can be provided as a raw string or as a path to a file containing the query (see is_soql_file).
  • salesforce (extends luigi.Config) -- A configuration class that reads Salesforce credentials (username, password, security_token, sb_security_token) from the Luigi configuration file under the [salesforce] section.
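
The bulk-first, REST-fallback behaviour described for QuerySalesforce can be sketched in isolation. This is a minimal illustration, not the module's code: run_query, is_relationship_failure, the status dictionary layout, and the FK_MARKER message text are all assumptions made for the example, and the two query callables are stand-ins supplied by the caller.

```python
# Hypothetical marker text; the real failure message may be worded differently.
FK_MARKER = "foreign key relationships not supported"


def is_relationship_failure(status):
    """True when a failed batch complains about relationship fields."""
    return FK_MARKER in status.get("state_message", "").lower()


def run_query(soql, bulk_query, rest_query):
    """Try the bulk path first; use REST only for relationship failures.

    bulk_query(soql) returns (status_dict, data) and rest_query(soql)
    returns data. Both are placeholders for real API calls.
    """
    status, data = bulk_query(soql)
    if status.get("state", "").lower() != "failed":
        return "bulk", data
    if is_relationship_failure(status):
        return "rest", rest_query(soql)
    # Any other failure is a real error and should not be hidden.
    raise RuntimeError("Batch failed with message: %s" % status.get("state_message"))
```

Keeping the fallback condition narrow matters: only the relationship failure is retried, so unrelated errors such as an invalid field name still surface to the pipeline.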

Helper functions include get_soql_fields(), which parses the field names out of a SOQL query, and parse_results() and _traverse_results(), which flatten nested Salesforce JSON responses into tabular rows.
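
To make the flattening idea concrete, here is a simplified sketch of the same technique. It is not the module's implementation: soql_fields, flatten_record, and flatten_response are hypothetical names, the regular expression ignores subqueries, and the sample response is invented for illustration.

```python
import re


def soql_fields(soql):
    """Return the column names listed between SELECT and FROM."""
    match = re.search(r"\bselect\b(.*?)\bfrom\b", soql, re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("not a SELECT ... FROM query")
    return [name.strip() for name in match.group(1).split(",") if name.strip()]


def flatten_record(record, fields):
    """Flatten one nested record into a row ordered like `fields`.

    Nested objects are addressed with dotted names such as 'Account.Name'.
    The 'attributes' metadata attached to each object is skipped.
    """
    row = [None] * len(fields)

    def walk(obj, path):
        for key, value in obj.items():
            if key == "attributes":
                continue
            name = "%s.%s" % (path, key) if path else key
            if isinstance(value, dict):
                walk(value, name)  # descend into the related object
            elif name in fields:
                row[fields.index(name)] = value

    walk(record, "")
    return row


def flatten_response(response, fields):
    """Flatten every record of a REST-style query response."""
    return [flatten_record(record, fields) for record in response["records"]]


fields = soql_fields("SELECT Id, Account.Name FROM Contact")
sample = {
    "done": True,
    "records": [
        {"attributes": {"type": "Contact"}, "Id": "003A",
         "Account": {"attributes": {"type": "Account"}, "Name": "Acme"}},
        {"attributes": {"type": "Contact"}, "Id": "003B", "Account": None},
    ],
}
rows = flatten_response(sample, fields)
```

A record whose related object is missing keeps None in that column, so every row has the same width as the field list.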

The module requires the requests library.

Usage

Use QuerySalesforce when you need to extract data from Salesforce as part of a Luigi data pipeline, for example to pull CRM records into a data warehouse or analytics system. Use SalesforceAPI directly when you need lower-level control over Salesforce interactions, such as performing insert, update, upsert, or delete operations through the Bulk API. This module supports both production and sandbox Salesforce environments, making it suitable for development, testing, and production data integration workflows.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/salesforce.py
  • Lines: 1-706

Signature

class salesforce(luigi.Config):
    username = luigi.Parameter(default='')
    password = luigi.Parameter(default='')
    security_token = luigi.Parameter(default='')
    sb_security_token = luigi.Parameter(default='')

class SalesforceAPI:
    API_VERSION = 34.0
    SOAP_NS = "{urn:partner.soap.sforce.com}"
    API_NS = "{http://www.force.com/2009/06/asyncapi/dataload}"

    def __init__(self, username, password, security_token,
                 sb_token=None, sandbox_name=None): ...
    def start_session(self): ...
    def has_active_session(self): ...
    def query(self, query, **kwargs): ...
    def query_more(self, next_records_identifier,
                   identifier_is_url=False, **kwargs): ...
    def query_all(self, query, **kwargs): ...
    def restful(self, path, params): ...
    def create_operation_job(self, operation, obj,
                             external_id_field_name=None,
                             content_type=None): ...
    def get_job_details(self, job_id): ...
    def abort_job(self, job_id): ...
    def close_job(self, job_id): ...
    def create_batch(self, job_id, data, file_type): ...
    def block_on_batch(self, job_id, batch_id,
                       sleep_time_seconds=5,
                       max_wait_time_seconds=-1): ...
    def get_batch_results(self, job_id, batch_id): ...  # DEPRECATED
    def get_batch_result_ids(self, job_id, batch_id): ...
    def get_batch_result(self, job_id, batch_id, result_id): ...

class QuerySalesforce(luigi.Task):
    @property
    def object_name(self): ...    # str: SF object name (abstract)
    @property
    def soql(self): ...           # str: SOQL query string or file path (abstract)
    @property
    def use_sandbox(self): ...    # bool: use sandbox environment (default: False)
    @property
    def sandbox_name(self): ...   # str or None: sandbox name
    @property
    def is_soql_file(self): ...   # bool: treat soql as file path (default: False)
    @property
    def content_type(self): ...   # str: CSV, XML, ZIP_CSV, ZIP_XML (default: CSV)

    def run(self): ...
    def merge_batch_results(self, result_ids): ...
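
The merge step behind merge_batch_results can be illustrated with a small standalone sketch. It assumes each result was saved as a separate CSV file that repeats the header row; merge_csv_parts and the "result.csv.<index>" file names are hypothetical and chosen only for this example.

```python
import os
import tempfile


def _with_newline(line):
    """Make sure a copied line ends with a newline so parts do not run together."""
    return line if line.endswith("\n") else line + "\n"


def merge_csv_parts(part_paths, out_path):
    """Concatenate CSV parts line by line, keeping only the first header."""
    with open(out_path, "w", newline="") as out:
        for index, part_path in enumerate(part_paths):
            with open(part_path, "r", newline="") as part:
                header = part.readline()
                if index == 0 and header:
                    out.write(_with_newline(header))
                for line in part:
                    out.write(_with_newline(line))


# Demo with two small part files in a scratch directory.
workdir = tempfile.mkdtemp()
parts = []
for index, body in enumerate(["Id,Email\n1,a@example.com\n",
                              "Id,Email\n2,b@example.com\n"]):
    path = os.path.join(workdir, "result.csv.%d" % index)
    with open(path, "w", newline="") as handle:
        handle.write(body)
    parts.append(path)

merged_path = os.path.join(workdir, "result.csv")
merge_csv_parts(parts, merged_path)
with open(merged_path, newline="") as handle:
    merged = handle.read()
```

Streaming the parts line by line keeps memory use flat regardless of result size, which is the usual reason to merge on disk rather than in memory.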

Import

from luigi.contrib.salesforce import SalesforceAPI, QuerySalesforce, salesforce

I/O Contract

Inputs

Name | Type | Required | Description
username | str | Yes | Salesforce username (via config or SalesforceAPI constructor)
password | str | Yes | Salesforce password
security_token | str | Yes | Salesforce security token for production
sb_token | str | No | Salesforce security token for sandbox environments
sandbox_name | str | No | Name of the Salesforce sandbox to connect to
object_name | str | Yes | The Salesforce object to query (e.g., Lead, Account, or custom MyObj__c)
soql | str | Yes | SOQL query string or path to a file containing the SOQL query
is_soql_file | bool | No | If True, treats the soql value as a file path; defaults to False
content_type | str | No | Response format: CSV, XML, ZIP_CSV, or ZIP_XML; defaults to CSV
use_sandbox | bool | No | Whether to use a sandbox environment; defaults to False
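
As an illustration of the credentials supplied via config, the sketch below parses a sample [salesforce] section using the four option names from the salesforce config class. Luigi reads its own configuration file; the standard-library configparser is used here only to show the expected layout, and every value is a placeholder.

```python
import configparser

# Placeholder values; real credentials belong in the Luigi configuration file.
SAMPLE_CFG = """
[salesforce]
username = user@example.com
password = not-a-real-password
security_token = PRODTOKEN
sb_security_token = SANDBOXTOKEN
"""

OPTION_NAMES = ("username", "password", "security_token", "sb_security_token")

parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)
section = parser["salesforce"]

# Missing options fall back to an empty string, mirroring the class defaults.
credentials = {name: section.get(name, "") for name in OPTION_NAMES}
```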

Outputs

Name | Type | Description
output | Target (local file, e.g. luigi.LocalTarget) | File containing query results in the specified content_type format (default: CSV)
query result (SalesforceAPI.query) | dict | JSON-decoded response from the Salesforce REST API
query result (SalesforceAPI.query_all) | TemporaryFile | CSV file containing all paginated results
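
The difference between the two REST outputs is pagination: a single response carries one page, while the paginated variant follows nextRecordsUrl until the response reports done. The sketch below imitates that loop against canned pages. iter_pages, pages_to_csv, and the page URLs are hypothetical; only the done, nextRecordsUrl, and records keys come from the Salesforce response format.

```python
import csv
import tempfile

# Canned follow-up pages keyed by their (made-up) URL.
PAGES = {
    "/query/page-2": {"done": False, "nextRecordsUrl": "/query/page-3",
                      "records": [{"Id": "003"}]},
    "/query/page-3": {"done": True, "records": [{"Id": "004"}]},
}
FIRST_PAGE = {"done": False, "nextRecordsUrl": "/query/page-2",
              "records": [{"Id": "001"}, {"Id": "002"}]}


def iter_pages(first_page, fetch_page):
    """Yield the first page, then follow nextRecordsUrl until done is True."""
    page = first_page
    yield page
    while not page["done"]:
        page = fetch_page(page["nextRecordsUrl"])
        yield page


def pages_to_csv(first_page, fetch_page, fields):
    """Write all pages to a temporary CSV file and rewind it for reading."""
    tmp = tempfile.TemporaryFile(mode="w+", newline="")
    writer = csv.writer(tmp)
    writer.writerow(fields)
    for page in iter_pages(first_page, fetch_page):
        for record in page["records"]:
            writer.writerow([record.get(name) for name in fields])
    tmp.seek(0)
    return tmp


with pages_to_csv(FIRST_PAGE, PAGES.__getitem__, ["Id"]) as handle:
    lines = handle.read().splitlines()
```

Returning a rewound temporary file lets the caller stream the rows with csv.reader without holding the full result set in memory.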

Usage Examples

Basic Usage

import luigi
from luigi.contrib.salesforce import QuerySalesforce

class ExtractLeads(QuerySalesforce):

    @property
    def object_name(self):
        return 'Lead'

    @property
    def soql(self):
        return "SELECT Id, Email, FirstName, LastName, Company FROM Lead WHERE CreatedDate = TODAY"

    @property
    def content_type(self):
        return 'CSV'

    def output(self):
        return luigi.LocalTarget('/data/salesforce/leads_today.csv')


if __name__ == '__main__':
    luigi.build([ExtractLeads()], local_scheduler=True)

Direct API Usage

from luigi.contrib.salesforce import SalesforceAPI

sf = SalesforceAPI(
    username='user@example.com',
    password='mypassword',
    security_token='ABCDEF123456'
)
sf.start_session()

# Simple query
result = sf.query("SELECT Id, Name FROM Account LIMIT 10")
for record in result['records']:
    print(record['Name'])

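# Bulk query: create a job, submit the SOQL as a batch, wait, then download.
job_id = sf.create_operation_job('query', 'Contact', content_type='CSV')
try:
    batch_id = sf.create_batch(job_id, 'SELECT Id, Email FROM Contact', 'CSV')
    status = sf.block_on_batch(job_id, batch_id)  # polls until the batch finishes
    result_ids = sf.get_batch_result_ids(job_id, batch_id)
    # A batch can return several result files; this reads only the first one.
    data = sf.get_batch_result(job_id, batch_id, result_ids[0])
finally:
    sf.close_job(job_id)  # close the job even if a step above raises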

Sandbox Usage

from luigi.contrib.salesforce import QuerySalesforce
import luigi

class SandboxExtract(QuerySalesforce):

    @property
    def object_name(self):
        return 'Opportunity'

    @property
    def soql(self):
        return "SELECT Id, Name, Amount, StageName FROM Opportunity"

    @property
    def use_sandbox(self):
        return True

    @property
    def sandbox_name(self):
        return 'dev_sandbox'

    def output(self):
        return luigi.LocalTarget('/data/salesforce/sandbox_opportunities.csv')
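
The sketch below shows one way the sandbox settings could map onto a login request. It rests on general Salesforce conventions (sandbox logins go through the test host, and sandbox usernames carry the sandbox name as a suffix) and is an assumption about the behaviour, not a copy of the module's code. login_target and check_sandbox_settings are hypothetical helpers.

```python
API_VERSION = 34.0  # same version the client class declares


def check_sandbox_settings(use_sandbox, sandbox_name):
    """Reject a sandbox run that does not name the sandbox."""
    if use_sandbox and not sandbox_name:
        raise ValueError("sandbox_name is required when use_sandbox is True")


def login_target(username, sandbox_name=None):
    """Return the (login username, SOAP login URL) pair for an environment."""
    host = "test" if sandbox_name else "login"
    login_user = "%s.%s" % (username, sandbox_name) if sandbox_name else username
    url = "https://%s.salesforce.com/services/Soap/u/%.1f" % (host, API_VERSION)
    return login_user, url


check_sandbox_settings(use_sandbox=True, sandbox_name="dev_sandbox")
sandbox_login = login_target("user@example.com", "dev_sandbox")
production_login = login_target("user@example.com")
```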
