Inngest Python SDK

0.5.18 · active · verified Sun Apr 12

The Inngest Python SDK facilitates building and deploying reliable serverless functions and event-driven workflows. It provides tools for defining functions, handling events, and integrating with web frameworks like FastAPI. The library is actively maintained with frequent minor releases, currently at version 0.5.18, and requires Python 3.10 or newer.

Warnings

Install

Imports

Quickstart

This example sets up a simple Inngest function using FastAPI. The function `hello_world` is triggered by an `app/hello.world` event, sleeps for a second, and returns a greeting. It includes a FastAPI endpoint `/trigger` to manually send this event. Remember to install `fastapi` and `uvicorn`, and set `INGEST_EVENT_KEY` and `INGEST_SIGNING_KEY` environment variables (or disable production mode) to run this.

import os
from fastapi import FastAPI
from inngest import Inngest, Event, Function, Step
from inngest.framework.fastapi import InngestFastAPI

# Initialize Inngest client
inngest_client = Inngest(
    app_id="my-python-app",
    event_key=os.environ.get('INGEST_EVENT_KEY', ''),
    signing_key=os.environ.get('INGEST_SIGNING_KEY', ''),
    # Uncomment for local dev without an Inngest deployment
    # is_production=False,
)

# Define an Inngest function
@inngest_client.function(
    Function(
        id="hello-world",
        name="Hello World Function",
        trigger=Event(name="app/hello.world"),
    )
)
async def hello_world(step: Step, event: Event):
    name = event.data.get("name", "world")
    await step.sleep("1s")
    message = await step.run(
        "say-hello", lambda: f"Hello, {name}! This is Inngest from Python."
    )
    return {"message": message}

# Initialize FastAPI app
app = FastAPI()

# Integrate Inngest with FastAPI
inngest_fastapi = InngestFastAPI(inngest_client, [hello_world])
app.include_router(inngest_fastapi.create_router())

# Example endpoint to trigger the Inngest function
@app.post("/trigger")
async def trigger_hello_world(name: str = "friend"):
    await inngest_client.send(Event(name="app/hello.world", data={"name": name}))
    return {"status": "Event sent", "name": name}

# To run this example:
# 1. pip install fastapi uvicorn inngest
# 2. Set environment variables: INGEST_EVENT_KEY and INGEST_SIGNING_KEY
#    (or comment them out and set is_production=False for local-only testing)
# 3. Run: uvicorn main:app --reload
# 4. Access Inngest dashboard or send a POST request to /trigger

view raw JSON →