singer_sdk.SQLSink
- class singer_sdk.SQLSink
SQL-type sink type.
- __init__(target: PluginBase, stream_name: str, schema: Dict, key_properties: Optional[List[str]], connector: Optional[SQLConnector] = None) None
Initialize SQL Sink.
- Parameters
target – The target object.
stream_name – The source tap’s stream name.
schema – The JSON Schema definition.
key_properties – The primary key columns.
connector – Optional connector to reuse.
- activate_version(new_version: int) None
Bump the active version of the target table.
- Parameters
new_version – The version number to activate.
- bulk_insert_records(full_table_name: str, schema: dict, records: Iterable[Dict[str, Any]]) Optional[int]
Bulk insert records to an existing destination table.
The default implementation uses a generic SQLAlchemy bulk insert operation. This method may optionally be overridden by developers in order to provide faster, native bulk uploads.
- Parameters
full_table_name – the target table name.
schema – the JSON schema for the new table, to be used when inferring column names.
records – the input records.
- Returns
True if table exists, False if not, None if unsure or undetectable.
- conform_name(name: str, object_type: Optional[str] = None) str
Conform a stream property name to one suitable for the target system.
Transforms names to snake case by default, applicable to most common DBMSs’. Developers may override this method to apply custom transformations to database/schema/table/column names.
- Parameters
name – Property name.
object_type – One of
database
,schema
,table
orcolumn
.
- Returns
The name transformed to snake case.
- conform_record(record: dict) dict
Return record dictionary with property names conformed.
- Parameters
record – Dictionary representing a single record.
- Returns
New record dictionary with conformed column names.
- conform_schema(schema: dict) dict
Return schema dictionary with property names conformed.
- Parameters
schema – JSON schema dictionary.
- Returns
A schema dictionary with the property names conformed.
- property connection: Connection
Get or set the SQLAlchemy connection for this sink.
- Returns
A connection object.
- property connector: SQLConnector
The connector object.
- Returns
The connector object.
- property database_name: Optional[str]
Return the DB name or None if using names with no database part.
- Returns
The target database name.
- property full_schema_name: str
Return the fully qualified schema name.
- Returns
The fully qualified schema name.
- property full_table_name: str
Return the fully qualified table name.
- Returns
The fully qualified table name.
- generate_insert_statement(full_table_name: str, schema: dict) Union[str, Executable]
Generate an insert statement for the given records.
- Parameters
full_table_name – the target table name.
schema – the JSON schema for the new table.
- Returns
An insert statement.
- property key_properties: List[str]
Return key properties, conformed to target system naming requirements.
- Returns
A list of key properties, conformed with self.conform_name()
- merge_upsert_from_table(target_table_name: str, from_table_name: str, join_keys: List[str]) Optional[int]
Merge upsert data from one table to another.
- Parameters
target_table_name – The destination table name.
from_table_name – The source table name.
join_keys – The merge upsert keys, or None to append.
- Returns
The number of records copied, if detectable, or None if the API does not report number of records affected/inserted.
- Raises
NotImplementedError – if the merge upsert capability does not exist or is undefined.
- process_batch(context: dict) None
Process a batch with the given batch context.
Writes a batch to the SQL target. Developers may override this method in order to provide a more efficient upload/upsert process.
- Parameters
context – Stream partition or context dictionary.
- property schema_name: Optional[str]
Return the schema name or None if using names with no schema part.
- Returns
The target schema name.
- setup() None
Set up Sink.
This method is called on Sink creation, and creates the required Schema and Table entities in the target database.
- property table_name: str
Return the table name, with no schema or database part.
- Returns
The target table name.