Target Hotglue

0.1.11 · active · verified Thu Apr 16

`target-hotglue` is an SDK designed for building Singer Targets specifically tailored for the Hotglue data integration platform. It extends the `singer-sdk` to provide features relevant to Hotglue's environment, such as authentication handling and record tracking. The current version is 0.1.11, and it typically follows the release cadence of its underlying `singer-sdk` or Hotglue platform requirements, with focused updates for new features or bug fixes.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to define a custom target and stream using `target-hotglue` and simulate processing Singer messages. In a production environment, `target-hotglue` typically reads Singer messages from `stdin` via its `main` entry point. This example writes processed records to a local JSONL file defined by the `output_file` configuration setting, which can be overridden by the `TARGET_HOTGLUE_OUTPUT_FILE` environment variable.

import os
import json
from pathlib import Path
from typing import Dict, Any, List

from target_hotglue.target import Target
from target_hotglue.streams import Stream

# --- 1. Define your custom target and streams ---

# Define a custom stream where records will be written
class MyCustomStream(Stream):
    name = "my_custom_stream"
    schema = {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "string"}}}
    key_properties = ["id"]

    def write_record(self, record: Dict[str, Any]) -> None:
        """
        Processes a single record.
        In a real target, this is where you'd write to your destination system.
        For this example, we'll write to a local JSONL file.
        """
        output_file_path = Path(self.target_config.get("output_file", "quickstart_output.jsonl"))
        with output_file_path.open("a") as f:
            f.write(json.dumps(record) + "\n")
        self.logger.info(f"Wrote record to {output_file_path}: {record}")

# Define your custom target, linking it to your stream(s)
class MyCustomTarget(Target):
    name = "target-quickstart"
    # Define a simple configuration schema for the target
    config_jsonschema = {
        "type": "object",
        "properties": {
            "output_file": {"type": "string", "default": "quickstart_output.jsonl", "description": "File to write records to"}
        },
        "required": []
    }

    # Register your custom streams with the target
    # The target will route records for 'my_custom_stream' to MyCustomStream
    def get_available_streams(self) -> List[Stream]:
        return [MyCustomStream(target=self)]

# --- 2. How to run your target (simulated stdin) ---

# This simulates the input a Singer tap would send to target_hotglue
sample_singer_messages = [
    {"type": "SCHEMA", "stream": "my_custom_stream", "schema": MyCustomStream.schema, "key_properties": MyCustomStream.key_properties},
    {"type": "RECORD", "stream": "my_custom_stream", "record": {"id": 1, "value": "Hello from Singer!"}},
    {"type": "RECORD", "stream": "my_custom_stream", "record": {"id": 2, "value": "Another record."}},
    {"type": "STATE", "value": {"bookmarks": {"my_custom_stream": {"last_id": 2}}}}
]

def run_quickstart():
    # Clean up previous run's output file
    output_path = Path(os.environ.get("TARGET_HOTGLUE_OUTPUT_FILE", "quickstart_output.jsonl"))
    if output_path.exists():
        output_path.unlink()

    # Initialize the target with a configuration (can be from env vars or a file)
    # Using os.environ.get for config values as requested.
    config = {
        "output_file": os.environ.get("TARGET_HOTGLUE_OUTPUT_FILE", "quickstart_output.jsonl")
    }
    target_instance = MyCustomTarget(config=config)

    # Manually process sample messages (this replaces target_instance.listen_for_messages() for demo)
    print(f"Processing {len(sample_singer_messages)} sample messages...")
    for message in sample_singer_messages:
        target_instance._process_message(message) # Use internal method for demo simplicity

    print(f"\nQuickstart finished.")
    if output_path.exists():
        print(f"Content written to '{output_path}':")
        print(output_path.read_text())
    else:
        print("No output file was created. Check stream configuration.")

if __name__ == "__main__":
    run_quickstart()

view raw JSON →