Salesforce Bulk API Client
A Python interface to the Salesforce.com Bulk API, enabling efficient, asynchronous processing of large data sets for insert, update, upsert, and delete operations. The current version is 2.2.0, with a release cadence that has seen updates in 2023 and 2024, indicating active maintenance.
Warnings
- breaking Version 2.0.0 changed the underlying HTTP client from `httplib` to `requests`. While the public `SalesforceBulk` interface largely remained the same, custom HTTP client configurations, lower-level network interactions, or error handling dependent on `httplib` might break.
- gotcha The Salesforce Bulk API is asynchronous. Jobs and batches do not complete immediately. It is crucial to implement polling mechanisms to check job and batch statuses before attempting to retrieve results or assuming completion.
- gotcha Incorrect batch sizing can lead to API limits or performance issues. Salesforce has limits on batch sizes (e.g., 10,000 records or 10MB). Exceeding these limits for a single batch will result in errors.
- gotcha Security tokens are often required when connecting to Salesforce via username/password from untrusted IP ranges or without an active session. Forgetting it or providing an invalid one will lead to authentication failures.
Install
-
pip install salesforce-bulk
Imports
- SalesforceBulk
from salesforce_bulk import SalesforceBulk
- BulkJobState
from salesforce_bulk import BulkJobState
- BulkBatchState
from salesforce_bulk import BulkBatchState
Quickstart
import os
import time
from salesforce_bulk import SalesforceBulk, BulkJobState, BulkBatchState
# Configure Salesforce credentials using environment variables
SF_USERNAME = os.environ.get('SF_USERNAME', 'your_sf_username')
SF_PASSWORD = os.environ.get('SF_PASSWORD', 'your_sf_password')
SF_SECURITY_TOKEN = os.environ.get('SF_SECURITY_TOKEN', 'your_sf_security_token')
SF_INSTANCE_URL = os.environ.get('SF_INSTANCE_URL', 'https://your_instance.my.salesforce.com') # Optional, if using My Domain
if SF_USERNAME == 'your_sf_username':
print("WARNING: Please set SF_USERNAME, SF_PASSWORD, and SF_SECURITY_TOKEN environment variables.")
print("Skipping quickstart execution.")
else:
try:
# Initialize SalesforceBulk client
# For Sandbox/Production, usually username/password/security_token is sufficient.
# For My Domain or specific instances, instance_url might be needed.
sf_bulk = SalesforceBulk(
username=SF_USERNAME,
password=SF_PASSWORD,
security_token=SF_SECURITY_TOKEN,
instance_url=SF_INSTANCE_URL # Optional, if not using My Domain or standard instance
)
# Example: Create an 'Account' insert job
job = sf_bulk.create_job(object_name='Account', operation='insert')
print(f"Created Bulk Job: {job['id']}")
# Prepare data (list of dictionaries)
accounts_data = [
{'Name': 'Test Account 1', 'Industry': 'Technology'},
{'Name': 'Test Account 2', 'Industry': 'Healthcare'}
]
# Add a batch to the job
batch = sf_bulk.post_batch(job_id=job['id'], data=accounts_data)
print(f"Posted Batch: {batch['id']}")
# Close the job (important: no more batches can be added after this)
sf_bulk.close_job(job_id=job['id'])
print(f"Closed Bulk Job: {job['id']}")
# Poll for job and batch status (Bulk API is asynchronous)
print("Polling for job and batch completion...")
while True:
job_status = sf_bulk.get_job_info(job_id=job['id'])
batch_status = sf_bulk.get_batch_info(job_id=job['id'], batch_id=batch['id'])
print(f"Job State: {job_status['state']}, Batch State: {batch_status['state']}")
if job_status['state'] == BulkJobState.CLOSED and batch_status['state'] in [BulkBatchState.COMPLETED, BulkBatchState.FAILED]:
break
time.sleep(5) # Wait for 5 seconds before re-polling
if batch_status['state'] == BulkBatchState.COMPLETED:
print("Batch completed successfully!")
# Retrieve results
results = sf_bulk.get_batch_results(job_id=job['id'], batch_id=batch['id'])
print("Batch Results:")
for res in results:
print(f" Success: {res['success']}, Id: {res['id']}, Error: {res.get('errors')}")
else:
print(f"Batch failed with state: {batch_status['state']}")
print(f"Job failures: {job_status.get('numberRecordsFailed')}")
print(f"Batch errors: {sf_bulk.get_batch_results(job_id=job['id'], batch_id=batch['id'])}")
except Exception as e:
print(f"An error occurred: {e}")