singer_sdk.Sink

class singer_sdk.Sink

Abstract base class for target sinks.

__init__(target: PluginBase, stream_name: str, schema: dict, key_properties: list[str] | None) None

Initialize target sink.

Parameters
  • target – Target instance.

  • stream_name – Name of the stream to sink.

  • schema – Schema of the stream to sink.

  • key_properties – Primary key of the stream to sink.

activate_version(new_version: int) None

Bump the active version of the target table.

This method should be overridden by developers if a custom implementation is expected.

Parameters

new_version – The version number to activate.

property batch_config: BatchConfig | None

Get batch configuration.

Returns

A frozen (read-only) config dictionary map.

clean_up() None

Perform any clean up actions required at end of a stream.

Implementations should ensure that clean up does not affect resources that may be in use from other instances of the same sink. Stream name alone should not be relied on, it’s recommended to use a uuid as well.

property config: Mapping[str, Any]

Get plugin configuration.

Returns

A frozen (read-only) config dictionary map.

property current_size: int

Get current batch size.

Returns

The number of records to drain.

property datetime_error_treatment: DatetimeErrorTreatmentEnum

ERROR. MAX, or NULL.

Returns

TODO

Type

Return a treatment to use for datetime parse errors

property include_sdc_metadata_properties: bool

Check if metadata columns should be added.

Returns

True if metadata columns should be added.

property is_full: bool

Check against size limit.

Returns

True if the sink needs to be drained.

property key_properties: list[str]

Return key properties.

Returns

A list of stream key properties.

mark_drained() None

Reset records_to_drain and any other tracking.

property max_size: int

Get max batch size.

Returns

Max number of records to batch before is_full=True

preprocess_record(record: dict, context: dict) dict

Process incoming record and return a modified result.

Parameters
  • record – Individual record in the stream.

  • context – Stream partition or context dictionary.

Returns

A new, processed record.

abstract process_batch(context: dict) None

Process all records per the batch’s context dictionary.

If duplicates are merged, these can optionally be tracked via tally_duplicate_merged().

Parameters

context – Stream partition or context dictionary.

Raises

NotImplementedError – If derived class does not override this method.

process_batch_files(encoding: BaseBatchFileEncoding, files: Sequence[str]) None

Process a batch file with the given batch context.

Parameters
  • encoding – The batch file encoding.

  • files – The batch files to process.

Raises

NotImplementedError – If the batch file encoding is not supported.

abstract process_record(record: dict, context: dict) None

Load the latest record from the stream.

Implementations may either load to the context dict for staging (the default behavior for Batch types), or permanently write out to the target.

Anything appended to singer_sdk.Sink.records_to_drain will be automatically passed to process_batch() to be permanently written during the process_batch operation.

If duplicates are merged, these can be tracked via tally_duplicate_merged().

Parameters
  • record – Individual record in the stream.

  • context – Stream partition or context dictionary.

setup() None

Perform any setup actions at the beginning of a Stream.

Setup is executed once per Sink instance, after instantiation. If a Schema change is detected, a new Sink is instantiated and this method is called again.

start_drain() dict

Set and return self._context_draining.

Returns

TODO

tally_duplicate_merged(count: int = 1) None

Increment the records merged tally.

This method should be called directly by the Target implementation.

Parameters

count – Number to increase record count by.

tally_record_read(count: int = 1) None

Increment the records read tally.

This method is called automatically by the SDK when records are read.

Parameters

count – Number to increase record count by.

tally_record_written(count: int = 1) None

Increment the records written tally.

This method is called automatically by the SDK after process_record() or process_batch().

Parameters

count – Number to increase record count by.