singer_sdk.Sink

class singer_sdk.Sink

Abstract base class for target sinks.

__init__(target: PluginBase, stream_name: str, schema: dict, key_properties: list[str] | None) → None

Initialize target sink.

Parameters:
  • target – Target instance.

  • stream_name – Name of the stream to sink.

  • schema – Schema of the stream to sink.

  • key_properties – Primary key of the stream to sink.

activate_version(new_version: int) → None

Bump the active version of the target table.

This method should be overridden by developers if a custom implementation is expected.

Parameters:

new_version – The version number to activate.

property batch_config: BatchConfig | None

Get batch configuration.

Returns:

The batch configuration, or None if no batch configuration is set.

clean_up() → None

Perform any clean up actions required at end of a stream.

Implementations should ensure that clean up does not affect resources that may be in use by other instances of the same sink. Do not rely on the stream name alone to identify resources; it is recommended to include a UUID as well.
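A minimal sketch of the UUID recommendation above, using only the standard library. The make_staging_name helper and its naming scheme are hypothetical and not part of the SDK; the point is only that two sink instances for the same stream get distinct resource names, so cleaning up one cannot touch the other.

```python
import uuid


def make_staging_name(stream_name: str) -> str:
    """Build a per-instance resource name (hypothetical helper, not SDK API)."""
    # uuid4().hex is 32 hexadecimal characters with no dashes.
    return f"{stream_name}_{uuid.uuid4().hex}"


# Two sinks handling the same stream end up with different staging names.
first = make_staging_name("users")
second = make_staging_name("users")
```

A sink could compute such a name once in setup() and remove only that resource in clean_up().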

property config: Mapping[str, Any]

Get plugin configuration.

Returns:

A frozen (read-only) config dictionary map.

property current_size: int

Get current batch size.

Returns:

The number of records to drain.

property datetime_error_treatment: DatetimeErrorTreatmentEnum

Return a treatment to use for datetime parse errors: ERROR, MAX, or NULL.

Returns:

The DatetimeErrorTreatmentEnum member to apply when a datetime value fails to parse.
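The following sketch shows one way the three treatments could be applied. TreatmentSketch and parse_with_treatment are hypothetical stand-ins written for this example, not SDK objects, and the behavior shown is an assumption: ERROR re-raises the parse error, MAX substitutes the largest representable datetime, and NULL substitutes None.

```python
from datetime import datetime
from enum import Enum


class TreatmentSketch(Enum):
    """Local stand-in for DatetimeErrorTreatmentEnum (assumed members)."""

    ERROR = "error"
    MAX = "max"
    NULL = "null"


def parse_with_treatment(value: str, treatment: TreatmentSketch):
    """Parse an ISO 8601 string, applying the treatment if parsing fails."""
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        if treatment is TreatmentSketch.MAX:
            # Assumption: MAX means "use the largest representable value".
            return datetime.max
        if treatment is TreatmentSketch.NULL:
            return None
        # ERROR: propagate the original parse error.
        raise


as_null = parse_with_treatment("not-a-date", TreatmentSketch.NULL)
as_max = parse_with_treatment("not-a-date", TreatmentSketch.MAX)
valid = parse_with_treatment("2024-01-02T03:04:05", TreatmentSketch.ERROR)
```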

property include_sdc_metadata_properties: bool

Check if metadata columns should be added.

Returns:

True if metadata columns should be added.

property is_full: bool

Check against size limit.

Returns:

True if the sink needs to be drained.

property key_properties: list[str]

Return key properties.

Returns:

A list of stream key properties.

mark_drained() → None

Reset records_to_drain and any other tracking.

property max_size: int

Get max batch size.

Returns:

The maximum number of records to batch before is_full becomes True.
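The relationship between current_size, max_size, is_full, and mark_drained() can be sketched as below. SizeTrackingSketch is a stand-in written for this example and does not extend the SDK; it assumes the sink counts as full once current_size reaches max_size.

```python
class SizeTrackingSketch:
    """Stand-in illustrating batch size tracking; not the SDK class."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.records_to_drain = []

    @property
    def current_size(self) -> int:
        # Number of records waiting to be drained.
        return len(self.records_to_drain)

    @property
    def is_full(self) -> bool:
        # Assumption: full means the size limit has been reached.
        return self.current_size >= self.max_size

    def mark_drained(self) -> None:
        # Reset tracking once the batch has been written out.
        self.records_to_drain = []


size_sketch = SizeTrackingSketch(max_size=2)
size_sketch.records_to_drain.append({"id": 1})
full_after_one = size_sketch.is_full
size_sketch.records_to_drain.append({"id": 2})
full_after_two = size_sketch.is_full
size_sketch.mark_drained()
```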

preprocess_record(record: dict, context: dict) → dict

Process incoming record and return a modified result.

Parameters:
  • record – Individual record in the stream.

  • context – Stream partition or context dictionary.

Returns:

A new, processed record.
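As an illustration, a preprocessing step might normalize keys and trim string values. The function below is a standalone sketch with the same parameters as the method, not an SDK implementation, and the transformation itself is hypothetical. It builds a new dictionary rather than mutating the incoming record.

```python
def preprocess_record(record: dict, context: dict) -> dict:
    """Return a cleaned copy of the record (illustrative transformation)."""
    cleaned = {}
    for key, value in record.items():
        # Lowercase column names and strip whitespace from string values.
        cleaned[key.lower()] = value.strip() if isinstance(value, str) else value
    return cleaned


raw = {"ID": 7, "Name": "  Ada  "}
clean = preprocess_record(raw, {})
```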

abstract process_batch(context: dict) → None

Process all records per the batch’s context dictionary.

If duplicates are merged, these can optionally be tracked via tally_duplicate_merged().

Parameters:

context – Stream partition or context dictionary.

Raises:

NotImplementedError – If derived class does not override this method.

process_batch_files(encoding: BaseBatchFileEncoding, files: Sequence[str]) → None

Process batch files with the given batch file encoding.

Parameters:
  • encoding – The batch file encoding.

  • files – The batch files to process.

Raises:

NotImplementedError – If the batch file encoding is not supported.

abstract process_record(record: dict, context: dict) → None

Load the latest record from the stream.

Implementations may either load to the context dict for staging (the default behavior for Batch types), or permanently write out to the target.

Anything appended to singer_sdk.Sink.records_to_drain is automatically passed to process_batch(), which writes it out permanently.

If duplicates are merged, these can be tracked via tally_duplicate_merged().

Parameters:
  • record – Individual record in the stream.

  • context – Stream partition or context dictionary.
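The staging pattern described above can be sketched without the SDK installed. StagingSinkSketch is a hypothetical stand-in, not a real Sink subclass: process_record() stages each record in the context dictionary under an assumed "records" key, and process_batch() merges duplicates by key, tallies them, and writes the result to an in-memory list that stands in for the target system.

```python
class StagingSinkSketch:
    """Stand-in for a Sink subclass; it does not import or extend the SDK."""

    def __init__(self, key_property: str) -> None:
        self.key_property = key_property
        self.written = []  # stands in for the target system
        self.duplicates_merged = 0

    def tally_duplicate_merged(self, count: int = 1) -> None:
        self.duplicates_merged += count

    def process_record(self, record: dict, context: dict) -> None:
        # Stage only; nothing is written to the target yet.
        context.setdefault("records", []).append(record)

    def process_batch(self, context: dict) -> None:
        merged = {}
        for record in context.get("records", []):
            key = record[self.key_property]
            if key in merged:
                self.tally_duplicate_merged()
            merged[key] = record  # the latest record for a key wins
        self.written.extend(merged.values())


staging_sink = StagingSinkSketch(key_property="id")
batch_context = {}
for rec in [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 1, "name": "c"}]:
    staging_sink.process_record(rec, batch_context)
staging_sink.process_batch(batch_context)
```

In a real target the SDK decides when to call process_batch(); the explicit call here only makes the sequence visible.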

setup() → None

Perform any setup actions at the beginning of a stream.

Setup is executed once per Sink instance, after instantiation. If a schema change is detected, a new Sink is instantiated and this method is called again.

start_drain() → dict

Set and return self._context_draining.

Returns:

The context dictionary stored in self._context_draining.

tally_duplicate_merged(count: int = 1) → None

Increment the records merged tally.

This method should be called directly by the Target implementation.

Parameters:

count – Number to increase record count by.

tally_record_read(count: int = 1) → None

Increment the records read tally.

This method is called automatically by the SDK when records are read.

Parameters:

count – Number to increase record count by.

tally_record_written(count: int = 1) → None

Increment the records written tally.

This method is called automatically by the SDK after process_record() or process_batch().

Parameters:

count – Number to increase record count by.