Amazon SageMaker FeatureStore PySpark Bindings

1.1.3 · active · verified Mon Apr 13

Amazon SageMaker FeatureStore PySpark Bindings provide a Spark Datasource to read data from SageMaker Feature Store and a Spark Processor to write data to it. The package is built specifically for Spark 3.1 environments, letting users interact with SageMaker Feature Store directly from Spark jobs. The current version is 1.1.3; updates are released as needed for new features and compatibility fixes.

Warnings

Install
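The package is distributed on PyPI under a name suffixed with the supported Spark version. The command below is a sketch assuming the Spark 3.1 build described above; the exact package name (`sagemaker-feature-store-pyspark-3.1`) is an assumption, so confirm it against PyPI for your Spark version.

```shell
# Assumed PyPI package name for the Spark 3.1 build of the bindings.
# --no-binary forces a source build so the bundled JARs match your local Spark install.
pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```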

Imports

Quickstart

Demonstrates how to initialize the FeatureStoreManager and read data from a SageMaker Feature Group using PySpark. For local execution, ensure your SparkSession is configured with the correct JAR package.

import os
from pyspark.sql import SparkSession
from sagemaker_featurestore_pyspark import FeatureStoreManager

# IMPORTANT: For local PySpark execution, you MUST include the spark.jars.packages config.
# Replace '1.1.3' with the exact version of the sagemaker-featurestore-pyspark-sdk you are using.
# Ensure PySpark and Java are installed and configured for your environment.
spark = SparkSession.builder \
    .appName("FeatureStorePySparkQuickstart") \
    .config("spark.jars.packages", "software.amazon.sagemaker:sagemaker-featurestore-pyspark-sdk:1.1.3") \
    .getOrCreate()

# Replace with your actual Feature Group name and AWS region
feature_group_name = os.environ.get('SAGEMAKER_FEATURE_GROUP_NAME', 'your-feature-group-name')
aws_region = os.environ.get('AWS_REGION', 'us-east-1')

# Initialize FeatureStoreManager
# AWS credentials are typically sourced from the Spark environment (IAM Role, AWS_ACCESS_KEY_ID/SECRET_ACCESS_KEY).
fs_manager = FeatureStoreManager(spark_session=spark, region=aws_region)

try:
    # Read data from the Feature Group's online store
    df = fs_manager.read_feature_group(
        feature_group_name=feature_group_name
    )
    print(f"Successfully read data from Feature Group: {feature_group_name}")
    df.show(5)
    df.printSchema()

except Exception as e:
    print(f"Error interacting with Feature Group {feature_group_name}: {e}")
    print("Troubleshooting: Ensure Feature Group exists, credentials are set, and Spark environment is configured (esp. 'spark.jars.packages').")
finally:
    spark.stop()
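The quickstart above covers reading; the Spark Processor side handles writes. The sketch below assumes an `ingest_data`-style method on `FeatureStoreManager` that takes a DataFrame, a Feature Group ARN, and a list of target stores; the method name, its parameters, and the example ARN are assumptions for illustration, not verified API of this version, so check the package's reference before relying on them.

```python
import os
from pyspark.sql import SparkSession
from sagemaker_featurestore_pyspark import FeatureStoreManager  # module name as used in the quickstart

spark = SparkSession.builder \
    .appName("FeatureStoreIngestSketch") \
    .config("spark.jars.packages", "software.amazon.sagemaker:sagemaker-featurestore-pyspark-sdk:1.1.3") \
    .getOrCreate()

# Hypothetical input: each record carries the Feature Group's record identifier,
# an event-time feature (ISO-8601 string), and one example feature column.
df = spark.createDataFrame(
    [("record-1", "2021-04-13T00:00:00Z", 0.5)],
    ["RecordIdentifier", "EventTime", "feature_1"],
)

fs_manager = FeatureStoreManager(
    spark_session=spark,
    region=os.environ.get("AWS_REGION", "us-east-1"),
)

# Assumed ingest-style call: writes the DataFrame into the Feature Group.
# Replace the ARN with your Feature Group's ARN; "OfflineStore" vs "OnlineStore"
# depends on which stores your Feature Group has enabled.
fs_manager.ingest_data(
    input_data_frame=df,
    feature_group_arn="arn:aws:sagemaker:us-east-1:123456789012:feature-group/your-feature-group-name",
    target_stores=["OfflineStore"],
)

spark.stop()
```

This sketch requires a live Spark environment with AWS credentials and an existing Feature Group whose schema matches the DataFrame's columns.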

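Records written to a Feature Group need an event-time feature, commonly formatted as an ISO-8601 UTC timestamp string. A small pure-Python helper for producing such values (independent of Spark; whether your Feature Group's event-time feature expects this string format or a Unix epoch is an assumption to verify against your feature definitions):

```python
from datetime import datetime, timezone

def event_time_iso8601(dt: datetime) -> str:
    """Format a timezone-aware datetime as an ISO-8601 UTC string,
    e.g. 2021-04-13T00:00:00Z, a common event-time representation."""
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

print(event_time_iso8601(datetime(2021, 4, 13, tzinfo=timezone.utc)))
# → 2021-04-13T00:00:00Z
```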