Singer Python SDK
The `singer-python` library provides utilities for implementing Singer protocol taps (data extractors) and targets (data loaders). It simplifies parsing configuration, managing state, and emitting or consuming standard Singer messages (schema, record, state, activate_version, log, batch). It is currently at version 6.8.0 and is actively maintained. More opinionated frameworks built on the Singer protocol, such as the Meltano Singer SDK, are also available for higher-level abstractions.
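For orientation, Singer messages travel as single-line JSON objects distinguished by a `type` field. The sketch below uses only the standard library to show that wire format. The helper `format_message` is hypothetical and not part of `singer-python`, which ships its own writer functions.

```python
import json


def format_message(message_type, **fields):
    """Serialize one Singer-style message as a single JSON line (hypothetical helper)."""
    return json.dumps({"type": message_type, **fields})


lines = [
    # SCHEMA declares the shape of a stream before any of its records are sent.
    format_message(
        "SCHEMA",
        stream="users",
        schema={"type": "object", "properties": {"id": {"type": "integer"}}},
        key_properties=["id"],
    ),
    # RECORD carries one row of data for a named stream.
    format_message("RECORD", stream="users", record={"id": 1}),
    # STATE carries an arbitrary JSON value used for checkpointing.
    format_message("STATE", value={"bookmarks": {"users": {"last_id": 1}}}),
]

for line in lines:
    print(line)
```

Each message occupies exactly one line, which is what lets a target read the stream line by line.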
Warnings
- breaking Any non-Singer output (e.g., plain text via `print()`) to `stdout` will break downstream Singer targets. Singer targets expect `stdout` to contain only line-delimited JSON messages conforming to the Singer protocol.
- gotcha Incorrect or untimely state management can lead to unreliable incremental syncs, data loss, or reprocessing. State messages should accurately reflect processed data and be emitted only *after* all corresponding records for a given checkpoint have been successfully written.
- gotcha `singer-python` provides utilities for writing schemas but does not automatically handle complex schema evolution logic (e.g., column renames, type changes requiring migration). Taps must adhere to the Singer spec for schema changes.
- gotcha For new projects that need a more opinionated framework with robust CLI parsing, test helpers, and higher-level abstractions, consider the Meltano Singer SDK (published on PyPI as `singer-sdk`), which builds on the Singer protocol, instead of using `singer-python` directly.
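One way to catch the first warning during development is to scan a tap's captured stdout for lines that are not Singer-style JSON messages. The following is a minimal sketch using only the standard library. The function name `find_invalid_lines` and the set of accepted message types are assumptions made for illustration, not part of `singer-python`.

```python
import json

# Assumed set of message types for this sketch; adjust to what your target accepts.
KNOWN_TYPES = {"SCHEMA", "RECORD", "STATE", "ACTIVATE_VERSION", "BATCH"}


def find_invalid_lines(output_text):
    """Return (line_number, reason) pairs for lines that are not Singer-style messages."""
    problems = []
    for number, line in enumerate(output_text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            problems.append((number, "not JSON"))
            continue
        if not isinstance(message, dict) or message.get("type") not in KNOWN_TYPES:
            problems.append((number, "missing or unknown type"))
    return problems


sample = "\n".join([
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
    "Fetching page 2...",  # a stray print() that would break a target
    '{"type": "STATE", "value": {"bookmarks": {"users": {"last_id": 1}}}}',
])
problems = find_invalid_lines(sample)
print(problems)
```

Diagnostic text like the stray line in `sample` belongs on `stderr`, for example via a logger, so that it never reaches the target.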
Install
-
pip install singer-python
Imports
- singer
import singer
- get_logger
singer.get_logger()
- write_schema
singer.write_schema(...)
- write_record
singer.write_record(...)
- write_state
singer.write_state(...)
- parse_args
singer.utils.parse_args(required_config_keys)
Quickstart
import singer
# Get a Singer logger (logs to stderr by default)
LOGGER = singer.get_logger()
# 1. Define a schema for a stream
stream_name = "users"
schema = {
    "type": "object",
    "properties": {
        # Key properties are declared on the SCHEMA message, not inside a property
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "email": {"type": "string", "format": "email"}
    }
}
key_properties = ["id"]
# 2. Write the schema message to stdout
# This is how a Tap declares the structure of data it will send
singer.write_schema(
    stream_name=stream_name,
    schema=schema,
    key_properties=key_properties
)
LOGGER.info(f"Schema written for stream '{stream_name}'")
# 3. Write record messages to stdout
# These are the actual data rows
records = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"}
]
for record in records:
    singer.write_record(
        stream_name=stream_name,
        record=record
    )
    LOGGER.debug(f"Record written for '{stream_name}': {record['id']}")
# 4. Write a state message to stdout
# This allows a Tap to checkpoint its progress for incremental syncs
state = {"bookmarks": {stream_name: {"last_id": records[-1]["id"]}}}
singer.write_state(state)
LOGGER.info(f"State written: {state}")
# The actual Singer messages are written to stdout.
# To see them, run this script and redirect stdout:
# python your_script.py > output.jsonl
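To illustrate the consuming side, the sketch below reads Singer-style lines, counts records per stream, and keeps the most recent state value, roughly what a target does before persisting a checkpoint. It uses only the standard library. The function `summarize_output` and the example lines are hypothetical, shaped like what the script above is expected to emit.

```python
import json


def summarize_output(lines):
    """Count RECORD messages per stream and keep the most recent STATE value."""
    record_counts = {}
    last_state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        message = json.loads(line)
        if message["type"] == "RECORD":
            stream = message["stream"]
            record_counts[stream] = record_counts.get(stream, 0) + 1
        elif message["type"] == "STATE":
            # A later STATE replaces an earlier one; only the latest matters.
            last_state = message["value"]
    return record_counts, last_state


# Hypothetical lines in the order a tap would write them: schema, records, then state.
example_lines = [
    '{"type": "SCHEMA", "stream": "users", "schema": {"type": "object"}, "key_properties": ["id"]}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Alice"}}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Bob"}}',
    '{"type": "STATE", "value": {"bookmarks": {"users": {"last_id": 2}}}}',
]
counts, state = summarize_output(example_lines)
print(counts, state)
```

The same function accepts an open file handle, so a redirected `output.jsonl` can be passed in directly with `summarize_output(open("output.jsonl"))`.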