Apache Airflow Salesforce Provider
The `apache-airflow-providers-salesforce` package is an Apache Airflow provider that enables interaction with the Salesforce platform. It offers hooks and operators for data extraction, data loading, and Salesforce API calls directly from Airflow Directed Acyclic Graphs (DAGs). The current version is 5.14.0, and the package is actively developed, with releases roughly every one to two months introducing new features, improvements, and bug fixes.
Warnings
- breaking The `[tableau]` extra has been removed since provider version 5.0.0. Installing `apache-airflow-providers-salesforce[tableau]` will no longer work for Tableau integration.
- breaking Minimum Apache Airflow version requirements have increased over time. For provider version 5.13.0 and newer, `apache-airflow >=2.11.0` is required. Older provider versions required `2.1.0+` or `2.2+`. Installing this provider with an older Airflow version might trigger an automatic Airflow upgrade.
- gotcha Salesforce API calls are subject to rate limits. Frequent or large data operations without proper throttling or batching can lead to API limits being hit, causing task failures.
- gotcha All Salesforce operations require a configured Airflow connection. Incorrect connection details (e.g., wrong username, password, security token, or login URL) will lead to authentication failures.
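One common way to avoid tripping the API limits mentioned above is to batch records client-side and make one call per batch. A minimal, provider-agnostic sketch (the helper name and batch size are illustrative, not part of the provider):

```python
from typing import Iterator, List


def chunked(records: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield successive batches of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


# Split 25 records into batches of 10, so each batch maps to one API call
batches = list(chunked([{"Id": str(i)} for i in range(25)], size=10))
print([len(b) for b in batches])  # → [10, 10, 5]
```

Tune the batch size against your org's limits (visible in Salesforce Setup under API usage), and add a delay or retry policy between batches for large loads.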
Install
- pip install apache-airflow-providers-salesforce
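Before running the examples below, configure a Salesforce connection. One option is an environment variable: Airflow resolves `AIRFLOW_CONN_*` variables as connections, and the JSON connection format shown here requires Airflow 2.3+. The credential values are placeholders:

```shell
# Placeholder credentials — substitute your own login, password, and security token
export AIRFLOW_CONN_SALESFORCE_DEFAULT='{
    "conn_type": "salesforce",
    "login": "user@example.com",
    "password": "change-me",
    "extra": {"security_token": "change-me-token"}
}'
```

The same connection can instead be created in the Airflow UI (Admin → Connections) with the Salesforce connection type.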
Imports
- SalesforceHook
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
- SalesforceApexRestOperator
from airflow.providers.salesforce.operators.salesforce_apex_rest import SalesforceApexRestOperator
- SalesforceBulkOperator
from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator
Quickstart
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
def extract_contacts(**kwargs):
    # SalesforceHook reads credentials from the configured Airflow connection
    hook = SalesforceHook(salesforce_conn_id='salesforce_default')
    # make_query() runs a SOQL query and returns the full API response dict
    result = hook.make_query(
        "SELECT Id, FirstName, LastName, Email FROM Contact WHERE MailingState = 'CA'"
    )
    return result['records']
def get_account_data(**kwargs):
    hook = SalesforceHook(salesforce_conn_id='salesforce_default')
    # get_conn() returns a simple_salesforce client for direct API access
    sf_client = hook.get_conn()
    records = sf_client.query("SELECT Id, Name FROM Account LIMIT 5")
    print(f"Fetched {len(records['records'])} Account records.")
    return records['records']
with DAG(
    dag_id='salesforce_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # 'schedule_interval' is deprecated; Airflow 2.4+ uses 'schedule'
    catchup=False,
    tags=['salesforce', 'example'],
) as dag:
    extract_salesforce_contacts = PythonOperator(
        task_id='extract_salesforce_contacts',
        python_callable=extract_contacts,
    )
    fetch_account_data = PythonOperator(
        task_id='fetch_account_data',
        python_callable=get_account_data,
    )
    extract_salesforce_contacts >> fetch_account_data
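For reference, the `query` call above returns a dict shaped like the Salesforce REST response, with each record carrying an `attributes` metadata key. A sketch of flattening it into plain rows, using a hand-built sample instead of a live API call:

```python
# Sample response mimicking the shape of simple_salesforce's query() output
# (hand-built here; no Salesforce call is made)
response = {
    "totalSize": 2,
    "done": True,
    "records": [
        {"attributes": {"type": "Account"}, "Id": "001A", "Name": "Acme"},
        {"attributes": {"type": "Account"}, "Id": "001B", "Name": "Globex"},
    ],
}

# Drop the per-record 'attributes' metadata to keep only the selected fields
rows = [
    {key: value for key, value in record.items() if key != "attributes"}
    for record in response["records"]
]
print(rows)  # → [{'Id': '001A', 'Name': 'Acme'}, {'Id': '001B', 'Name': 'Globex'}]
```

When `done` is False, the response also contains a `nextRecordsUrl` for fetching the remaining pages (e.g. via `query_more` on the simple_salesforce client).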