Dagster
Dagster is an orchestration platform designed for the development, production, and observation of data assets. It emphasizes defining pipelines as data assets and provides a rich development experience with built-in UI and testing capabilities. The library is actively maintained with frequent minor releases, often on a weekly basis, incorporating new features, bug fixes, and improvements.
Warnings
- breaking Dagster 1.0.0 introduced a major API overhaul, deprecating `solid` and `pipeline` in favor of `op` and `job`/`graph` respectively. Code written for pre-1.0 versions is not compatible without migration.
- gotcha The Dagster UI (formerly Dagit) is provided by the `dagster-webserver` package. It is not included in the `dagster` core package and must be installed alongside it for local development and observation.
- gotcha Starting with Dagster 1.12.18, `psycopg2-binary` is no longer a transitive dependency of `dagster-postgres`. If you use `dagster-postgres` for event log or run storage and relied on that dependency being pulled in automatically, your environment may break unless you install `psycopg2` or `psycopg2-binary` explicitly.
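- gotcha There are several CLIs: `dagster` (from the `dagster` package) for open-source Dagster environments and `dagster-cloud` (from the `dagster-cloud` package) for Dagster Cloud deployments. The newer `dg` command is a separate tool and is not installed by `dagster-cloud`. Using the wrong CLI can lead to confusing errors.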
- deprecated Defining resource and op configuration with `config_schema` or by passing raw dicts is the legacy approach. The modern approach uses Pydantic-based `Config` classes (and `ConfigurableResource` for resources).
Install
- pip install dagster dagster-webserver
- pip install dagster-cloud
Imports
- asset
from dagster import asset
- job
from dagster import job
- op
from dagster import op
- graph
from dagster import graph
- Definitions
from dagster import Definitions
- Config
from dagster import Config
- ScheduleDefinition
from dagster import ScheduleDefinition
- SensorDefinition
from dagster import SensorDefinition
Quickstart
from dagster import Definitions, asset, define_asset_job


@asset
def my_first_asset():
    """A simple asset that prints a message and returns a string."""
    print("Hello, Dagster!")
    return "hello"


# Assets cannot be called inside an @job body (that syntax is for ops),
# so build a job that targets the asset with define_asset_job instead.
my_asset_job = define_asset_job("my_asset_job", selection=[my_first_asset])

# To run this locally and see the output immediately:
if __name__ == "__main__":
    from dagster import materialize_to_memory

    result = materialize_to_memory(assets=[my_first_asset])
    assert result.success
    print(f"Asset output: {result.output_for_node('my_first_asset')}")

# For UI integration, save the following `Definitions` to a file (e.g., `my_repo.py`):
# defs = Definitions(
#     assets=[my_first_asset],
#     jobs=[my_asset_job],
# )
# Then run `dagster dev -f my_repo.py` and navigate to http://localhost:3000