Apache Airflow Salesforce Provider
raw JSON → 5.14.0 verified Sun Apr 12 auth: no python
The `apache-airflow-providers-salesforce` package is an Apache Airflow provider that enables seamless interaction with the Salesforce platform. It offers operators, hooks, and sensors to perform various operations such as data extraction, data loading, and executing Salesforce API calls directly from Airflow Directed Acyclic Graphs (DAGs). The current version is 5.14.0, and the package maintains an active development cycle with frequent monthly or bi-monthly releases to introduce new features, improvements, and bug fixes.
pip install apache-airflow-providers-salesforce Common errors
error ModuleNotFoundError: No module named 'airflow.providers.salesforce' ↓
cause The 'apache-airflow-providers-salesforce' package is not installed or not accessible in the Python environment.
fix
Install the package using pip: 'pip install apache-airflow-providers-salesforce'.
error ImportError: cannot import name 'SalesforceHook' from 'airflow.providers.salesforce.hooks.salesforce' ↓
cause The 'SalesforceHook' class has been moved or renamed in the 'apache-airflow-providers-salesforce' package.
fix
Update the import statement to: 'from airflow.providers.salesforce.hooks.salesforce import SalesforceHook'.
error AttributeError: module 'airflow.providers.salesforce.hooks.salesforce' has no attribute 'SalesforceHook' ↓
cause The 'SalesforceHook' class is not present in the specified module, possibly due to version changes or incorrect import paths.
fix
Ensure you are using the correct import path: 'from airflow.providers.salesforce.hooks.salesforce import SalesforceHook'.
error Broken DAG: No module named 'airflow.providers.salesforce' ↓
cause The 'apache-airflow-providers-salesforce' package is missing, leading to DAG import failures.
fix
Install the package using pip: 'pip install apache-airflow-providers-salesforce'.
error ImportError: cannot import name 'SalesforceToS3Operator' from 'airflow.providers.salesforce.transfers.salesforce_to_s3' ↓
cause The 'SalesforceToS3Operator' class has been moved or renamed in the 'apache-airflow-providers-salesforce' package.
fix
Update the import statement to: 'from airflow.providers.salesforce.transfers.salesforce_to_s3 import SalesforceToS3Operator'.
Warnings
breaking The `[tableau]` extra has been removed since provider version 5.0.0. Installing `apache-airflow-providers-salesforce[tableau]` will no longer work for Tableau integration. ↓
fix To use Tableau integration, install `apache-airflow-providers-tableau` as a separate provider package: `pip install apache-airflow-providers-tableau`.
breaking Minimum Apache Airflow version requirements have increased over time. For provider version 5.13.0 and newer, `apache-airflow >=2.11.0` is required. Older provider versions required `2.1.0+` or `2.2+`. Installing this provider with an older Airflow version might trigger an automatic Airflow upgrade. ↓
fix Ensure your Airflow environment meets the minimum version requirement (`apache-airflow >=2.11.0`) before installing this provider. If you're on an older Airflow, upgrade Airflow first and then run `airflow upgrade db` if prompted after automatic upgrade.
gotcha Salesforce API calls are subject to rate limits. Frequent or large data operations without proper throttling or batching can lead to API limits being hit, causing task failures. ↓
fix Design your DAGs to respect Salesforce API limits. Utilize batch operations where possible (e.g., `SalesforceBulkOperator`), implement retries with exponential backoff, and consider staggering heavy operations.
gotcha All Salesforce operations require a configured Airflow connection. Incorrect connection details (e.g., wrong username, password, security token, or login URL) will lead to authentication failures. ↓
fix Before using any Salesforce operator or hook, configure a Salesforce connection in the Airflow UI (Admin -> Connections). Ensure all required fields (Host, Login, Password, Security Token) are correctly populated. The security token is especially critical if your IP address is not whitelisted by Salesforce.
Imports
- SalesforceHook
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook - SalesforceToTableOperator
from airflow.providers.salesforce.operators.salesforce import SalesforceToTableOperator - SalesforceBulkOperator
from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator
Quickstart
import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
from airflow.providers.salesforce.operators.salesforce import SalesforceToTableOperator
with DAG(
dag_id='salesforce_example_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
tags=['salesforce', 'example'],
) as dag:
# Example of using SalesforceToTableOperator to extract data
extract_contacts = SalesforceToTableOperator(
task_id='extract_salesforce_contacts',
salesforce_conn_id='salesforce_default', # Ensure this connection is configured in Airflow UI
sobjects=['Contact'],
table_name='airflow_contacts',
selected_fields=['Id', 'FirstName', 'LastName', 'Email'],
where_clause="MailingState = 'CA'",
# Optional: You might push to a database table or another destination
# E.g., postgres_conn_id='postgres_default'
# For this example, we'll just demonstrate extraction.
)
# Example of using SalesforceHook directly (e.g., in a PythonOperator)
def get_salesforce_data(**kwargs):
hook = SalesforceHook(salesforce_conn_id='salesforce_default')
sf_client = hook.get_conn()
# Query the Salesforce API directly
records = sf_client.query("SELECT Id, Name FROM Account LIMIT 5")
print(f"Fetched {len(records['records'])} Account records.")
return records['records']
# You'd typically use a PythonOperator to wrap the hook usage
# from airflow.operators.python import PythonOperator
# fetch_data_task = PythonOperator(
# task_id='fetch_data_with_hook',
# python_callable=get_salesforce_data,
# )
# extract_contacts >> fetch_data_task