Dagster GraphQL

1.12.22 · active · verified Thu Apr 09

`dagster-graphql` is the Python package that provides `DagsterGraphQLClient`, a client for Dagster's GraphQL API. It gives programs access to metadata about Dagster runs, jobs, ops, and assets, and lets them launch or terminate job executions. Typical uses include automation, custom UIs, and integrations with other systems. The current version is 1.12.22, and the package is actively released.
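
The launch and status operations mentioned above are exposed as methods on `DagsterGraphQLClient`. The following is a minimal sketch, not a definitive recipe: `launch_and_check` is a hypothetical helper, the job name is a placeholder, and the client is passed in as an argument so that the function can be exercised without a running Dagster instance.

```python
from typing import Any, Optional


def launch_and_check(
    client: Any,
    job_name: str,
    run_config: Optional[dict[str, Any]] = None,
) -> tuple[str, Any]:
    """Submit a job run and return (run_id, status right after submission)."""
    # `client` is expected to be a DagsterGraphQLClient.
    # submit_job_execution returns the ID of the newly created run.
    run_id = client.submit_job_execution(job_name, run_config=run_config or {})
    # get_run_status reports the run's current status; immediately after
    # submission the run is usually still queued or starting.
    status = client.get_run_status(run_id)
    return run_id, status
```

A real call might look like `launch_and_check(DagsterGraphQLClient("localhost", port_number=3000), "my_job")`, where `my_job` stands in for a job defined in your own code location. Both client methods are documented to raise `DagsterGraphQLClientError` on failure, and `client.terminate_run(run_id)` requests termination of a run that is still in progress.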

Warnings

Install

Imports

Quickstart

This quickstart initializes `DagsterGraphQLClient` and runs a raw GraphQL query that fetches Dagster runs. The query is sent through `_execute`, a private method of the client, so this pattern may break between releases. The local example requires a running Dagster instance (for example, one started with `dagster dev`) that is reachable at the given host and port. For Dagster Cloud, configure the client with your deployment URL and pass a user token in the `Dagster-Cloud-Api-Token` request header.

import os
from dagster_graphql import DagsterGraphQLClient

# Configure the client for a local Dagster instance or for Dagster Cloud.
# Dagster Cloud (deployment URL plus a user token sent as a request header):
# url = os.environ.get("DAGSTER_CLOUD_URL", "YOUR_ORG.dagster.cloud/prod")
# token = os.environ.get("DAGSTER_CLOUD_API_TOKEN", "YOUR_TOKEN")
# client = DagsterGraphQLClient(url, headers={"Dagster-Cloud-Api-Token": token})

# Local Dagster instance (run 'dagster dev' first; it serves on port 3000 by default)
client = DagsterGraphQLClient("localhost", port_number=3000)

try:
    # Example: Get a list of recent runs
    runs_query = """
    query LatestRuns {
        runsOrError(limit: 10) {
            __typename
            ... on Runs {
                results {
                    id
                    status
                    pipelineName
                }
            }
            ... on PythonError {
                message
            }
        }
    }
    """
    # _execute is a private method of the client (not a stable public API);
    # it is used here only to send a raw GraphQL query.
    result = client._execute(runs_query)

    if result and result.get("runsOrError", {}).get("__typename") == "Runs":
        print("Recent Dagster Runs:")
        for run in result["runsOrError"]["results"]:
            print(f"  ID: {run['id']}, Status: {run['status']}, Job: {run['pipelineName']}")
    elif result and result.get("runsOrError", {}).get("__typename") == "PythonError":
        print(f"Error fetching runs: {result['runsOrError']['message']}")
    else:
        print(f"Unexpected result: {result}")

except Exception as e:
    print(f"An error occurred: {e}")
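
The raw query above can also be wrapped in a small helper that passes the run limit as a GraphQL variable and turns the union result into either a list of runs or an exception. This is a sketch under stated assumptions rather than part of the library: `fetch_recent_runs` and `RunsQueryError` are hypothetical names, and the helper still relies on the private `_execute` method, which is assumed to accept a query string followed by a dict of variables.

```python
from typing import Any

RUNS_QUERY = """
query LatestRuns($limit: Int) {
    runsOrError(limit: $limit) {
        __typename
        ... on Runs {
            results {
                id
                status
                pipelineName
            }
        }
        ... on PythonError {
            message
        }
    }
}
"""


class RunsQueryError(RuntimeError):
    """Hypothetical error raised when the response is not a `Runs` result."""


def fetch_recent_runs(client: Any, limit: int = 10) -> list[dict[str, Any]]:
    """Return up to `limit` runs as dicts with id, status, and pipelineName."""
    # `client` is expected to be a DagsterGraphQLClient; _execute is private
    # and is assumed here to take (query, variables).
    result = client._execute(RUNS_QUERY, {"limit": limit}) or {}
    payload = result.get("runsOrError", {})
    typename = payload.get("__typename")
    if typename == "Runs":
        return list(payload["results"])
    # PythonError carries a message; other union members may not.
    raise RunsQueryError(f"{typename}: {payload.get('message', 'no message')}")
```

With the local client from the quickstart, `fetch_recent_runs(client, limit=5)` would return at most five run dicts, and callers can catch `RunsQueryError` instead of inspecting `__typename` at every call site.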
