OpenMetadata Ingestion Framework
OpenMetadata Ingestion is a Python framework for building connectors that ingest metadata from external systems into an OpenMetadata instance through its APIs. It is commonly run inside orchestrators such as Apache Airflow to automate metadata extraction. The project is actively maintained, and releases are frequent and usually tied to OpenMetadata server versions. The current version is 1.12.5.1.
Warnings
- breaking A version mismatch between the `openmetadata-ingestion` Python package and the OpenMetadata server can lead to `ClientInitializationError`, `ValidationError`, or other unexpected behavior. The ingestion package version must match the server version: for a 1.12.5 server, pin the package with a specifier such as `~=1.12.5`, which `1.12.5.1` satisfies.
- breaking Schema changes introduced in OpenMetadata minor releases (e.g., adding `queryStatementSource`, `queryParserConfig`, `statusLookbackDays` fields) require upgrading the ingestion framework. Running an older ingestion framework with a newer server can cause `ValidationError` during pipeline deployment or Airflow DAG build failures.
- breaking The OpenLineage Kinesis connection schema was refactored in version 1.11.12. Existing configurations with flat Kafka-specific fields (e.g., `brokersUrl`, `topicName`) must be migrated to a nested `brokerConfig` object.
- breaking The REST connector configuration changed in version 1.11.11 to support OpenAPI schemas from local files (JSON/YAML) in addition to HTTP URLs via the `openAPISchemaConnection` union type. Existing configurations might require updates.
- gotcha The `openmetadata-ingestion` library requires Python versions 3.9, 3.10, or 3.11. Using unsupported Python versions might lead to compatibility issues or errors.
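
The version-mismatch and Python-version warnings above can be turned into a small pre-flight check that runs before any workflow starts. The sketch below uses only the standard library. The helper names (`release_tuple`, `versions_match`, `python_supported`, `installed_ingestion_version`) are hypothetical, and comparing the first three version components is an assumption about what counts as a match. The server version string is expected to come from elsewhere, for example from the OpenMetadata UI.

```python
import re
import sys
from importlib import metadata as importlib_metadata

# Python versions listed as supported in the warning above.
SUPPORTED_PYTHONS = {(3, 9), (3, 10), (3, 11)}


def release_tuple(version, parts=3):
    """Return the first `parts` numeric components of a version string."""
    numbers = re.findall(r"\d+", version)
    if len(numbers) < parts:
        raise ValueError(f"cannot read {parts} components from {version!r}")
    return tuple(int(n) for n in numbers[:parts])


def versions_match(package_version, server_version, parts=3):
    """Assumption: a match means the first `parts` components are equal."""
    return release_tuple(package_version, parts) == release_tuple(server_version, parts)


def python_supported(version_info=sys.version_info):
    """Check the (major, minor) pair against the supported set."""
    return (version_info[0], version_info[1]) in SUPPORTED_PYTHONS


def installed_ingestion_version():
    """Return the installed package version, or None if it is not installed."""
    try:
        return importlib_metadata.version("openmetadata-ingestion")
    except importlib_metadata.PackageNotFoundError:
        return None
```

A caller might fail fast when `python_supported()` is false, or when `installed_ingestion_version()` does not match the server version it was given.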
Install
- pip install "openmetadata-ingestion==1.12.5.1"
- pip install "openmetadata-ingestion[all]==1.12.5.1"
- pip install "openmetadata-ingestion[postgres,snowflake]==1.12.5.1"
Imports
- MetadataWorkflow
from metadata.workflow.metadata import MetadataWorkflow
- OpenMetadata
from metadata.ingestion.ometa.ometa_api import OpenMetadata
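
As a rough illustration of how the `OpenMetadata` import is typically used, the sketch below builds a client from a host and a JWT token. The connection classes are imported from the package's generated schema modules as I understand them, so verify the paths against your installed version. `build_client`, `client_settings_from_env`, and the `OPENMETADATA_HOST_PORT` variable are hypothetical names introduced here. The library imports are deferred into the function so the module loads even where the package is not installed.

```python
import os


def client_settings_from_env(env=None):
    """Read connection settings from environment variables (names are illustrative)."""
    env = os.environ if env is None else env
    return {
        "host_port": env.get("OPENMETADATA_HOST_PORT", "http://localhost:8585/api"),
        "jwt_token": env.get("OPENMETADATA_JWT_TOKEN", ""),
    }


def build_client(host_port, jwt_token):
    """Create an OpenMetadata client using JWT authentication."""
    # Deferred imports: these require openmetadata-ingestion to be installed.
    from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
        AuthProvider,
        OpenMetadataConnection,
    )
    from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
        OpenMetadataJWTClientConfig,
    )
    from metadata.ingestion.ometa.ometa_api import OpenMetadata

    server_config = OpenMetadataConnection(
        hostPort=host_port,
        authProvider=AuthProvider.openmetadata,
        securityConfig=OpenMetadataJWTClientConfig(jwtToken=jwt_token),
    )
    return OpenMetadata(server_config)
```

With a reachable server, `build_client(**client_settings_from_env())` should return a client. Calling its `health_check()` method is a common way to confirm connectivity before doing anything else.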
Quickstart
import os
import yaml
from metadata.workflow.metadata import MetadataWorkflow

# Define your OpenMetadata server connection and authentication.
# Use environment variables for sensitive data such as JWT tokens.
openmetadata_server_config = {
    "hostPort": "http://localhost:8585/api",
    "authProvider": "openmetadata",
    "securityConfig": {
        "jwtToken": os.environ.get("OPENMETADATA_JWT_TOKEN", "YOUR_OM_JWT_TOKEN_HERE")
    },
}

# Example: a minimal YAML configuration for ingesting metadata from a dummy source.
# In a real scenario, this would be a full connector config (e.g., MySQL, Snowflake).
# Literal braces are doubled ({{}}) because this string is an f-string.
workflow_config_yaml = f"""
source:
  type: "sample-data"
  serviceName: "sample_metadata"
sink:
  type: "metadata-rest"
  config: {{}}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "{openmetadata_server_config['hostPort']}"
    authProvider: "{openmetadata_server_config['authProvider']}"
    securityConfig:
      jwtToken: "{openmetadata_server_config['securityConfig']['jwtToken']}"
"""

# Load the YAML configuration into a plain dict
workflow_config = yaml.safe_load(workflow_config_yaml)

# Create and execute the workflow
print("Starting Metadata Ingestion Workflow...")
workflow = MetadataWorkflow.create(workflow_config)
try:
    workflow.execute()
    print("Metadata Ingestion Workflow completed successfully.")
except Exception as err:
    print(f"Error during metadata ingestion: {err}")
finally:
    # Always report status and release resources, even on failure
    workflow.print_status()
    workflow.stop()
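
Because the Quickstart passes the result of `yaml.safe_load` (a plain dict) to `MetadataWorkflow.create`, the same configuration can also be assembled as a dict directly. This avoids escaping literal braces inside an f-string and sidesteps YAML quoting of the token. The sketch below mirrors the Quickstart's keys. `build_workflow_config` is a hypothetical helper name, and the `sample-data` source remains a placeholder for a real connector config.

```python
def build_workflow_config(host_port, jwt_token, service_name="sample_metadata"):
    """Return the Quickstart workflow configuration as a plain dict."""
    return {
        "source": {
            "type": "sample-data",
            "serviceName": service_name,
        },
        "sink": {
            "type": "metadata-rest",
            "config": {},
        },
        "workflowConfig": {
            "openMetadataServerConfig": {
                "hostPort": host_port,
                "authProvider": "openmetadata",
                "securityConfig": {"jwtToken": jwt_token},
            }
        },
    }
```

The returned dict can be handed to `MetadataWorkflow.create` in place of the parsed YAML, with the token read from `os.environ` at call time.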