StreamSets Python SDK

6.6.2 · active · verified Tue Apr 14

The StreamSets Python SDK lets developers interact programmatically with StreamSets DataOps Platform components, including Control Hub, Data Collector, and Transformer. It is used to automate the creation, management, monitoring, and deployment of data pipelines. The current version is 6.6.2; new releases add support for new platform features and fix bugs.

Quickstart

This quickstart shows how to connect to StreamSets Control Hub with an API credential and list the registered Data Collectors. It also uses a pipeline builder to create an in-memory pipeline definition. Creating an object in code does not save or deploy it to Control Hub; you must explicitly call a method such as `publish_pipeline` (for pipelines) or `add_job` (for jobs).
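The quickstart reads its API credential from the `CRED_ID` and `CRED_TOKEN` environment variables and falls back to placeholder strings when either is unset, so a missing variable only shows up later as an authentication failure. A minimal fail-fast alternative is sketched here; `load_credentials` is a hypothetical helper, not part of the SDK, and it assumes only those two variable names.

```python
import os
from typing import Mapping, Optional, Tuple


def load_credentials(env: Optional[Mapping[str, str]] = None) -> Tuple[str, str]:
    """Return (credential_id, token) read from CRED_ID and CRED_TOKEN.

    Hypothetical helper (not part of the StreamSets SDK): it raises a clear
    error instead of silently substituting placeholder values.
    """
    env = os.environ if env is None else env
    # Treat unset and empty variables the same way.
    missing = [name for name in ('CRED_ID', 'CRED_TOKEN') if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variable(s): {', '.join(missing)}")
    return env['CRED_ID'], env['CRED_TOKEN']


if __name__ == '__main__':
    try:
        cred_id, cred_token = load_credentials()
        print("Credentials loaded from the environment")
    except RuntimeError as err:
        print(err)
```

The returned pair can then be passed to the `ControlHub` constructor in place of the two `os.environ.get` calls.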

import os
from streamsets.sdk import ControlHub

# Ensure CRED_ID and CRED_TOKEN are set as environment variables
# Example: export CRED_ID='your_credential_id'
# Example: export CRED_TOKEN='your_credential_token'

cred_id = os.environ.get('CRED_ID', 'YOUR_CRED_ID')
cred_token = os.environ.get('CRED_TOKEN', 'YOUR_CRED_TOKEN')

# Connect to Control Hub with the API credential
try:
    control_hub = ControlHub(credential_id=cred_id, token=cred_token)
    print("Successfully connected to Control Hub")

    # Example: List the Data Collectors registered with Control Hub
    data_collectors = list(control_hub.data_collectors)
    if data_collectors:
        print("Available Data Collectors:")
        for dc in data_collectors:
            print(f" - {dc.url} (ID: {dc.id})")
    else:
        print("No Data Collectors found.")

    # Example: Build a simple pipeline in memory. The pipeline builder is tied to
    # an engine, so this step needs at least one registered Data Collector.
    # Nothing is saved to Control Hub until you call control_hub.publish_pipeline();
    # to run it, build a job with control_hub.get_job_builder() and register it
    # with control_hub.add_job().
    if data_collectors:
        print("\nExample: Creating a simple in-memory pipeline (not yet published to Control Hub).")
        pipeline_builder = control_hub.get_pipeline_builder(engine_type='data_collector',
                                                            engine_id=data_collectors[0].id)
        dev_data_generator = pipeline_builder.add_stage('Dev Data Generator')
        trash = pipeline_builder.add_stage('Trash')
        dev_data_generator >> trash  # Connect the origin to the destination

        my_pipeline = pipeline_builder.build('My First SDK Pipeline')
        print(f"Created in-memory pipeline object: {my_pipeline.name}")
        # To save it to Control Hub: control_hub.publish_pipeline(my_pipeline)

except Exception as e:
    print(f"Error connecting to Control Hub or performing operations: {e}")
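The quickstart wires stages with Python's `>>` operator. The toy model below illustrates, in plain Python, how an overloaded `__rshift__` can record a directed connection and support chaining. `Stage` and the edge list are hypothetical stand-ins; this is not the SDK's implementation, and no assumption is made about what the SDK's `>>` returns.

```python
from typing import List, Tuple


class Stage:
    """Toy stand-in for a pipeline stage (hypothetical, not the SDK class)."""

    def __init__(self, name: str, graph: List[Tuple[str, str]]) -> None:
        self.name = name
        self._graph = graph

    def __rshift__(self, other: "Stage") -> "Stage":
        # Record a directed edge self -> other, then return the downstream
        # stage so that a >> b >> c is evaluated as (a >> b) >> c.
        self._graph.append((self.name, other.name))
        return other


edges: List[Tuple[str, str]] = []
origin = Stage('Dev Data Generator', edges)
processor = Stage('Expression Evaluator', edges)
destination = Stage('Trash', edges)
origin >> processor >> destination
print(edges)
# [('Dev Data Generator', 'Expression Evaluator'), ('Expression Evaluator', 'Trash')]
```

Because `>>` is left-associative, returning the right-hand operand is what lets a linear pipeline be written as a single chained expression.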