singer_sdk.sql.SQLSink

class singer_sdk.sql.SQLSink

Sink type for SQL-based targets.

__init__(target, stream_name, schema, key_properties, connector=None)

Initialize SQL Sink.

Parameters:
  • target (Target) – The target object.

  • stream_name (str) – The source tap’s stream name.

  • schema (dict) – The JSON Schema definition.

  • key_properties (t.Sequence[str] | None) – The primary key columns.

  • connector (_C | None) – Optional connector to reuse.

Return type:

None

activate_version(new_version)

Bump the active version of the target table.

Parameters:

new_version (int) – The version number to activate.

Return type:

None

bulk_insert_records(full_table_name, schema, records)

Bulk insert records to an existing destination table.

The default implementation uses a generic SQLAlchemy bulk insert operation. This method may optionally be overridden by developers in order to provide faster, native bulk uploads.

Parameters:
  • full_table_name (str | FullyQualifiedName) – the target table name.

  • schema (dict) – the JSON schema for the new table, to be used when inferring column names.

  • records (t.Iterable[dict[str, t.Any]]) – the input records.

Returns:

The number of records inserted, if detectable, or None if the API does not report the number of records affected/inserted.

Return type:

int | None
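To illustrate the shape of a native bulk insert override, here is a minimal sketch that uses the standard-library sqlite3 module instead of SQLAlchemy or the SDK. The function bulk_insert_sketch, the users table, and its columns are hypothetical names chosen for the example. It is not the SDK's implementation; it only shows column names being taken from the schema's properties, all rows being sent in one call, and a record count being returned.

```python
import sqlite3
from typing import Any, Iterable


def bulk_insert_sketch(
    conn: sqlite3.Connection,
    full_table_name: str,
    schema: dict,
    records: Iterable[dict[str, Any]],
) -> int:
    """Insert all records with one executemany call and return how many were sent."""
    columns = list(schema["properties"])
    column_list = ", ".join(columns)
    placeholders = ", ".join(f":{col}" for col in columns)
    sql = f"INSERT INTO {full_table_name} ({column_list}) VALUES ({placeholders})"
    # Fill missing properties with None so every row binds every named parameter.
    rows = [{col: record.get(col) for col in columns} for record in records]
    with conn:  # commits on success, rolls back on error
        conn.executemany(sql, rows)
    return len(rows)


# Hypothetical table and data, for demonstration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
schema = {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}
inserted = bulk_insert_sketch(conn, "users", schema, [{"id": 1, "name": "a"}, {"id": 2}])
```

The sketch interpolates table and column names directly into the SQL string, which is acceptable only for trusted names. A real override would quote identifiers through the database dialect.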

conform_name(name, object_type=None)

Conform a stream property name to one suitable for the target system.

Transforms names to snake case by default, which suits most common DBMSs. Developers may override this method to apply custom transformations to database/schema/table/column names.

Parameters:
  • name (str) – Property name.

  • object_type (str | None) – One of database, schema, table or column.

Returns:

The name transformed to snake case.

Return type:

str
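As a rough idea of what a snake-case transformation involves, the following stand-alone sketch uses only the standard library. The helper to_snake_case is hypothetical, and the SDK's actual rules (for example, how it treats leading digits or unusual characters) may differ from the ones shown here.

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a property name such as 'customerID' to 'customer_id'."""
    # Insert an underscore at lower/digit-to-upper boundaries: "customerID" -> "customer_ID".
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # Split an acronym from a following capitalized word: "HTTPResponse" -> "HTTP_Response".
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    # Replace runs of characters that are not letters, digits or underscores.
    name = re.sub(r"[^0-9A-Za-z_]+", "_", name)
    return name.strip("_").lower()


examples = {name: to_snake_case(name) for name in ["customerID", "First Name", "order-total"]}
```

An override of conform_name could delegate to a helper like this, or to any other naming rule the target database requires.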

conform_record(record)

Return record dictionary with property names conformed.

Parameters:

record (dict) – Dictionary representing a single record.

Returns:

New record dictionary with conformed column names.

Return type:

dict
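The effect on a single record can be sketched without the SDK as follows. The names conform_record_sketch and lower_with_underscores are hypothetical, and raising on a name collision is an assumption made for the example, not documented behavior.

```python
from typing import Any, Callable


def conform_record_sketch(
    record: dict[str, Any], conform: Callable[[str], str]
) -> dict[str, Any]:
    """Return a new dict whose keys have been passed through `conform`."""
    conformed: dict[str, Any] = {}
    for key, value in record.items():
        new_key = conform(key)
        if new_key in conformed:
            # Two source names collapsed to one column name; surface it rather than overwrite.
            raise ValueError(f"Duplicate column name after conforming: {new_key!r}")
        conformed[new_key] = value
    return conformed


def lower_with_underscores(name: str) -> str:
    """Stand-in naming rule for this example only."""
    return name.strip().lower().replace(" ", "_")


original = {"First Name": "Ada", "AGE": 36}
result = conform_record_sketch(original, lower_with_underscores)
```

The input dictionary is left unchanged, which matches the documented contract of returning a new record dictionary.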

conform_schema(schema)

Return schema dictionary with property names conformed.

Parameters:

schema (dict) – JSON schema dictionary.

Returns:

A schema dictionary with the property names conformed.

Return type:

dict
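For a schema, the renaming applies to the keys under "properties" while the property definitions stay as they are. The sketch below is an illustration under that assumption. conform_schema_sketch and the sample schema are hypothetical, and the sketch does not rename entries in other keywords such as "required".

```python
import copy
from typing import Callable


def conform_schema_sketch(schema: dict, conform: Callable[[str], str]) -> dict:
    """Return a copy of `schema` with the keys under "properties" renamed."""
    conformed = copy.deepcopy(schema)  # leave the caller's schema untouched
    conformed["properties"] = {
        conform(name): definition
        for name, definition in conformed.get("properties", {}).items()
    }
    return conformed


source_schema = {
    "type": "object",
    "properties": {"Order ID": {"type": "integer"}, "Total": {"type": "number"}},
}
conformed_schema = conform_schema_sketch(
    source_schema, lambda name: name.lower().replace(" ", "_")
)
```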

generate_insert_statement(full_table_name, schema)

Generate an insert statement for the given table and schema.

Parameters:
  • full_table_name (str | FullyQualifiedName) – the target table name.

  • schema (dict) – the JSON schema for the new table.

Returns:

An insert statement.

Return type:

str | Executable
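When the statement is returned as a string, one plausible form is a parameterized INSERT with a named placeholder per schema property. The sketch below assumes that form. insert_statement_sketch and the table name analytics.users are hypothetical, and the exact text the SDK generates may differ.

```python
def insert_statement_sketch(full_table_name: str, schema: dict) -> str:
    """Build a parameterized INSERT with one named placeholder per schema property."""
    columns = list(schema["properties"])
    column_list = ", ".join(columns)
    placeholders = ", ".join(f":{col}" for col in columns)
    return f"INSERT INTO {full_table_name} ({column_list}) VALUES ({placeholders})"


schema = {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}
statement = insert_statement_sketch("analytics.users", schema)
```

Because values are bound through placeholders rather than formatted into the string, the same statement can be executed once per batch with a list of record dictionaries.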

merge_upsert_from_table(target_table_name, from_table_name, join_keys)

Merge upsert data from one table to another.

Parameters:
  • target_table_name (str) – The destination table name.

  • from_table_name (str) – The source table name.

  • join_keys (list[str]) – The merge upsert keys, or None to append.

Returns:

The number of records copied, if detectable, or None if the API does not report the number of records affected/inserted.

Raises:

NotImplementedError – if the merge upsert capability does not exist or is undefined.

Return type:

int | None
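A target that supports upserts has to supply its own merge logic. One simple strategy, shown here with the standard-library sqlite3 module, is to delete the target rows that match the staging table on the join keys and then copy every staged row across. This delete-then-insert approach is a technique chosen for the example, not the SDK's method. merge_upsert_sketch and both table names are hypothetical.

```python
import sqlite3


def merge_upsert_sketch(
    conn: sqlite3.Connection,
    target_table_name: str,
    from_table_name: str,
    join_keys: list[str],
) -> int:
    """Replace matching target rows with staged rows, then add the rest."""
    with conn:  # one transaction: commit on success, roll back on error
        if join_keys:
            match = " AND ".join(
                f"{target_table_name}.{key} = {from_table_name}.{key}" for key in join_keys
            )
            # Drop target rows that have a counterpart in the staging table.
            conn.execute(
                f"DELETE FROM {target_table_name} "
                f"WHERE EXISTS (SELECT 1 FROM {from_table_name} WHERE {match})"
            )
        # Copy every staged row; with no join keys this is a plain append.
        cursor = conn.execute(
            f"INSERT INTO {target_table_name} SELECT * FROM {from_table_name}"
        )
    return cursor.rowcount


# Hypothetical tables and data, for demonstration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("CREATE TABLE users_staging (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "old"), (2, "keep")])
conn.executemany("INSERT INTO users_staging VALUES (?, ?)", [(1, "new"), (3, "added")])
copied = merge_upsert_sketch(conn, "users", "users_staging", ["id"])
rows = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
```

Databases with a native MERGE or INSERT ... ON CONFLICT statement can usually do this in a single statement. The two-step form is used here only because it is easy to follow.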

process_batch(context)

Process a batch with the given batch context.

Writes a batch to the SQL target. Developers may override this method in order to provide a more efficient upload/upsert process.

Parameters:

context (dict) – Stream partition or context dictionary.

Return type:

None

setup()

Set up Sink.

This method is called on Sink creation, and creates the required Schema and Table entities in the target database.

Return type:

None

property connection: Connection

SQLAlchemy connection object for this sink.

property connector: _C

The connector object.

property database_name: str | None

Database name or None if using names with no database part.

property default_target_schema: str | None

Default target schema.

property full_schema_name: FullyQualifiedName

Fully qualified schema name.

property full_table_name: FullyQualifiedName

Fully qualified table name.

property key_properties: Sequence[str]

Key properties, conformed to target system naming requirements.

property schema_name: str | None

Schema name or None if using names with no schema part.

property table_name: str

Table name, with no schema or database part.