singer_sdk.Sink
- class singer_sdk.Sink
Abstract base class for target sinks.
- activate_version(new_version)
Bump the active version of the target table.
This method should be overridden by developers if a custom implementation is expected.
- Parameters:
new_version (int) – The version number to activate.
- Return type:
None
- clean_up()
Perform any clean up actions required at the end of a stream.
Implementations should ensure that clean up does not affect resources that may be in use by other instances of the same sink. Do not rely on the stream name alone to identify resources; including a UUID as well is recommended.
- Return type:
None
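As a minimal sketch of the UUID recommendation above, a sink could derive its staging resource name from both the stream name and a random suffix, so that clean up only ever touches resources it created. The helper name `make_staging_name` is hypothetical and not part of the SDK.

```python
import uuid


def make_staging_name(stream_name: str) -> str:
    """Return a staging resource name unique to one sink instance."""
    # The random hex suffix keeps two sinks for the same stream from colliding.
    return f"{stream_name}_{uuid.uuid4().hex}"


# Two sinks created for the same stream get different staging resources.
first = make_staging_name("users")
second = make_staging_name("users")
```

A sink would typically compute such a name once, for example during setup, and remove only that resource in clean_up().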
- get_batch_processing_timer()
Get a timer for measuring batch processing time.
This method can be overridden to customize the batch processing timer, for example to add custom tags or modify the logging behavior.
- Returns:
A timer for measuring batch processing time.
- Return type:
Timer
Added in version 0.51.0.
- get_sink_record_counter()
Get a counter for counting records processed by this sink.
This method can be overridden to customize the record counter, for example to add custom tags or modify the logging behavior.
- Returns:
A counter for counting records.
- Return type:
Counter
Added in version 0.51.0.
- get_validator()
Get a record validator for this sink.
Override this method to use a custom format validator, or disable record validation by returning None.
- Returns:
An instance of a subclass of BaseJSONSchemaValidator.
- Return type:
BaseJSONSchemaValidator | None
Example implementation using the fastjsonschema library:
import fastjsonschema

class FastJSONSchemaValidator(BaseJSONSchemaValidator):
    def __init__(self, schema: dict[str, t.Any]) -> None:
        super().__init__(schema)
        try:
            self.validator = fastjsonschema.compile(self.schema)
        except fastjsonschema.JsonSchemaDefinitionException as e:
            error_message = "Schema Validation Error"
            raise InvalidJSONSchema(error_message) from e

    def validate(self, record: dict):
        try:
            self.validator(record)
        except fastjsonschema.JsonSchemaValueException as e:
            error_message = f"Record Message Validation Error: {e.message}"
            raise InvalidRecord(error_message, record) from e
- abstractmethod process_batch(context)
Process all records per the batch’s context dictionary.
If duplicates are merged, these can optionally be tracked via tally_duplicate_merged().
- Parameters:
context (dict) – Stream partition or context dictionary.
- Raises:
NotImplementedError – If derived class does not override this method.
- Return type:
None
- process_batch_files(encoding, files)
Process a batch file with the given batch context.
For JSONL-encoded batch files, expects raw JSON records (one per line), not Singer protocol messages.
- Parameters:
encoding (BaseBatchFileEncoding) – The batch file encoding.
files (t.Sequence[str]) – The batch files to process.
- Raises:
NotImplementedError – If the batch file encoding is not supported.
- Return type:
None
- abstractmethod process_record(record, context)
Load the latest record from the stream.
Implementations may either load to the context dict for staging (the default behavior for Batch types), or permanently write out to the target.
Anything appended to singer_sdk.Sink.records_to_drain will be automatically passed to process_batch() to be permanently written during the process_batch operation.
If duplicates are merged, these can be tracked via tally_duplicate_merged().
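The staging pattern described for process_record() and process_batch() can be sketched with a simplified stand-in class. `MiniSink`, its `written` list, the `"records"` context key, and the `"id"` merge key are all assumptions made for illustration; this is not the SDK's Sink class, and a real sink would subclass singer_sdk.Sink and call tally_duplicate_merged() instead of keeping its own counter.

```python
from typing import Any


class MiniSink:
    """Simplified stand-in for a batch-style sink (not the SDK class)."""

    def __init__(self) -> None:
        self.written: list[dict[str, Any]] = []  # pretend destination
        self.duplicates_merged = 0  # stand-in for the merged-records tally

    def process_record(self, record: dict[str, Any], context: dict[str, Any]) -> None:
        # Stage the record in the batch context instead of writing it now.
        context.setdefault("records", []).append(record)

    def process_batch(self, context: dict[str, Any]) -> None:
        # Merge records that share an "id", keeping the latest, then write.
        merged: dict[Any, dict[str, Any]] = {}
        for record in context.get("records", []):
            if record["id"] in merged:
                self.duplicates_merged += 1
            merged[record["id"]] = record
        self.written.extend(merged.values())


sink = MiniSink()
batch_context: dict[str, Any] = {}
for row in [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 1, "v": "c"}]:
    sink.process_record(row, batch_context)
sink.process_batch(batch_context)
```

Here the two records with `id` 1 collapse into one written row, and the counter records a single merge.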
- setup()
Perform any setup actions at the beginning of a Stream.
Setup is executed once per Sink instance, after instantiation. If a Schema change is detected, a new Sink is instantiated and this method is called again.
- Return type:
None
- final tally_duplicate_merged(count=1)
Increment the records merged tally.
This method should be called directly by the Target implementation.
- Parameters:
count (int) – Number to increase record count by.
- Return type:
None
- final tally_record_read(count=1)
Increment the records read tally.
This method is called automatically by the SDK when records are read.
- Parameters:
count (int) – Number to increase record count by.
- Return type:
None
- final tally_record_written(count=1)
Increment the records written tally.
This method is called automatically by the SDK after process_record() or process_batch().
- Parameters:
count (int) – Number to increase record count by.
- Return type:
None
- property batch_size_rows: int | None
The maximum number of rows a batch can accumulate before being processed, or None if not set.
- property datetime_error_treatment: DatetimeErrorTreatmentEnum
Treatment to use for datetime parse errors: ERROR, MAX, or NULL.
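One plausible reading of the three treatments is sketched below: ERROR re-raises the parse failure, MAX substitutes the largest representable datetime, and NULL substitutes None. The `Treatment` enum and `parse_datetime` helper are hypothetical stand-ins written for this example; the SDK's own enum values and parsing logic may differ.

```python
from datetime import datetime
from enum import Enum
from typing import Optional


class Treatment(Enum):
    """Hypothetical stand-in for DatetimeErrorTreatmentEnum."""

    ERROR = "error"
    MAX = "max"
    NULL = "null"


def parse_datetime(value: str, treatment: Treatment) -> Optional[datetime]:
    """Parse an ISO 8601 string, applying the treatment on failure."""
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        if treatment is Treatment.ERROR:
            raise  # surface the parse error to the caller
        if treatment is Treatment.MAX:
            return datetime.max  # substitute the largest datetime
        return None  # Treatment.NULL: substitute a null value


ok = parse_datetime("2024-01-02T03:04:05", Treatment.ERROR)
as_null = parse_datetime("not-a-date", Treatment.NULL)
as_max = parse_datetime("not-a-date", Treatment.MAX)
```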
- fail_on_record_validation_exception: bool = True
Interrupt the target execution when a record fails schema validation.
- property key_properties: Sequence[str]
Key properties.
Override this method to return a list of key properties in a format that is compatible with the target.
- property max_size: int
Max record batch size.
The number of records to accumulate in a batch before is_full becomes True.
Changed in version 0.36.0: This property now takes into account the batch_size_rows attribute and the corresponding batch_size_rows target setting.
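The relationship between batch_size_rows, max_size, and is_full might look like the sketch below. The precedence shown (a configured row limit wins, otherwise a default applies) and the `DEFAULT_MAX_SIZE` value are assumptions for illustration, as are the helper names; check the SDK source for the actual behavior.

```python
from typing import Optional

DEFAULT_MAX_SIZE = 10_000  # assumed default, for illustration only


def effective_max_size(batch_size_rows: Optional[int]) -> int:
    """Return the batch size limit, preferring the configured row limit."""
    # A configured row limit wins; otherwise fall back to the default.
    return batch_size_rows if batch_size_rows is not None else DEFAULT_MAX_SIZE


def is_full(current_size: int, batch_size_rows: Optional[int]) -> bool:
    """Report whether the batch has reached its size limit."""
    return current_size >= effective_max_size(batch_size_rows)
```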
- property process_activate_version_messages: bool
True if activate version messages should be processed.