Amundsen Databuilder


Amundsen Databuilder is the data ingestion library for Amundsen, a data discovery and metadata platform. It provides Extractors, Transformers, and Loaders (an ETL framework) that pull metadata from sources such as Snowflake, Hive, and Postgres and push it into a search index (Elasticsearch) and a graph database (Neo4j, or a Gremlin-compatible store via Apache TinkerPop). Current version is 7.5.1, compatible with Python >=3.8. Releases are frequent, roughly monthly.
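The Extractor → Transformer → Loader flow can be illustrated with a toy, dependency-free model (these classes are illustrative stand-ins, not the real databuilder base classes): a task repeatedly calls extract() until it returns None, passing each record through the transformer and into the loader.

```python
# Toy model of the databuilder run loop (illustrative only).
class DictExtractor:
    """Yields one record per extract() call; None signals exhaustion,
    which mirrors how databuilder extractors terminate a task."""
    def __init__(self, records):
        self._it = iter(records)

    def extract(self):
        return next(self._it, None)

class UppercaseNameTransformer:
    """Example transformer: uppercases the 'name' field of each record."""
    def transform(self, record):
        record['name'] = record['name'].upper()
        return record

class ListLoader:
    """Collects loaded records in memory instead of writing CSVs."""
    def __init__(self):
        self.loaded = []

    def load(self, record):
        self.loaded.append(record)

def run_task(extractor, transformer, loader):
    # Pull records one at a time until the extractor is exhausted.
    while (record := extractor.extract()) is not None:
        loader.load(transformer.transform(record))

loader = ListLoader()
run_task(
    DictExtractor([{'name': 'orders'}, {'name': 'users'}]),
    UppercaseNameTransformer(),
    loader,
)
# loader.loaded == [{'name': 'ORDERS'}, {'name': 'USERS'}]
```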

pip install amundsen-databuilder
error ModuleNotFoundError: No module named 'databuilder.extractor.snowflake_extractor'
cause Old import path; renamed in 7.0.0.
fix
Use 'from databuilder.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor'.
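When code must run against databuilder versions on both sides of a rename, a small helper can try import paths in order and return the first that resolves. The import_first() helper below is hypothetical (not part of databuilder); the commented 'module:attr' strings mirror the new and old Snowflake extractor paths.

```python
import importlib

def import_first(*candidates):
    """Try 'module:attr' paths in order; return the first that resolves.

    Hypothetical helper for surviving module renames across releases.
    """
    errors = []
    for path in candidates:
        module_name, _, attr = path.partition(':')
        try:
            module = importlib.import_module(module_name)
            return getattr(module, attr) if attr else module
        except (ImportError, AttributeError) as exc:
            errors.append(f'{path}: {exc}')
    raise ImportError('no candidate imported:\n' + '\n'.join(errors))

# Resolve the Snowflake extractor under either the 7.x or pre-7.x path:
# SnowflakeMetadataExtractor = import_first(
#     'databuilder.extractor.snowflake_metadata_extractor:SnowflakeMetadataExtractor',
#     'databuilder.extractor.snowflake_extractor:SnowflakeExtractor',
# )
```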
error TypeError: __init__() got an unexpected keyword argument 'neo4j_endpoint'
cause Neo4j publisher config keys changed; they no longer accept direct keyword args.
fix
Supply the config as a pyhocon ConfigTree rather than keyword args, e.g. publisher.init(ConfigFactory.from_dict({neo4j_csv_publisher.NEO4J_END_POINT_KEY: 'bolt://...'})) — the key constants live in the databuilder.publisher.neo4j_csv_publisher module. In a full job, DefaultJob initializes the publisher from the 'publisher.neo4j.*' section of the job config.
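The dictionary-of-keys pattern generalizes: databuilder scopes each component's config under a prefix such as 'publisher.neo4j.'. A minimal sketch of assembling scoped keys with a hypothetical scoped() helper (the real job code builds these strings inline and wraps the dict in pyhocon's ConfigFactory.from_dict):

```python
def scoped(scope, mapping):
    """Prefix every key in `mapping` with '<scope>.' (illustrative helper)."""
    return {f'{scope}.{key}': value for key, value in mapping.items()}

# 'neo4j_endpoint' etc. are the string values behind the publisher's
# key constants; values here are placeholders.
publisher_conf = scoped('publisher.neo4j', {
    'neo4j_endpoint': 'bolt://localhost:7687',
    'neo4j_user': 'neo4j',
    'neo4j_password': 'test',
})
# publisher_conf['publisher.neo4j.neo4j_endpoint'] == 'bolt://localhost:7687'
```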
breaking Breaking change in version 7.0.0: renamed many extractors and loaders. SnowflakeExtractor became SnowflakeMetadataExtractor. Also removed old deprecated modules like databuilder.extractor.hive_table_metadata_extractor.
fix Update imports to new names. Check release notes for full list of renamed classes.
breaking FsNeo4jCSVLoader now requires explicit output directories; previously it wrote to a temp location. Must call init() with proper config or it may fail.
fix Set FsNeo4jCSVLoader.NODE_DIR_PATH and FsNeo4jCSVLoader.RELATION_DIR_PATH in the loader's config (e.g. '/tmp/amundsen/nodes' and '/tmp/amundsen/relationships').
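Whatever key names your installed version uses, the loader needs writable node and relationship directories (some versions can force-create them, but preparing the paths up front avoids surprises). A small stdlib sketch; the paths are illustrative:

```python
import os
import tempfile

# Separate directories for node CSVs and relationship CSVs, as the
# filesystem Neo4j CSV loader expects.
base = tempfile.mkdtemp(prefix='amundsen_')
node_dir = os.path.join(base, 'nodes')
relation_dir = os.path.join(base, 'relationships')
os.makedirs(node_dir, exist_ok=True)
os.makedirs(relation_dir, exist_ok=True)
```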
gotcha Common mistake: using the wrong Neo4j CSV loader class name. Users often import 'Neo4jCsvLoader' or 'CsvLoader', but the correct class is 'FsNeo4jCSVLoader' (note the capitalized 'CSV').
fix Use 'from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader'.

Minimal job that extracts metadata from Snowflake and loads it into Neo4j via CSV. Requires pyhocon (pip install pyhocon) for the job config; ensure the SNOWFLAKE_* and NEO4J_* environment variables are set.

import logging
import os

from pyhocon import ConfigFactory

from databuilder.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

logging.basicConfig(level=logging.INFO)

# SnowflakeMetadataExtractor runs on top of SQLAlchemyExtractor, so the
# connection details go into a SQLAlchemy connection string.
connection_string = 'snowflake://{user}:{password}@{account}/{database}?warehouse={warehouse}'.format(
    user=os.environ.get('SNOWFLAKE_USER', ''),
    password=os.environ.get('SNOWFLAKE_PASSWORD', ''),
    account=os.environ.get('SNOWFLAKE_ACCOUNT', ''),
    database=os.environ.get('SNOWFLAKE_DATABASE', ''),
    warehouse=os.environ.get('SNOWFLAKE_WAREHOUSE', ''),
)

node_files_folder = '/tmp/amundsen/nodes'
relationship_files_folder = '/tmp/amundsen/relationships'

# All component config lives in one ConfigTree; keys are scoped per
# component, and DefaultJob calls each component's init() with its section.
job_config = ConfigFactory.from_dict({
    f'extractor.snowflake.{SnowflakeMetadataExtractor.SNOWFLAKE_DATABASE_KEY}':
        os.environ.get('SNOWFLAKE_DATABASE', ''),
    f'extractor.snowflake.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string,
    f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
    f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
    f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
    f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
    f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}':
        os.environ.get('NEO4J_ENDPOINT', 'bolt://localhost:7687'),
    f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': os.environ.get('NEO4J_USER', 'neo4j'),
    f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': os.environ.get('NEO4J_PASSWORD', 'test'),
    f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag',
})

task = DefaultTask(
    extractor=SnowflakeMetadataExtractor(),
    transformer=NoopTransformer(),
    loader=FsNeo4jCSVLoader(),
)

job = DefaultJob(
    conf=job_config,
    task=task,
    publisher=Neo4jCsvPublisher(),
)

job.launch()