singer_sdk.Sink

class singer_sdk.Sink[source]

Abstract base class for target sinks.

__init__(target, stream_name, schema, key_properties)[source]

Initialize target sink.

Parameters:
  • target (Target) – Target instance.

  • stream_name (str) – Name of the stream to sink.

  • schema (dict) – Schema of the stream to sink.

  • key_properties (t.Sequence[str] | None) – Primary key of the stream to sink.

Return type:

None

activate_version(new_version)[source]

Bump the active version of the target table.

This method should be overridden by developers if a custom implementation is expected.

Parameters:

new_version (int) – The version number to activate.

Return type:

None
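A minimal sketch of what an override might do, using an in-memory list of rows as a hypothetical stand-in for the target table (in a real sink this logic would live in an overridden activate_version() method and act on actual target storage):

```python
# Hypothetical stand-in for a target table: each row is tagged with the
# version that loaded it. All names here are illustrative, not SDK API.
rows = [
    {"id": 1, "_version": 1},
    {"id": 2, "_version": 2},
]


def activate_version(rows: list[dict], new_version: int) -> list[dict]:
    # Rows loaded under an older version are retired once the new
    # version becomes active (a "hard delete" strategy).
    return [row for row in rows if row["_version"] >= new_version]


active = activate_version(rows, new_version=2)
```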

clean_up()[source]

Perform any clean up actions required at end of a stream.

Implementations should ensure that clean up does not affect resources that may be in use by other instances of the same sink. The stream name alone is not a sufficiently unique identifier; it is recommended to include a UUID as well.

Return type:

None
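A hedged sketch of the UUID recommendation above, shown as standalone functions rather than Sink methods: a staging file created during setup carries a per-instance UUID in its name, so clean up removes only this instance's resources even if another sink handles the same stream. Paths and names are hypothetical.

```python
import os
import tempfile
import uuid

# Per-instance UUID: two sinks for the same stream get distinct paths.
stream_name = "users"
instance_id = uuid.uuid4().hex
staging_path = os.path.join(
    tempfile.gettempdir(), f"{stream_name}-{instance_id}.jsonl"
)

# Stand-in for work done in setup(): create the staging file.
open(staging_path, "w").close()


def clean_up(path: str) -> None:
    # Safe to delete: the UUID guarantees no other sink instance
    # is using this path.
    if os.path.exists(path):
        os.remove(path)


clean_up(staging_path)
```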

get_validator()[source]

Get a record validator for this sink.

Override this method to use a custom format validator, or disable record validation by returning None.

Returns:

An instance of a subclass of BaseJSONSchemaValidator.

Return type:

BaseJSONSchemaValidator | None

Example implementation using the fastjsonschema library (imports added for completeness; exact import paths for the SDK classes may vary between SDK versions):

from __future__ import annotations

import typing as t

import fastjsonschema

# Assumed import locations; check your SDK version.
from singer_sdk.exceptions import InvalidJSONSchema, InvalidRecord
from singer_sdk.sinks import BaseJSONSchemaValidator


class FastJSONSchemaValidator(BaseJSONSchemaValidator):
    def __init__(self, schema: dict[str, t.Any]) -> None:
        super().__init__(schema)
        try:
            self.validator = fastjsonschema.compile(self.schema)
        except fastjsonschema.JsonSchemaDefinitionException as e:
            error_message = "Schema Validation Error"
            raise InvalidJSONSchema(error_message) from e

    def validate(self, record: dict):
        try:
            self.validator(record)
        except fastjsonschema.JsonSchemaValueException as e:
            error_message = f"Record Message Validation Error: {e.message}"
            raise InvalidRecord(error_message, record) from e

mark_drained()[source]

Reset records_to_drain and any other tracking.

Return type:

None

preprocess_record(record, context)[source]

Process incoming record and return a modified result.

Parameters:
  • record (dict) – Individual record in the stream.

  • context (dict) – Stream partition or context dictionary.

Returns:

A new, processed record.

Return type:

dict
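A hypothetical override body, shown as a standalone function for illustration (in a real sink this would be a method on your Sink subclass): normalize key casing and stamp a load time before the record is processed.

```python
from datetime import datetime, timezone


def preprocess_record(record: dict, context: dict) -> dict:
    # Build a new record rather than mutating the incoming one.
    processed = {key.lower(): value for key, value in record.items()}
    # "_loaded_at" is an illustrative metadata field, not an SDK name.
    processed["_loaded_at"] = datetime.now(timezone.utc).isoformat()
    return processed


result = preprocess_record({"ID": 1, "Name": "Ada"}, {})
```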

abstract process_batch(context)[source]

Process all records per the batch’s context dictionary.

If duplicates are merged, these can optionally be tracked via tally_duplicate_merged().

Parameters:

context (dict) – Stream partition or context dictionary.

Raises:

NotImplementedError – If derived class does not override this method.

Return type:

None
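A minimal sketch of draining a batch, shown as a standalone function: records staged in the batch context are written out as JSON lines. A real process_batch() override would write to the actual target (database, API, object store); the in-memory buffer here is only for illustration.

```python
import io
import json


def process_batch(context: dict, output: io.TextIOBase) -> None:
    # Drain whatever process_record() staged under context["records"].
    for record in context.get("records", []):
        output.write(json.dumps(record) + "\n")


buffer = io.StringIO()
process_batch({"records": [{"id": 1}, {"id": 2}]}, buffer)
```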

process_batch_files(encoding, files)[source]

Process a batch file with the given batch context.

Parameters:
  • encoding (BaseBatchFileEncoding) – The batch file encoding.

  • files (Sequence[str]) – The batch files to process.

Raises:

NotImplementedError – If the batch file encoding is not supported.

Return type:

None

abstract process_record(record, context)[source]

Load the latest record from the stream.

Implementations may either load to the context dict for staging (the default behavior for Batch types), or permanently write out to the target.

Anything appended to singer_sdk.Sink.records_to_drain will be automatically passed to process_batch() to be permanently written during the process_batch operation.

If duplicates are merged, these can be tracked via tally_duplicate_merged().

Parameters:
  • record (dict) – Individual record in the stream.

  • context (dict) – Stream partition or context dictionary.

Return type:

None
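A sketch of the staging pattern described above, as a standalone function: each record is appended to the batch context, to be drained later during batch processing.

```python
def process_record(record: dict, context: dict) -> None:
    # Stage the record; a later process_batch() call drains the list.
    context.setdefault("records", []).append(record)


context: dict = {}
process_record({"id": 1}, context)
process_record({"id": 2}, context)
```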

setup()[source]

Perform any setup actions at the beginning of a Stream.

Setup is executed once per Sink instance, after instantiation. If a Schema change is detected, a new Sink is instantiated and this method is called again.

Return type:

None

start_drain()[source]

Set and return self._context_draining.

Returns:

The context dictionary being drained.

Return type:

dict

final tally_duplicate_merged(count=1)[source]

Increment the records merged tally.

This method should be called directly by the Target implementation.

Parameters:

count (int) – Number to increase record count by.

Return type:

None

final tally_record_read(count=1)[source]

Increment the records read tally.

This method is called automatically by the SDK when records are read.

Parameters:

count (int) – Number to increase record count by.

Return type:

None

final tally_record_written(count=1)[source]

Increment the records written tally.

This method is called automatically by the SDK after process_record() or process_batch().

Parameters:

count (int) – Number to increase record count by.

Return type:

None

property batch_config: BatchConfig | None[source]

Get batch configuration.

Returns:

The batch configuration, or None if not configured.

property batch_size_rows: int | None[source]

The maximum number of rows a batch can accumulate before being processed.

Returns:

The max number of rows or None if not set.

property config: Mapping[str, Any][source]

Get plugin configuration.

Returns:

A frozen (read-only) config dictionary map.

property current_size: int[source]

Get current batch size.

Returns:

The number of records to drain.

property datetime_error_treatment: DatetimeErrorTreatmentEnum[source]

Return the treatment to use for datetime parse errors: ERROR, MAX, or NULL.

fail_on_record_validation_exception: bool = True[source]

Interrupt the target execution when a record fails schema validation.

property include_sdc_metadata_properties: bool[source]

Check if metadata columns should be added.

Returns:

True if metadata columns should be added.

property is_full: bool[source]

Check against the batch size limit.

Returns:

True if the sink needs to be drained.

property key_properties: Sequence[str][source]

Return key properties.

Override this method to return a list of key properties in a format that is compatible with the target.

Returns:

A list of stream key properties.
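A hypothetical override body, shown as a standalone function: fold the declared key property names to lowercase so they match the target's identifier rules.

```python
def key_properties(declared: list[str]) -> list[str]:
    # Illustrative transformation; a real override would apply whatever
    # identifier rules the target system requires.
    return [name.lower() for name in declared]


keys = key_properties(["UserId", "TenantId"])
```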

property max_size: int[source]

Get max batch size.

Returns:

The maximum number of records to batch before is_full becomes True.

Changed in version 0.36.0: This property now takes into account the batch_size_rows attribute and the corresponding batch_size_rows target setting.

validate_field_string_format = False[source]

Enable JSON schema format validation, for example date-time string fields.

property validate_schema: bool[source]

Enable JSON schema record validation.

Returns:

True if JSON schema validation is enabled.