Salesforce Bulk API Client

2.2.0 · active · verified Thu Apr 09

This library is a Python interface to the Salesforce.com Bulk API. It enables efficient, asynchronous processing of large data sets for insert, update, upsert, and delete operations. The current version is 2.2.0; releases in 2023 and 2024 indicate active maintenance.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to connect to Salesforce using `SalesforceBulk`, create an 'insert' job for 'Account' objects, post a batch of data, close the job, and then poll for its completion. It also shows how to retrieve results from completed batches. Remember that the Salesforce Bulk API is asynchronous, so polling is necessary.

import os
import time
from salesforce_bulk import SalesforceBulk, BulkJobState, BulkBatchState

# Quickstart: create an 'insert' job for Account records, post one batch,
# close the job, poll until the batch finishes, then print the results.
#
# NOTE(review): this script indexes jobs/batches as dicts (job['id']) and
# compares states against BulkJobState/BulkBatchState. Confirm that these
# names and return shapes match the installed salesforce_bulk release
# before relying on it -- TODO verify against the upstream README.

# Configure Salesforce credentials using environment variables.
# The placeholder defaults let the guard below detect unset variables.
SF_USERNAME = os.environ.get('SF_USERNAME', 'your_sf_username')
SF_PASSWORD = os.environ.get('SF_PASSWORD', 'your_sf_password')
SF_SECURITY_TOKEN = os.environ.get('SF_SECURITY_TOKEN', 'your_sf_security_token')
# Optional: only needed for My Domain or a specific instance. Left as None
# when unset so a placeholder URL is never handed to the client.
SF_INSTANCE_URL = os.environ.get('SF_INSTANCE_URL') or None

# Polling cadence and an upper bound on the total wait, so an unexpected
# terminal state cannot leave the script spinning forever.
POLL_INTERVAL_SECONDS = 5
POLL_TIMEOUT_SECONDS = 600

# Treat any credential still holding its placeholder value as "not set".
credentials_missing = (
    SF_USERNAME == 'your_sf_username'
    or SF_PASSWORD == 'your_sf_password'
    or SF_SECURITY_TOKEN == 'your_sf_security_token'
)

if credentials_missing:
    print("WARNING: Please set SF_USERNAME, SF_PASSWORD, and SF_SECURITY_TOKEN environment variables.")
    print("Skipping quickstart execution.")
else:
    try:
        # Initialize SalesforceBulk client.
        # username/password/security_token are always passed; instance_url
        # is added only when SF_INSTANCE_URL was explicitly configured.
        client_kwargs = {
            'username': SF_USERNAME,
            'password': SF_PASSWORD,
            'security_token': SF_SECURITY_TOKEN,
        }
        if SF_INSTANCE_URL:
            client_kwargs['instance_url'] = SF_INSTANCE_URL
        sf_bulk = SalesforceBulk(**client_kwargs)

        # Example: Create an 'Account' insert job
        job = sf_bulk.create_job(object_name='Account', operation='insert')
        print(f"Created Bulk Job: {job['id']}")

        # Prepare data (list of dictionaries)
        accounts_data = [
            {'Name': 'Test Account 1', 'Industry': 'Technology'},
            {'Name': 'Test Account 2', 'Industry': 'Healthcare'},
        ]

        # Add a batch to the job
        batch = sf_bulk.post_batch(job_id=job['id'], data=accounts_data)
        print(f"Posted Batch: {batch['id']}")

        # Close the job (important: no more batches can be added after this)
        sf_bulk.close_job(job_id=job['id'])
        print(f"Closed Bulk Job: {job['id']}")

        # Poll for job and batch status (Bulk API is asynchronous)
        print("Polling for job and batch completion...")
        finished_batch_states = (BulkBatchState.COMPLETED, BulkBatchState.FAILED)
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + POLL_TIMEOUT_SECONDS
        while True:
            job_status = sf_bulk.get_job_info(job_id=job['id'])
            batch_status = sf_bulk.get_batch_info(job_id=job['id'], batch_id=batch['id'])
            print(f"Job State: {job_status['state']}, Batch State: {batch_status['state']}")

            if job_status['state'] == BulkJobState.CLOSED and batch_status['state'] in finished_batch_states:
                break
            if time.monotonic() >= deadline:
                # Any state other than the ones checked above (for example an
                # aborted job) would otherwise keep this loop running forever.
                raise TimeoutError(
                    f"Gave up after {POLL_TIMEOUT_SECONDS}s waiting for job {job['id']} "
                    f"(job state: {job_status['state']}, batch state: {batch_status['state']})"
                )
            time.sleep(POLL_INTERVAL_SECONDS)  # Wait before re-polling

        if batch_status['state'] == BulkBatchState.COMPLETED:
            print("Batch completed successfully!")
            # Retrieve results
            results = sf_bulk.get_batch_results(job_id=job['id'], batch_id=batch['id'])
            print("Batch Results:")
            for res in results:
                print(f"  Success: {res['success']}, Id: {res['id']}, Error: {res.get('errors')}")
        else:
            print(f"Batch failed with state: {batch_status['state']}")
            print(f"Job failures: {job_status.get('numberRecordsFailed')}")
            print(f"Batch errors: {sf_bulk.get_batch_results(job_id=job['id'], batch_id=batch['id'])}")

    except Exception as e:
        # Top-level boundary of a demo script: report the failure (including
        # the polling timeout above) instead of dumping a traceback.
        print(f"An error occurred: {e}")

view raw JSON →