class singer_sdk.SQLSink

Sink type for SQL targets.

__init__(target: PluginBase, stream_name: str, schema: Dict, key_properties: Optional[List[str]], connector: Optional[SQLConnector] = None) → None

Initialize SQL Sink.

  • target – The target object.

  • stream_name – The source tap’s stream name.

  • schema – The JSON Schema definition.

  • key_properties – The primary key columns.

  • connector – Optional connector to reuse.

activate_version(new_version: int) → None

Bump the active version of the target table.


  • new_version – The version number to activate.

bulk_insert_records(full_table_name: str, schema: dict, records: Iterable[Dict[str, Any]]) → Optional[int]

Bulk insert records to an existing destination table.

The default implementation uses a generic SQLAlchemy bulk insert operation. This method may optionally be overridden by developers in order to provide faster, native bulk uploads.

  • full_table_name – the target table name.

  • schema – the JSON schema for the new table, to be used when inferring column names.

  • records – the input records.


Returns: The number of records inserted, if detectable, or None if the API does not report the number of records affected/inserted.
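As a rough illustration of the kind of native bulk upload an override might provide, the sketch below uses the standard library's sqlite3 module rather than SQLAlchemy. It is a standalone function, not the SDK's implementation: the `conn` argument, the `?` placeholders, and the sample `users` table are assumptions made for the example.

```python
import sqlite3
from typing import Any, Dict, Iterable, Optional


def bulk_insert_records(
    conn: sqlite3.Connection,  # hypothetical: the real method uses the sink's connection
    full_table_name: str,
    schema: dict,
    records: Iterable[Dict[str, Any]],
) -> Optional[int]:
    """Insert all records with a single executemany call; return the row count."""
    # Column names are inferred from the JSON Schema "properties" keys.
    columns = list(schema["properties"])
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {full_table_name} ({', '.join(columns)}) VALUES ({placeholders})"
    # Properties missing from a record are inserted as NULL.
    rows = [tuple(record.get(name) for name in columns) for record in records]
    conn.executemany(sql, rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
schema = {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}
inserted = bulk_insert_records(
    conn, "users", schema, [{"id": 1, "name": "Ada"}, {"id": 2}]
)
```

A real override would replace the `executemany` call with whatever bulk-load path the target database offers and return None when no row count is available.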

property connection: Connection

Get or set the SQLAlchemy connection for this sink.


Returns: A connection object.

property connector: SQLConnector

The connector used by this sink.


Returns: The connector object.

create_table_with_records(full_table_name: Optional[str], schema: dict, records: Iterable[Dict[str, Any]], primary_keys: Optional[List[str]] = None, partition_keys: Optional[List[str]] = None, as_temp_table: bool = False) → None

Create a table and load the given records into it.

  • full_table_name – the target table name.

  • schema – the JSON schema for the new table.

  • records – records to load.

  • primary_keys – list of key properties.

  • partition_keys – list of partition keys.

  • as_temp_table – True to create a temp table.

property database_name: Optional[str]

Returns the database name, or None if table names have no database part.


Returns: The target database name.

property full_table_name: str

Gives the fully qualified table name.


Returns: The fully qualified table name.

generate_insert_statement(full_table_name: str, schema: dict) → Union[str, Executable]

Generate an insert statement for the given table and schema.

  • full_table_name – the target table name.

  • schema – the JSON schema for the new table.


Returns: An insert statement.
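To make the relationship between the schema and the generated statement concrete, here is a minimal sketch that builds a parameterized INSERT string from the schema's property names. The `:name` bind-parameter style and the sample table name are assumptions for illustration; the SDK's own output may differ.

```python
def generate_insert_statement(full_table_name: str, schema: dict) -> str:
    """Build an INSERT statement with one named bind parameter per property."""
    property_names = list(schema["properties"])
    columns = ", ".join(property_names)
    binds = ", ".join(f":{name}" for name in property_names)
    return f"INSERT INTO {full_table_name} ({columns}) VALUES ({binds})"


# Hypothetical table and schema, used only to show the shape of the result.
statement = generate_insert_statement(
    "main.users", {"properties": {"id": {}, "name": {}}}
)
```

Named bind parameters let each record dictionary be passed to the database driver as-is, so no value is ever interpolated into the SQL text.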

merge_upsert_from_table(target_table_name: str, from_table_name: str, join_keys: List[str]) → Optional[int]

Merge upsert data from one table to another.

  • target_table_name – The destination table name.

  • from_table_name – The source table name.

  • join_keys – The merge upsert keys, or None to append.


Returns: The number of records copied, if detectable, or None if the API does not report the number of records affected/inserted.


Raises: NotImplementedError – if the merge upsert capability does not exist or is undefined.
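One possible way to implement a merge upsert is a delete-then-insert pass: rows in the target that match the source on the join keys are removed, then every source row is copied over. The sketch below shows this with sqlite3 as a standalone function. The `conn` argument, the table names, and the choice of delete-then-insert (rather than a native MERGE or upsert statement) are all assumptions for the example.

```python
import sqlite3
from typing import List, Optional


def merge_upsert_from_table(
    conn: sqlite3.Connection,  # hypothetical: not part of the documented signature
    target_table_name: str,
    from_table_name: str,
    join_keys: List[str],
) -> Optional[int]:
    """Replace matching target rows with source rows, then append the rest."""
    with conn:  # commit both statements together, or roll back on error
        if join_keys:
            match = " AND ".join(
                f"{from_table_name}.{key} = {target_table_name}.{key}"
                for key in join_keys
            )
            conn.execute(
                f"DELETE FROM {target_table_name} "
                f"WHERE EXISTS (SELECT 1 FROM {from_table_name} WHERE {match})"
            )
        # With no join keys, this is a plain append.
        cursor = conn.execute(
            f"INSERT INTO {target_table_name} SELECT * FROM {from_table_name}"
        )
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("CREATE TABLE users_staging (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "old"), (3, "keep")])
conn.executemany("INSERT INTO users_staging VALUES (?, ?)", [(1, "new"), (2, "added")])
copied = merge_upsert_from_table(conn, "users", "users_staging", ["id"])
merged = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
```

Databases with a native MERGE or `INSERT ... ON CONFLICT` statement can usually do this in one step; the two-statement form is shown only because it is easy to follow.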

process_batch(context: dict) → None

Process a batch with the given batch context.

Writes a batch to the SQL target. Developers may override this method in order to provide a more efficient upload/upsert process.


  • context – Stream partition or context dictionary.

property schema_name: Optional[str]

Returns the schema name, or None if table names have no schema part.


Returns: The target schema name.

property table_name: str

Returns the table name, with no schema or database part.


Returns: The target table name.
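The name properties (database_name, schema_name, table_name, and full_table_name) can be thought of as parts of one dotted identifier. The helper below is a hypothetical sketch of that relationship, assuming a `.` delimiter and that absent parts are simply omitted; it is not the SDK's own logic.

```python
from typing import Optional


def build_full_table_name(
    table_name: str,
    schema_name: Optional[str] = None,
    database_name: Optional[str] = None,
) -> str:
    """Join whichever name parts are present, from most to least general."""
    parts = [part for part in (database_name, schema_name, table_name) if part]
    return ".".join(parts)


table_only = build_full_table_name("users")
with_schema = build_full_table_name("users", schema_name="analytics")
fully_qualified = build_full_table_name(
    "users", schema_name="analytics", database_name="warehouse"
)
```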