OpenMetadata Ingestion Framework

1.12.5.1 · active · verified Tue Apr 14

OpenMetadata Ingestion is a Python framework for building connectors that ingest metadata from external systems into an OpenMetadata instance through its APIs. It is commonly run from orchestrators such as Apache Airflow to automate metadata extraction. It is actively maintained, with frequent releases that are often tied to OpenMetadata server versions. The current version is 1.12.5.1.

Quickstart

This quickstart runs a basic metadata ingestion workflow with the `MetadataWorkflow` class. It defines a minimal workflow configuration in YAML, including the OpenMetadata server connection details, and reads the JWT token used for authentication from an environment variable. Replace `sample-data` with the type of your actual connector (e.g., `mysql`, `snowflake`) and supply that connector's configuration. Ensure your OpenMetadata server is running and accessible.

import os
import yaml
from metadata.workflow.metadata import MetadataWorkflow

# Define your OpenMetadata server connection and authentication
# It's recommended to use environment variables for sensitive data like JWT tokens
openmetadata_server_config = {
    "hostPort": "http://localhost:8585/api",
    "authProvider": "openmetadata",
    "securityConfig": {
        "jwtToken": os.environ.get('OPENMETADATA_JWT_TOKEN', 'YOUR_OM_JWT_TOKEN_HERE')
    }
}

# Example: a minimal YAML configuration for ingesting metadata from a dummy source.
# In a real scenario, this would be a full connector config (e.g., MySQL, Snowflake).
# Literal braces are doubled ({{}}) because this string is an f-string.
workflow_config_yaml = f"""
source:
  type: "sample-data"
  serviceName: "sample_metadata"
sink:
  type: "metadata-rest"
  config: {{}}
workflowConfig:
  openMetadataServerConfig:
    hostPort: {openmetadata_server_config['hostPort']}
    authProvider: {openmetadata_server_config['authProvider']}
    securityConfig:
      jwtToken: {openmetadata_server_config['securityConfig']['jwtToken']}
"""

# Load the YAML configuration
workflow_config = yaml.safe_load(workflow_config_yaml)

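# Create and execute the workflow
print("Starting Metadata Ingestion Workflow...")
workflow = MetadataWorkflow.create(workflow_config)
try:
    workflow.execute()
    # Turn failures recorded during the run into an exception,
    # so the success message is not printed for a failed run
    workflow.raise_from_status()
    print("Metadata Ingestion Workflow completed successfully.")
except Exception as err:
    print(f"Error during metadata ingestion: {err}")
finally:
    # Always report the run summary and release resources
    workflow.print_status()
    workflow.stop()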
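
Interpolating values into a YAML string is easy to get wrong (brace escaping, quoting), so the same configuration can also be assembled as a plain Python dictionary, which is what `yaml.safe_load` produces in the quickstart anyway. The following is a minimal sketch under that assumption; the helper name `build_workflow_config` and its parameters are hypothetical and not part of the OpenMetadata API. Unlike the quickstart, it raises an error when the token environment variable is unset instead of falling back to a placeholder.

```python
import os


def build_workflow_config(
    source_type,
    service_name,
    host_port="http://localhost:8585/api",
    token_env_var="OPENMETADATA_JWT_TOKEN",
):
    """Return a workflow config dict shaped like the quickstart YAML.

    Hypothetical helper for illustration; only the dictionary layout
    mirrors the quickstart configuration.
    """
    jwt_token = os.environ.get(token_env_var)
    if not jwt_token:
        # Fail early rather than sending a placeholder token to the server.
        raise RuntimeError(f"Set {token_env_var} before running the workflow")

    return {
        "source": {"type": source_type, "serviceName": service_name},
        "sink": {"type": "metadata-rest", "config": {}},
        "workflowConfig": {
            "openMetadataServerConfig": {
                "hostPort": host_port,
                "authProvider": "openmetadata",
                "securityConfig": {"jwtToken": jwt_token},
            }
        },
    }


# Usage (requires openmetadata-ingestion and a reachable server):
# config = build_workflow_config("sample-data", "sample_metadata")
# workflow = MetadataWorkflow.create(config)
```

The resulting dictionary can be passed to `MetadataWorkflow.create` in place of the parsed YAML. A real connector would still need its own connection and source settings added under `source`.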
