PipelineWise Singer Python Library
This library is a fork of Singer's singer-python, tailored for PipelineWise compatibility. It provides utilities for implementing the Singer.io data replication specification, in which taps (data extractors) write JSON messages to stdout and targets (data loaders) read them from stdin. The current version is 2.0.1. Releases are infrequent and are typically driven by critical bug fixes or significant enhancements such as performance improvements.
Warnings
- breaking Version 2.0.0 replaced the standard `json` library with `orjson` for improved performance. While it is generally a drop-in replacement, applications with custom JSON handling may break if they rely on `json` behaviors that `orjson` does not support. For example, `orjson.dumps` returns `bytes` rather than `str` and does not accept `json.dumps` keyword arguments such as `indent`, `sort_keys`, or `separators`.
- gotcha The library does not provide a default logging configuration. Users are responsible for setting up their own logging using standard Python `logging` module practices. If no logging is configured, log messages from the library might not appear or might go to stderr without proper formatting. An environment variable `LOGGING_CONF_FILE` can be used to point to a logging configuration file.
- gotcha The `BATCH` message type, introduced in `v1.2.0` and enhanced in `v1.3.0` (with `time_extracted`), allows for more efficient data transfer. However, older taps or targets in a Singer.io pipeline might not fully support this message type, leading to compatibility issues or data processing failures if not all components are updated.
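The `json` versus `orjson` difference from the first warning can be illustrated with a small sketch. It assumes `orjson` is importable, which should hold where version 2.0.0 or later is installed, and falls back to the standard library otherwise. The `payload` value is made-up example data.

```python
import json

try:
    import orjson  # assumed to be present where library version 2.0.0+ is installed
except ImportError:
    orjson = None

payload = {"b": 2, "a": 1}  # hypothetical example data

# Standard library: formatting is controlled by keyword arguments, result is str.
std_text = json.dumps(payload, sort_keys=True, separators=(",", ":"))

if orjson is not None:
    # orjson: the same intent is expressed with option flags, result is bytes.
    fast_bytes = orjson.dumps(payload, option=orjson.OPT_SORT_KEYS)
    fast_text = fast_bytes.decode("utf-8")
else:
    fast_text = std_text  # fallback so the sketch still runs without orjson

print(std_text)
print(fast_text)
```

Code that passes `json.dumps` keyword arguments through to a serializer, or that expects a `str` result, is the kind most likely to need adjusting.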
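Since the library leaves logging setup to the user, an application might configure it along the following lines. This is a sketch only: `configure_logging` and the `my_tap` logger name are hypothetical, and it assumes that a file named by `LOGGING_CONF_FILE` uses the INI format read by `logging.config.fileConfig`.

```python
import logging
import logging.config
import os
import sys


def configure_logging():
    """Hypothetical helper: set up logging before any Singer messages are emitted."""
    conf_file = os.environ.get("LOGGING_CONF_FILE")
    if conf_file and os.path.isfile(conf_file):
        # Assumption: the file is in the INI format understood by fileConfig.
        logging.config.fileConfig(conf_file, disable_existing_loggers=False)
    else:
        # Log to stderr so that stdout stays reserved for Singer messages.
        logging.basicConfig(
            stream=sys.stderr,
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        )
    return logging.getLogger("my_tap")  # placeholder logger name


logger = configure_logging()
logger.info("logging configured")
```

Sending log output to stderr matters here because stdout carries the message stream that the downstream target parses.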
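To check whether a downstream component would cope with `BATCH` messages, one option is to scan a captured message stream for types it does not handle. The sketch below uses only the standard library. `SUPPORTED_TYPES` and `split_messages` are hypothetical names, and the sample `BATCH` line is abbreviated to the `type` and `stream` fields.

```python
import json

# Assumption: message types this hypothetical target knows how to process.
SUPPORTED_TYPES = {"SCHEMA", "RECORD", "STATE"}


def split_messages(lines):
    """Parse Singer message lines and separate supported from unsupported ones."""
    supported, unsupported = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines between messages
        message = json.loads(line)
        if message.get("type") in SUPPORTED_TYPES:
            supported.append(message)
        else:
            unsupported.append(message)
    return supported, unsupported


sample = [
    '{"type": "RECORD", "stream": "my_stream", "record": {"id": 1}}',
    '{"type": "BATCH", "stream": "my_stream"}',  # abbreviated, other fields omitted
]
handled, skipped = split_messages(sample)
print(len(handled), "handled;", len(skipped), "unsupported:", [m["type"] for m in skipped])
```

Whether an unsupported message should be skipped or should abort the run is a design decision for the target. Failing loudly is usually safer than silently dropping data.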
Install
pip install pipelinewise-singer-python
Imports
- singer
import singer
Quickstart
import sys
import singer
# Define a simple JSON Schema for the stream
schema = {
    'type': 'object',
    'properties': {
        'id': {'type': 'integer'},
        'name': {'type': 'string'},
        'value': {'type': 'number'}
    }
}
# Write the schema message; the third argument (key_properties) marks 'id' as the primary key
singer.write_schema('my_stream', schema, ['id'])
# Write some record messages
records = [
    {'id': 1, 'name': 'Item A', 'value': 100.5},
    {'id': 2, 'name': 'Item B', 'value': 200.0},
    {'id': 3, 'name': 'Item C', 'value': 150.75}
]
for record in records:
    singer.write_record('my_stream', record)
# Write a state message (optional, but good practice for incremental processing)
singer.write_state({'last_processed_id': records[-1]['id']})
# In a real Singer pipeline, stdout carries the message stream to the target,
# so diagnostic output is sent to stderr instead of being mixed into it.
print("--- SCHEMA, RECORD, and STATE messages were written to stdout ---", file=sys.stderr)
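For reference, the messages written by the quickstart follow the Singer message shapes sketched below. This standard-library snippet builds them by hand rather than calling the library, so key order and whitespace may differ from what the library actually emits. The stream and field values are the same illustrative ones used above.

```python
import json

# Abbreviated version of the quickstart schema
schema = {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}

messages = [
    # SCHEMA: describes the stream and names its primary key
    {"type": "SCHEMA", "stream": "my_stream", "schema": schema, "key_properties": ["id"]},
    # RECORD: one row of data for the stream
    {"type": "RECORD", "stream": "my_stream", "record": {"id": 1, "name": "Item A"}},
    # STATE: bookmark that a later run can resume from
    {"type": "STATE", "value": {"last_processed_id": 1}},
]

# Each message is serialized as one JSON object per line.
lines = [json.dumps(message) for message in messages]
for line in lines:
    print(line)
```

A target reads these lines from stdin and dispatches on the `type` field.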