singer_sdk.Sink
- class singer_sdk.Sink
Abstract base class for target sinks.
- activate_version(new_version)
Bump the active version of the target table.
Override this method when the target needs its own version-activation logic.
- Parameters:
new_version (int) – The version number to activate.
- Return type:
None
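A target that stamps rows with a table version could implement activation by discarding rows from older versions once a new version becomes active. The sketch below models this with an in-memory list. `InMemoryTable` is hypothetical, and the `_sdc_table_version` row key is an assumed column name used for illustration, not something this page defines.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class InMemoryTable:
    """Hypothetical stand-in for a target table that tracks row versions."""

    rows: list[dict[str, Any]] = field(default_factory=list)
    active_version: int | None = None

    def activate_version(self, new_version: int) -> None:
        """Make new_version active and drop rows written under older versions."""
        self.active_version = new_version
        # Assumption: rows carry their version under "_sdc_table_version";
        # rows without it are treated as current and kept.
        self.rows = [
            row
            for row in self.rows
            if row.get("_sdc_table_version", new_version) >= new_version
        ]


table = InMemoryTable(
    rows=[
        {"id": 1, "_sdc_table_version": 1},
        {"id": 2, "_sdc_table_version": 2},
    ]
)
table.activate_version(2)
print([row["id"] for row in table.rows])  # [2]
```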
- clean_up()
Perform any clean-up actions required at the end of a stream.
Implementations should ensure that clean-up does not affect resources that may be in use by other instances of the same sink. Do not rely on the stream name alone to identify such resources; include a UUID as well.
- Return type:
None
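One way to follow the UUID advice is to give each sink instance its own staging location named after both the stream and a fresh UUID, so that clean_up() removes exactly what that instance created. `StagingArea` is a hypothetical, SDK-independent helper; a real sink would run equivalent logic from its own setup() and clean_up() overrides.

```python
import shutil
import tempfile
import uuid
from pathlib import Path


class StagingArea:
    """Hypothetical per-sink staging directory, scoped by stream name plus a UUID."""

    def __init__(self, stream_name: str, root: Path) -> None:
        # The UUID suffix keeps two sinks for the same stream from sharing a directory.
        self.path = root / f"{stream_name}-{uuid.uuid4().hex}"

    def setup(self) -> None:
        self.path.mkdir(parents=True)

    def clean_up(self) -> None:
        # Remove only this instance's directory, never anything keyed by stream name alone.
        shutil.rmtree(self.path, ignore_errors=True)


root = Path(tempfile.mkdtemp())
first = StagingArea("users", root)
second = StagingArea("users", root)
first.setup()
second.setup()
first.clean_up()
print(first.path.exists(), second.path.exists())  # False True
```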
- get_validator()
Get a record validator for this sink.
Override this method to use a custom format validator, or disable record validation by returning None.
- Returns:
An instance of a subclass of BaseJSONSchemaValidator, or None if record validation is disabled.
- Return type:
BaseJSONSchemaValidator | None
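Example implementation using the fastjsonschema library:

```python
from __future__ import annotations

import typing as t

import fastjsonschema

# Import paths below assume the current singer_sdk package layout.
from singer_sdk.exceptions import InvalidJSONSchema, InvalidRecord
from singer_sdk.sinks.core import BaseJSONSchemaValidator


class FastJSONSchemaValidator(BaseJSONSchemaValidator):
    def __init__(self, schema: dict[str, t.Any]) -> None:
        super().__init__(schema)
        try:
            # Compile the schema once; the compiled callable validates each record.
            self.validator = fastjsonschema.compile(self.schema)
        except fastjsonschema.JsonSchemaDefinitionException as e:
            error_message = "Schema Validation Error"
            raise InvalidJSONSchema(error_message) from e

    def validate(self, record: dict) -> None:
        try:
            self.validator(record)
        except fastjsonschema.JsonSchemaValueException as e:
            error_message = f"Record Message Validation Error: {e.message}"
            raise InvalidRecord(error_message, record) from e


# A sink would then return FastJSONSchemaValidator(self.schema) from get_validator().
```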
- abstract process_batch(context)
Process all records per the batch’s context dictionary.
If duplicates are merged, these can optionally be tracked via tally_duplicate_merged().
- Parameters:
context (dict) – Stream partition or context dictionary.
- Raises:
NotImplementedError – If derived class does not override this method.
- Return type:
None
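When process_batch() merges records that share the same key, the number of records folded away is the count that would be reported through tally_duplicate_merged(). A stdlib-only sketch of a last-write-wins merge follows; the function name `merge_by_key` and the last-write-wins policy are assumptions for illustration, not SDK behavior.

```python
from __future__ import annotations

from collections.abc import Iterable, Sequence
from typing import Any


def merge_by_key(
    records: Iterable[dict[str, Any]],
    key_properties: Sequence[str],
) -> tuple[list[dict[str, Any]], int]:
    """Keep the last record for each key; return (merged records, duplicates merged)."""
    merged: dict[tuple[Any, ...], dict[str, Any]] = {}
    total = 0
    for record in records:
        total += 1
        key = tuple(record[k] for k in key_properties)
        merged[key] = record  # a later record replaces an earlier one with the same key
    return list(merged.values()), total - len(merged)


batch = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 1, "v": "c"}]
rows, duplicates = merge_by_key(batch, ["id"])
print(rows)        # [{'id': 1, 'v': 'c'}, {'id': 2, 'v': 'b'}]
print(duplicates)  # 1
```

Inside a real sink, the second value would be passed to self.tally_duplicate_merged().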
- process_batch_files(encoding, files)
Process the given batch files using the given encoding.
- Parameters:
encoding – The batch file encoding.
files – The batch files to process.
- Raises:
NotImplementedError – If the batch file encoding is not supported.
- Return type:
None
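process_batch_files() must reject encodings it cannot read. The sketch below reads local JSON Lines files, optionally gzip-compressed, and raises NotImplementedError for anything else. The function `iter_batch_records` and its plain-string `file_format`/`compression` arguments are hypothetical; the real method receives an encoding object whose attributes are not modeled here, and files are assumed to be local paths.

```python
from __future__ import annotations

import gzip
import json
import tempfile
from collections.abc import Iterator, Sequence
from pathlib import Path
from typing import Any


def iter_batch_records(
    file_format: str,
    compression: str | None,
    files: Sequence[str],
) -> Iterator[dict[str, Any]]:
    """Check the encoding up front, then lazily yield records from local files."""
    # Reject unsupported encodings eagerly, before any file is opened.
    if file_format != "jsonl":
        raise NotImplementedError(f"Unsupported batch file format: {file_format}")
    if compression not in (None, "none", "gzip"):
        raise NotImplementedError(f"Unsupported compression: {compression}")
    return _read_jsonl(files, gzipped=compression == "gzip")


def _read_jsonl(files: Sequence[str], gzipped: bool) -> Iterator[dict[str, Any]]:
    for path in files:
        opener = gzip.open if gzipped else open
        # Text mode ("rt") works for both plain and gzip-compressed files.
        with opener(path, "rt", encoding="utf-8") as handle:
            for line in handle:
                if line.strip():  # skip blank lines
                    yield json.loads(line)


path = Path(tempfile.mkdtemp()) / "batch-0001.jsonl.gz"
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write('{"id": 1}\n{"id": 2}\n')

print(list(iter_batch_records("jsonl", "gzip", [str(path)])))  # [{'id': 1}, {'id': 2}]
```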
- abstract process_record(record, context)
Load the latest record from the stream.
Implementations may either stage the record in the context dict (the default behavior for Batch types) or write it permanently to the target.
Anything appended to singer_sdk.Sink.records_to_drain will be passed automatically to process_batch() and permanently written during that operation. If duplicates are merged, they can be tracked via tally_duplicate_merged().
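The staging pattern described above (buffer records in process_record(), write them in process_batch()) can be modeled without the SDK. `StagingSink`, its `receive()` and `drain()` methods, and the `records` context key are hypothetical stand-ins chosen to mirror the description, not the SDK's internals.

```python
from __future__ import annotations

from typing import Any


class StagingSink:
    """Hypothetical stand-in that stages records in a context dict, then writes them."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.context: dict[str, Any] = {}
        self.written: list[dict[str, Any]] = []  # stands in for the real target

    def process_record(self, record: dict[str, Any], context: dict[str, Any]) -> None:
        # Stage only; nothing is permanently written yet.
        context.setdefault("records", []).append(record)

    def process_batch(self, context: dict[str, Any]) -> None:
        # Permanently "write" every staged record.
        self.written.extend(context.get("records", []))

    @property
    def current_size(self) -> int:
        return len(self.context.get("records", []))

    def receive(self, record: dict[str, Any]) -> None:
        self.process_record(record, self.context)
        if self.current_size >= self.max_size:
            self.drain()

    def drain(self) -> None:
        self.process_batch(self.context)
        self.context = {}  # start a fresh batch


sink = StagingSink(max_size=2)
for i in range(3):
    sink.receive({"id": i})
print(len(sink.written), sink.current_size)  # 2 1
sink.drain()
print(len(sink.written))  # 3
```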
- setup()
Perform any setup actions at the beginning of a Stream.
Setup is executed once per Sink instance, after instantiation. If a Schema change is detected, a new Sink is instantiated and this method is called again.
- Return type:
None
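The lifecycle described above (one setup() call per Sink instance, and a fresh instance when the schema changes) can be sketched as a small registry. `SinkRegistry` and `RecordingSink` are hypothetical; the real Target decides when to replace a sink, and this sketch only compares schemas for equality.

```python
from __future__ import annotations

from typing import Any


class RecordingSink:
    """Hypothetical sink that counts its own setup() calls."""

    def __init__(self, stream_name: str, schema: dict[str, Any]) -> None:
        self.stream_name = stream_name
        self.schema = schema
        self.setup_calls = 0

    def setup(self) -> None:
        self.setup_calls += 1  # e.g. create the target table here


class SinkRegistry:
    """Hypothetical stand-in for the target's per-stream sink bookkeeping."""

    def __init__(self) -> None:
        self._sinks: dict[str, RecordingSink] = {}

    def get_sink(self, stream_name: str, schema: dict[str, Any]) -> RecordingSink:
        existing = self._sinks.get(stream_name)
        if existing is not None and existing.schema == schema:
            return existing  # unchanged schema: reuse the sink, no new setup()
        sink = RecordingSink(stream_name, schema)
        sink.setup()  # runs once, right after instantiation
        self._sinks[stream_name] = sink
        return sink


registry = SinkRegistry()
v1 = {"properties": {"id": {"type": "integer"}}}
v2 = {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}
a = registry.get_sink("users", v1)
b = registry.get_sink("users", v1)
c = registry.get_sink("users", v2)
print(a is b, a is c, a.setup_calls, c.setup_calls)  # True False 1 1
```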
- final tally_duplicate_merged(count=1)
Increment the records merged tally.
This method should be called directly by the Target implementation.
- Parameters:
count (int) – Number to increase record count by.
- Return type:
None
- final tally_record_read(count=1)
Increment the records read tally.
This method is called automatically by the SDK when records are read.
- Parameters:
count (int) – Number to increase record count by.
- Return type:
None
- final tally_record_written(count=1)
Increment the records written tally.
This method is called automatically by the SDK after process_record() or process_batch().
- Parameters:
count (int) – Number to increase record count by.
- Return type:
None
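The three tally methods split responsibility: reads and writes are counted by the framework, while merges are reported by the sink implementation. The simulation below shows that split. `TallyingSink` and `run_batch` are hypothetical, `typing.final` only marks the methods for static type checkers, and counting every record handed to process_batch() as written is an assumption of this sketch.

```python
from __future__ import annotations

from collections import Counter
from typing import Any, final


class TallyingSink:
    """Hypothetical stand-in mirroring the three tally methods."""

    def __init__(self) -> None:
        self.tallies: Counter[str] = Counter()

    @final
    def tally_record_read(self, count: int = 1) -> None:
        self.tallies["read"] += count

    @final
    def tally_record_written(self, count: int = 1) -> None:
        self.tallies["written"] += count

    @final
    def tally_duplicate_merged(self, count: int = 1) -> None:
        self.tallies["merged"] += count

    def process_batch(self, records: list[dict[str, Any]]) -> None:
        # Implementation-owned: report how many records share an "id" with another.
        self.tally_duplicate_merged(len(records) - len({r["id"] for r in records}))


def run_batch(sink: TallyingSink, records: list[dict[str, Any]]) -> None:
    """Simulated framework side: reads and writes are tallied outside the sink's code."""
    for _ in records:
        sink.tally_record_read()
    sink.process_batch(records)
    sink.tally_record_written(len(records))


sink = TallyingSink()
run_batch(sink, [{"id": 1}, {"id": 2}, {"id": 1}])
print(dict(sink.tallies))  # {'read': 3, 'merged': 1, 'written': 3}
```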
- property batch_config: BatchConfig | None
Get batch configuration.
- Returns:
The batch configuration, or None if none is set.
- property batch_processing_timer: Timer
Get the batch processing timer for this sink.
- Returns:
The Meter instance for the batch processing timer.
- property batch_size_rows: int | None
The maximum number of rows a batch can accumulate before being processed.
- Returns:
The max number of rows or None if not set.
- property config: Mapping[str, Any]
Get plugin configuration.
- Returns:
A frozen (read-only) config dictionary map.
- property current_size: int
Get current batch size.
- Returns:
The number of records to drain.
- property datetime_error_treatment: DatetimeErrorTreatmentEnum
Return a treatment to use for datetime parse errors: ERROR, MAX, or NULL.
- Returns:
The DatetimeErrorTreatmentEnum member to apply when a datetime value fails to parse.
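A sketch of how the three treatments could apply when a datetime string fails to parse. `DatetimeErrorTreatment` is a hypothetical mirror of DatetimeErrorTreatmentEnum, ISO 8601 parsing via `datetime.fromisoformat` is an assumed parsing rule, and mapping MAX to `datetime.max` is an assumption rather than documented SDK behavior.

```python
from __future__ import annotations

import datetime as dt
from enum import Enum


class DatetimeErrorTreatment(Enum):
    """Hypothetical mirror of the ERROR / MAX / NULL options."""

    ERROR = "error"
    MAX = "max"
    NULL = "null"


def parse_datetime(value: str, treatment: DatetimeErrorTreatment) -> dt.datetime | None:
    try:
        return dt.datetime.fromisoformat(value)
    except ValueError:
        if treatment is DatetimeErrorTreatment.ERROR:
            raise  # ERROR: surface the parse failure
        if treatment is DatetimeErrorTreatment.MAX:
            return dt.datetime.max  # assumption: MAX means the largest representable datetime
        return None  # NULL: store a null instead of failing


print(parse_datetime("2024-01-31T12:00:00", DatetimeErrorTreatment.NULL))  # 2024-01-31 12:00:00
print(parse_datetime("not-a-date", DatetimeErrorTreatment.NULL))  # None
print(parse_datetime("not-a-date", DatetimeErrorTreatment.MAX))  # 9999-12-31 23:59:59.999999
```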
- fail_on_record_validation_exception: bool = True
Interrupt the target execution when a record fails schema validation.
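The attribute decides whether a validation failure stops the run. In the sketch below, the failure is re-raised when the flag is True and otherwise logged while execution continues. `check_record` and `InvalidRecordError` are hypothetical, and logging a warning on the non-interrupting path is an assumption about what "not interrupting" looks like.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger("example-target")


class InvalidRecordError(Exception):
    """Hypothetical stand-in for a record validation failure."""


def check_record(
    record: dict[str, Any],
    validate: Callable[[dict[str, Any]], None],
    fail_on_record_validation_exception: bool = True,
) -> bool:
    """Return True if the record is valid; otherwise raise or log, depending on the flag."""
    try:
        validate(record)
    except InvalidRecordError as exc:
        if fail_on_record_validation_exception:
            raise  # interrupt the target, matching the attribute's default of True
        logger.warning("Record failed validation: %s", exc)  # assumption: log and continue
        return False
    return True


def require_id(record: dict[str, Any]) -> None:
    if "id" not in record:
        raise InvalidRecordError("missing 'id'")


print(check_record({"id": 1}, require_id))  # True
print(check_record({}, require_id, fail_on_record_validation_exception=False))  # False
```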
- property include_sdc_metadata_properties: bool
Check if metadata columns should be added.
- Returns:
True if metadata columns should be added.
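When this property is True, the sink adds `_sdc_`-prefixed metadata columns to each record. This page does not list which columns the SDK adds, so the `received_at` field below is only an example, and `add_sdc_metadata` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any


def add_sdc_metadata(
    record: dict[str, Any],
    include_sdc_metadata_properties: bool,
    metadata: dict[str, Any],
) -> dict[str, Any]:
    """Return a copy of record with _sdc_-prefixed columns added only when enabled."""
    if not include_sdc_metadata_properties:
        return dict(record)
    return {**record, **{f"_sdc_{name}": value for name, value in metadata.items()}}


meta = {"received_at": "2024-01-31T12:00:00+00:00"}
print(add_sdc_metadata({"id": 1}, True, meta))
# {'id': 1, '_sdc_received_at': '2024-01-31T12:00:00+00:00'}
print(add_sdc_metadata({"id": 1}, False, meta))  # {'id': 1}
```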
- property is_full: bool
Check against the batch size limit.
- Returns:
True if the sink needs to be drained.
- property key_properties: Sequence[str]
Return key properties.
Override this method to return a list of key properties in a format that is compatible with the target.
- Returns:
A list of stream key properties.
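Overriding key_properties lets a sink adapt key names to the target's naming rules. The sketch assumes a target that accepts only lowercase snake_case identifiers; the conversion rule and the helpers `conform_name` and `conform_key_properties` are illustrative assumptions.

```python
from __future__ import annotations

import re
from collections.abc import Sequence


def conform_name(name: str) -> str:
    """Convert a name to lowercase snake_case (hypothetical target rule)."""
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)  # split camelCase boundaries
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name)  # collapse other characters to "_"
    return name.strip("_").lower()


def conform_key_properties(key_properties: Sequence[str]) -> list[str]:
    return [conform_name(k) for k in key_properties]


print(conform_key_properties(["userId", "Order ID"]))  # ['user_id', 'order_id']
```

In a Sink subclass, a helper like this would typically be applied inside the overridden key_properties property.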
- property max_size: int
Get max batch size.
- Returns:
The maximum number of records to batch before is_full becomes True.
Changed in version 0.36.0: This property now takes into account the batch_size_rows attribute and the corresponding batch_size_rows target setting.
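The sizing properties fit together as follows: use batch_size_rows when it is set, otherwise fall back to a default, and report the sink as full once the number of records to drain reaches that limit. The class name `BatchLimits` and the default of 10,000 rows are assumptions for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

DEFAULT_MAX_SIZE = 10_000  # assumed default when batch_size_rows is not set


@dataclass
class BatchLimits:
    """Hypothetical stand-in for the sizing properties of a sink."""

    batch_size_rows: int | None = None
    records_to_drain: list[dict[str, Any]] = field(default_factory=list)

    @property
    def max_size(self) -> int:
        # Prefer the configured row limit; None (or 0) falls back to the default.
        return self.batch_size_rows or DEFAULT_MAX_SIZE

    @property
    def current_size(self) -> int:
        return len(self.records_to_drain)

    @property
    def is_full(self) -> bool:
        return self.current_size >= self.max_size


limits = BatchLimits(batch_size_rows=2)
limits.records_to_drain.append({"id": 1})
print(limits.max_size, limits.is_full)  # 2 False
limits.records_to_drain.append({"id": 2})
print(limits.is_full)  # True
print(BatchLimits().max_size)  # 10000
```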
- property record_counter_metric: Counter
Get the record counter for this sink.
- Returns:
The Meter instance for the record counter.