dagster-sling

0.29.2 verified Mon Apr 27 auth: no python

Dagster integration with Sling for ETL/ELT tasks, enabling ingestion and replication of data between databases and file systems. Current version 0.29.2, released as part of Dagster 1.13.2. Follows Dagster's release cadence.

pip install dagster-sling
error ModuleNotFoundError: No module named 'sling'
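cause The `sling` Python package cannot be imported in the active environment. This error concerns the Python package, not the standalone Sling CLI binary.
fix
Run pip install sling in the same environment as dagster-sling.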
error dagster_sling.errors.SlingConnectionError: Connection 'MY_CONN' not found in sling resources
cause Replication config references a connection name that is not defined in SlingResource.
fix
Add the connection to SlingResource with a matching 'name' field.
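error TypeError: 'module' object is not callable
cause The name being called is bound to a module rather than a function. `sling_assets` is a decorator exported from the top-level `dagster_sling` package, so it must be imported from there and applied to a function.
fix
Import it with `from dagster_sling import sling_assets` and use it as `@sling_assets(replication_config=...)`.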
gotcha The replication config's source/target connections must reference connections defined in SlingResource by name. Hardcoding inline connection strings in the config is not supported.
fix Define each connection in SlingResource with a 'name' field, then reference it by that name in the replication config (`source: NAME`, `target: NAME`).
breaking The Sling integration moved out of `dagster-embedded-elt` (imported as `dagster_embedded_elt.sling`) into the standalone `dagster-sling` package, and the older `build_sling_asset` helper was superseded by the `@sling_assets` decorator used together with `SlingResource` and `SlingConnectionResource`.
fix Update imports and usage: from dagster_sling import SlingConnectionResource, SlingResource, sling_assets
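gotcha dagster-sling runs replications through the `sling` Python package, which wraps the Sling binary. `pip install dagster-sling` declares `sling` as a dependency, so a separate install is normally needed only when the environment was assembled without dependency resolution; in that case run `pip install sling`. The standalone CLI can be installed with `curl -sSL https://slingdata.io/install.sh | bash`.
fix Ensure the `sling` package is importable in the environment that runs Dagster.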
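gotcha The `mode` field in the replication config defaults to `full-refresh`. For incremental loads, set `mode: incremental` and provide a `primary_key` and, optionally, an `update_key` naming the column (for example `updated_at`) that tracks changes.
fix Set `mode: incremental` together with `primary_key` and, where available, `update_key`.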
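
As a rough illustration of the incremental settings, a replication config could look like the sketch below. The connection names, the stream `public.users`, and the columns `id` and `updated_at` are placeholders chosen for this example; `update_key` is the Sling option that names the change-tracking column. `effective_mode` is a hypothetical helper, not part of dagster-sling, that only shows how a per-stream override takes precedence over `defaults`.

```python
# Hypothetical replication config: connection, stream, and column names are placeholders.
incremental_config = {
    "source": "MY_POSTGRES",
    "target": "MY_SNOWFLAKE",
    "defaults": {
        "mode": "full-refresh",
        "object": "{stream_schema}_{stream_table}",
    },
    "streams": {
        # This stream overrides the default mode and declares its keys.
        "public.users": {
            "mode": "incremental",
            "primary_key": ["id"],
            "update_key": "updated_at",
        },
        # A stream set to None inherits everything from "defaults".
        "public.orders": None,
    },
}


def effective_mode(config, stream):
    """Return the stream's own mode if set, otherwise the default mode."""
    override = (config["streams"].get(stream) or {}).get("mode")
    return override or config.get("defaults", {}).get("mode", "full-refresh")


print(effective_mode(incremental_config, "public.users"))
print(effective_mode(incremental_config, "public.orders"))
```

Keeping `full-refresh` as the default and switching individual streams to `incremental` limits the incremental settings to tables that actually have a usable key and change-tracking column.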

Define connections on a SlingResource, reference them by name in a replication config, and build assets from that config with the `sling_assets` decorator.

from dagster import Definitions
from dagster_sling import SlingConnectionResource, SlingResource, sling_assets

# Connections are declared once on the resource and referenced by name below.
sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string="postgresql://user:pass@host:5432/db",
        ),
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            connection_string="snowflake://user:pass@account/db/schema?warehouse=wh&role=role",
        ),
    ]
)

# Sling replication layout: source and target are connection names;
# defaults and streams are top-level keys.
replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_SNOWFLAKE",
    "defaults": {
        "mode": "full-refresh",
        "object": "{stream_schema}_{stream_table}",
    },
    "streams": {
        "public.users": None,
    },
}


@sling_assets(replication_config=replication_config)
def users_replication(context, sling: SlingResource):
    # Run the replication and stream the resulting events back to Dagster.
    yield from sling.replicate(context=context)


defs = Definitions(assets=[users_replication], resources={"sling": sling_resource})
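
Because the replication config refers to connections only by name, a typo shows up only when the replication runs. A small pre-flight check, sketched below in plain Python, can catch it earlier. `missing_connections` is a hypothetical helper, not part of dagster-sling, and it assumes `source` and `target` are given as plain connection names.

```python
def missing_connections(replication_config, defined_names):
    """Return the source/target names that are not among the defined connections."""
    referenced = [replication_config.get("source"), replication_config.get("target")]
    return [name for name in referenced if name not in defined_names]


# Placeholder config and connection names for illustration only.
config = {
    "source": "MY_POSTGRES",
    "target": "MY_SNOWFLAKE",
    "streams": {"public.users": None},
}

# Only MY_POSTGRES is defined here, so the target is reported as missing.
print(missing_connections(config, {"MY_POSTGRES"}))  # ['MY_SNOWFLAKE']
```

In a real project the set of defined names would be collected from the connections passed to SlingResource.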