singer_sdk.SQLConnector

class singer_sdk.SQLConnector[source]

Base class for SQLAlchemy-based connectors.

The connector class serves as a wrapper around the SQL connection.

The functions of the connector are: - connecting to the source - generating SQLAlchemy connection and engine objects - discovering schema catalog entries - performing type conversions to/from JSONSchema types - dialect-specific functions, such as escaping and fully qualified names

__init__(config=None, sqlalchemy_url=None)[source]

Initialize the SQL connector.

Parameters:
  • config (dict | None) – The parent tap or target object’s config.

  • sqlalchemy_url (str | None) – Optional URL for the connection.

Return type:

None

column_exists(full_table_name, column_name)[source]

Determine if the target table already exists.

Parameters:
  • full_table_name (str) – the target table name.

  • column_name (str) – the target column name.

Returns:

True if table exists, False if not.

Return type:

bool

create_empty_table(full_table_name, schema, primary_keys=None, partition_keys=None, as_temp_table=False)[source]

Create an empty target table.

Parameters:
  • full_table_name (str) – the target table name.

  • schema (dict) – the JSON schema for the new table.

  • primary_keys (Sequence[str] | None) – list of key properties.

  • partition_keys (list[str] | None) – list of partition keys.

  • as_temp_table (bool) – True to create a temp table.

Raises:
  • NotImplementedError – if temp tables are unsupported and as_temp_table=True.

  • RuntimeError – if a variant schema is passed with no properties defined.

Return type:

None

create_engine()[source]

Creates and returns a new engine. Do not call outside of _engine.

NOTE: Do not call this method. The only place that this method should be called is inside the self._engine method. If you’d like to access the engine on a connector, use self._engine.

This method exists solely so that tap/target developers can override it on their subclass of SQLConnector to perform custom engine creation logic.

Returns:

A new SQLAlchemy Engine.

Return type:

Engine

create_schema(schema_name)[source]

Create target schema.

Parameters:

schema_name (str) – The target schema to create.

Return type:

None

create_sqlalchemy_connection()[source]

(DEPRECATED) Return a new SQLAlchemy connection using the provided config.

Do not use the SQLConnector’s connection directly. Instead, if you need to execute something that isn’t available on the connector currently, make a child class and add a method on that connector.

By default this will create using the sqlalchemy stream_results=True option described here:

https://docs.sqlalchemy.org/en/14/core/connections.html#using-server-side-cursors-a-k-a-stream-results

Developers may override this method if their provider does not support server side cursors (stream_results) or in order to use different configurations options when creating the connection object.

Returns:

A newly created SQLAlchemy engine object.

Return type:

Connection

create_sqlalchemy_engine()[source]

(DEPRECATED) Return a new SQLAlchemy engine using the provided config.

Developers can generally override just one of the following: sqlalchemy_engine, sqlalchemy_url`.

Returns:

A newly created SQLAlchemy engine object.

Return type:

Engine

delete_old_versions(*, full_table_name, version_column_name, current_version)[source]

Hard-deletes any old version rows from the table.

This is used to clean up old versions when an ACTIVATE_VERSION message is received.

Parameters:
  • full_table_name (str) – The fully qualified table name.

  • version_column_name (str) – The name of the version column.

  • current_version (int) – The current ACTIVATE version of the table.

Return type:

None

deserialize_json(json_str)[source]

Deserialize a JSON string to an object.

Tap connectors may override this method to provide custom deserialization logic for JSON types.

Parameters:

json_str (str) – The JSON string to deserialize.

Returns:

The deserialized object.

Return type:

object

Added in version 0.31.0.

discover_catalog_entries()[source]

Return a list of catalog entries from discovery.

Returns:

The discovered catalog entries as a list.

Return type:

list[dict]

discover_catalog_entry(engine, inspected, schema_name, table_name, is_view)[source]

Create CatalogEntry object for the given table or a view.

Parameters:
  • engine (Engine) – SQLAlchemy engine

  • inspected (Inspector) – SQLAlchemy inspector instance for engine

  • schema_name (str) – Schema name to inspect

  • table_name (str) – Name of the table or a view

  • is_view (bool) – Flag whether this object is a view, returned by get_object_names

Returns:

CatalogEntry object for the given table or a view

Return type:

CatalogEntry

static get_column_add_ddl(table_name, column_name, column_type)[source]

Get the create column DDL statement.

Override this if your database uses a different syntax for creating columns.

Parameters:
  • table_name (str) – Fully qualified table name of column to alter.

  • column_name (str) – Column name to create.

  • column_type (TypeEngine) – New column sqlalchemy type.

Returns:

A sqlalchemy DDL instance.

Return type:

DDL

static get_column_alter_ddl(table_name, column_name, column_type)[source]

Get the alter column DDL statement.

Override this if your database uses a different syntax for altering columns.

Parameters:
  • table_name (str) – Fully qualified table name of column to alter.

  • column_name (str) – Column name to alter.

  • column_type (TypeEngine) – New column type string.

Returns:

A sqlalchemy DDL instance.

Return type:

DDL

static get_column_rename_ddl(table_name, column_name, new_column_name)[source]

Get the create column DDL statement.

Override this if your database uses a different syntax for renaming columns.

Parameters:
  • table_name (str) – Fully qualified table name of column to alter.

  • column_name (str) – Existing column name.

  • new_column_name (str) – New column name.

Returns:

A sqlalchemy DDL instance.

Return type:

DDL

static get_fully_qualified_name(table_name=None, schema_name=None, db_name=None, delimiter='.')[source]

Concatenates a fully qualified name from the parts.

Parameters:
  • table_name (str | None) – The name of the table.

  • schema_name (str | None) – The name of the schema. Defaults to None.

  • db_name (str | None) – The name of the database. Defaults to None.

  • delimiter (str) – Generally: ‘.’ for SQL names and ‘-’ for Singer names.

Raises:

ValueError – If all 3 name parts not supplied.

Returns:

The fully qualified name as a string.

Return type:

str

get_object_names(engine, inspected, schema_name)[source]

Return a list of syncable objects.

Parameters:
  • engine (Engine) – SQLAlchemy engine

  • inspected (Inspector) – SQLAlchemy inspector instance for engine

  • schema_name (str) – Schema name to inspect

Returns:

List of tuples (<table_or_view_name>, <is_view>)

Return type:

list[tuple[str, bool]]

get_schema_names(engine, inspected)[source]

Return a list of schema names in DB.

Parameters:
  • engine (Engine) – SQLAlchemy engine

  • inspected (Inspector) – SQLAlchemy inspector instance for engine

Returns:

List of schema names

Return type:

list[str]

get_sqlalchemy_url(config)[source]

Return the SQLAlchemy URL string.

Developers can generally override just one of the following: sqlalchemy_engine, get_sqlalchemy_url.

Parameters:

config (dict[str, Any]) – A dictionary of settings from the tap or target config.

Returns:

The URL as a string.

Raises:

ConfigValidationError – If no valid sqlalchemy_url can be found.

Return type:

str

get_table(full_table_name, column_names=None)[source]

Return a table object.

Parameters:
  • full_table_name (str) – Fully qualified table name.

  • column_names (list[str] | None) – A list of column names to filter to.

Returns:

A table object with column list.

Return type:

Table

get_table_columns(full_table_name, column_names=None)[source]

Return a list of table columns.

Parameters:
  • full_table_name (str) – Fully qualified table name.

  • column_names (list[str] | None) – A list of column names to filter to.

Returns:

An ordered list of column objects.

Return type:

dict[str, Column]

merge_sql_types(sql_types)[source]

Return a compatible SQL type for the selected type list.

Parameters:

sql_types (Sequence[TypeEngine]) – List of SQL types.

Returns:

A SQL type that is compatible with the input types.

Raises:

ValueError – If sql_types argument has zero members.

Return type:

TypeEngine

parse_full_table_name(full_table_name)[source]

Parse a fully qualified table name into its parts.

Developers may override this method if their platform does not support the traditional 3-part convention: db_name.schema_name.table_name

Parameters:

full_table_name (str) – A table name or a fully qualified table name. Depending on SQL the platform, this could take the following forms: - <db>.<schema>.<table> (three part names) - <db>.<table> (platforms which do not use schema groupings) - <schema>.<name> (if DB name is already in context) - <table> (if DB name and schema name are already in context)

Returns:

A three part tuple (db_name, schema_name, table_name) with any unspecified or unused parts returned as None.

Return type:

tuple[str | None, str | None, str]

prepare_column(full_table_name, column_name, sql_type)[source]

Adapt target table to provided schema if possible.

Parameters:
  • full_table_name (str) – the target table name.

  • column_name (str) – the target column name.

  • sql_type (TypeEngine) – the SQLAlchemy type.

Return type:

None

prepare_schema(schema_name)[source]

Create the target database schema.

Parameters:

schema_name (str) – The target schema name.

Return type:

None

prepare_table(full_table_name, schema, primary_keys, partition_keys=None, as_temp_table=False)[source]

Adapt target table to provided schema if possible.

Parameters:
  • full_table_name (str) – the target table name.

  • schema (dict) – the JSON Schema for the table.

  • primary_keys (Sequence[str]) – list of key properties.

  • partition_keys (list[str] | None) – list of partition keys.

  • as_temp_table (bool) – True to create a temp table.

Return type:

None

quote(name)[source]

Quote a name if it needs quoting, using ‘.’ as a name-part delimiter.

Examples

“my_table” => “my_table” “my_schema.my_table” => “my_schema.`my_table`”

Parameters:

name (str) – The unquoted name.

Returns:

The quoted name.

Return type:

str

static remove_collation(column_type)[source]

Removes collation for the given column TypeEngine instance.

Parameters:

column_type (TypeEngine) – Column SQLAlchemy type.

Returns:

The removed collation as a string.

Return type:

str | None

rename_column(full_table_name, old_name, new_name)[source]

Rename the provided columns.

Parameters:
  • full_table_name (str) – The fully qualified table name.

  • old_name (str) – The old column to be renamed.

  • new_name (str) – The new name for the column.

Raises:

NotImplementedError – If self.allow_column_rename is false.

Return type:

None

schema_exists(schema_name)[source]

Determine if the target database schema already exists.

Parameters:

schema_name (str) – The target database schema name.

Returns:

True if the database schema exists, False if not.

Return type:

bool

serialize_json(obj)[source]

Serialize an object to a JSON string.

Target connectors may override this method to provide custom serialization logic for JSON types.

Parameters:

obj (object) – The object to serialize.

Returns:

The JSON string.

Return type:

str

Added in version 0.31.0.

table_exists(full_table_name)[source]

Determine if the target table already exists.

Parameters:

full_table_name (str) – the target table name.

Returns:

True if table exists, False if not, None if unsure or undetectable.

Return type:

bool

static to_jsonschema_type(sql_type)[source]

Return a JSON Schema representation of the provided type.

By default will call typing.to_jsonschema_type() for strings and SQLAlchemy types.

Developers may override this method to accept additional input argument types, to support non-standard types, or to provide custom typing logic.

Parameters:

sql_type (str | TypeEngine | type[TypeEngine] | Any) – The string representation of the SQL type, a SQLAlchemy TypeEngine class or object, or a custom-specified object.

Raises:

ValueError – If the type received could not be translated to jsonschema.

Returns:

The JSON Schema representation of the provided type.

Return type:

dict

static to_sql_type(jsonschema_type)[source]

Return a JSON Schema representation of the provided type.

By default will call typing.to_sql_type().

Developers may override this method to accept additional input argument types, to support non-standard types, or to provide custom typing logic. If overriding this method, developers should call the default implementation from the base class for all unhandled cases.

Parameters:

jsonschema_type (dict) – The JSON Schema representation of the source type.

Returns:

The SQLAlchemy type representation of the data type.

Return type:

TypeEngine

static update_collation(column_type, collation)[source]

Sets column collation if column type has a collation attribute.

Parameters:
  • column_type (TypeEngine) – Column SQLAlchemy type.

  • collation (str | None) – The colation

Return type:

None

property config: dict[source]

If set, provides access to the tap or target config.

Returns:

The settings as a dict.

property connection: Connection[source]

(DEPRECATED) Return or set the SQLAlchemy connection object.

Do not use the SQLConnector’s connection directly. Instead, if you need to execute something that isn’t available on the connector currently, make a child class and add a method on that connector.

Returns:

The active SQLAlchemy connection object.

property logger: Logger[source]

Get logger.

Returns:

Plugin logger.

property sqlalchemy_url: str[source]

Return the SQLAlchemy URL string.

Returns:

The URL as a string.