Soda Core Snowflake Connector

3.5.6 · active · verified Mon Apr 13

Soda Core Snowflake Connector (`soda-core-snowflake`) is a plugin for Soda Core that lets it connect to Snowflake data warehouses and scan them with data quality checks. The current version is 3.5.6; new versions are typically released in step with Soda Core's release cycle.

Warnings

Install
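
The connector is distributed on PyPI as `soda-core-snowflake` and is normally installed with `pip install soda-core-snowflake`. As a minimal sketch, the snippet below checks from Python whether the plugin is present in the current environment. The helper name `installed_version` is hypothetical and only uses the standard library; it is not part of Soda Core.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# Report the core library and the Snowflake plugin side by side.
for name in ("soda-core", "soda-core-snowflake"):
    print(name, installed_version(name) or "not installed")
```

If the plugin is reported as not installed, a data source declared with `type: snowflake` will not be recognized by the scan.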

Imports

Quickstart

This quickstart demonstrates how to programmatically connect to Snowflake using environment variables for credentials and run a simple data quality check with Soda Core. It requires `soda-core-snowflake` to be installed for the `type: snowflake` data source to be recognized.

import os
from soda.scan import Scan

# Configure Snowflake connection details using environment variables for security
# Ensure these environment variables are set before running:
# SNOWFLAKE_HOST, SNOWFLAKE_ACCOUNT, SNOWFLAKE_USERNAME, SNOWFLAKE_PASSWORD,
# SNOWFLAKE_ROLE, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA

configuration_yaml_content = f"""
data_source snowflake_db:
  type: snowflake
  host: {os.environ.get('SNOWFLAKE_HOST', 'your_snowflake_host.snowflakecomputing.com')}
  account: {os.environ.get('SNOWFLAKE_ACCOUNT', 'your_account_identifier')}
  username: {os.environ.get('SNOWFLAKE_USERNAME', 'your_username')}
  password: {os.environ.get('SNOWFLAKE_PASSWORD', 'your_password')}
  role: {os.environ.get('SNOWFLAKE_ROLE', 'SYSADMIN')}
  warehouse: {os.environ.get('SNOWFLAKE_WAREHOUSE', 'COMPUTE_WH')}
  database: {os.environ.get('SNOWFLAKE_DATABASE', 'SNOWFLAKE_SAMPLE_DATA')}
  schema: {os.environ.get('SNOWFLAKE_SCHEMA', 'TPCH_SF100')}
"""

# Define a simple data quality check.
# SodaCL groups checks per dataset, so the header names the CUSTOMER table.
checks_yaml_content = """
checks for CUSTOMER:
  - row_count > 0: # Fails if the CUSTOMER table in the configured schema is empty
      name: Check for non-empty customer table
"""

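# Create a Scan object from soda-core
scan = Scan()
scan.set_verbose(True)
# Select which configured data source the scan runs against
scan.set_data_source_name("snowflake_db")
scan.add_configuration_yaml_str(configuration_yaml_content)
scan.add_checks_yaml_str(checks_yaml_content)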

# Execute the scan
scan.execute()

# Process and print scan results:
# treat both execution errors and failed checks as a failed scan
if scan.has_error_logs() or scan.has_check_fails():
    print("\n!!! Scan completed with FAILURES !!!")
else:
    print("\nScan completed successfully.")

print("\n--- Scan Logs ---")
print(scan.get_logs_text())
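
The quickstart falls back to placeholder values when an environment variable is unset, which can hide a misconfigured environment until the connection fails. The following is a sketch of a stricter alternative, under stated assumptions: `build_configuration_yaml` and `CONNECTION_KEYS` are hypothetical names, not part of Soda Core, and the keys simply mirror the quickstart configuration. Values are quoted with `json.dumps`, since a JSON string is also a valid double-quoted YAML scalar, so characters such as `:` or `#` in a password do not break the YAML.

```python
import json
import os

# Connection keys as used in the quickstart; each maps to SNOWFLAKE_<KEY>.
CONNECTION_KEYS = (
    "host", "account", "username", "password",
    "role", "warehouse", "database", "schema",
)


def build_configuration_yaml(data_source_name, env=None):
    """Build a configuration YAML string, failing fast on missing variables."""
    env = os.environ if env is None else env
    names = {key: f"SNOWFLAKE_{key.upper()}" for key in CONNECTION_KEYS}

    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))

    lines = [f"data_source {data_source_name}:", "  type: snowflake"]
    for key, var in names.items():
        # json.dumps produces a double-quoted, escaped scalar that YAML accepts.
        lines.append(f"  {key}: {json.dumps(env[var])}")
    return "\n".join(lines) + "\n"
```

The returned string can be passed to `scan.add_configuration_yaml_str(...)` in place of the f-string used in the quickstart.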
