singer_sdk.SQLTap#

class singer_sdk.SQLTap#

Bases: Tap

A specialized Tap for extracting from SQL streams.

__init__(config: dict | PurePath | str | list[PurePath | str] | None = None, catalog: PurePath | str | dict | None = None, state: PurePath | str | dict | None = None, parse_env_config: bool = False, validate_config: bool = True) None#

Initialize the SQL tap.

The SQLTap initializer additionally creates a cache variable for _catalog_dict.

Parameters:
  • config – Tap configuration. Can be a dictionary, a single path to a configuration file, or a list of paths to multiple configuration files.

  • catalog – Tap catalog. Can be a dictionary or a path to the catalog file.

  • state – Tap state. Can be dictionary or a path to the state file.

  • parse_env_config – Whether to look for configuration values in environment variables.

  • validate_config – True to require validation of config settings.

classmethod append_builtin_config(config_jsonschema: dict) None#

Appends built-in config to config_jsonschema if not already set.

To customize or disable this behavior, developers may either override this class method or override the capabilities property to disabled any unwanted built-in capabilities.

For all except very advanced use cases, we recommend leaving these implementations “as-is”, since this provides the most choice to users and is the most “future proof” in terms of taking advantage of built-in capabilities which may be added in the future.

Parameters:

config_jsonschema – [description]

property catalog: Catalog#

Get the tap’s working catalog.

Returns:

A Singer catalog object.

property catalog_dict: dict#

Get catalog dictionary.

Returns:

The tap’s catalog as a dict

property catalog_json_text: str#

Get catalog JSON.

Returns:

The tap’s catalog as formatted JSON text.

cli = <Command cli>#
property config: Mapping[str, Any]#

Get config.

Returns:

A frozen (read-only) config dictionary map.

discover_streams() list[Stream]#

Initialize all available streams and return them as a list.

Returns:

List of discovered Stream objects.

property input_catalog: Catalog | None#

Get the catalog passed to the tap.

Returns:

Catalog dictionary input, or None if not provided.

load_state(state: dict[str, Any]) None#

Merge or initialize stream state with the provided state dictionary input.

Override this method to perform validation and backwards-compatibility patches on self.state. If overriding, we recommend first running super().load_state(state) to ensure compatibility with the SDK.

Parameters:

state – Initialize the tap’s state with this value.

Raises:

ValueError – If the tap’s own state is None, meaning it has not been initialized.

load_streams() list[Stream]#

Load streams from discovery and initialize DAG.

Return the output of self.discover_streams() to enumerate discovered streams.

Returns:

A list of discovered streams, ordered by name.

classmethod print_about(output_format: str | None = None) None#

Print capabilities and other tap metadata.

Parameters:

output_format – Render option for the plugin information.

classmethod print_version(print_fn: Callable[[Any], None] = <built-in function print>) None#

Print help text for the tap.

Parameters:

print_fn – A function to use to display the plugin version. Defaults to print.

run_connection_test() bool#

Run connection test, aborting each stream after 1 record.

Returns:

True if the test succeeded.

run_discovery() str#

Write the catalog json to STDOUT and return as a string.

Returns:

The catalog as a string of JSON.

run_sync_dry_run(dry_run_record_limit: int | None = 1, streams: Iterable[Stream] | None = None) bool#

Run connection test.

Exceptions of type MaxRecordsLimitException and PartialSyncSuccessException will be ignored.

Parameters:
  • dry_run_record_limit – The max number of records to sync per stream object.

  • streams – The streams to test. If omitted, all streams will be tested.

Returns:

True if the test succeeded.

property state: dict#

Get tap state.

Returns:

The tap’s state dictionary

Raises:

RuntimeError – If state has not been initialized.

property streams: dict[str, Stream]#

Get streams discovered or catalogued for this tap.

Results will be cached after first execution.

Returns:

A mapping of names to streams, using discovery or a provided catalog.

sync_all() None#

Sync all streams.

write_schemas() None#

Write a SCHEMA message for all known streams to STDOUT.