singer_sdk.Stream¶
- class singer_sdk.Stream[source]¶
Abstract base class for tap streams.
- Variables:
context – Stream partition or context dictionary.
Added in version 0.39.0: The context attribute.
- __init__(tap, schema=None, name=None)[source]¶
Init tap stream.
- Parameters:
- Raises:
ValueError – TODO
FileNotFoundError – TODO
- Return type:
None
- apply_catalog(catalog)[source]¶
Apply a catalog dict, updating any settings overridden within the catalog.
Developers may override this method in order to introduce advanced catalog parsing, or to explicitly fail on advanced catalog customizations which are not supported by the tap.
- Parameters:
catalog (Catalog) – Catalog object passed to the tap. Defines schema, primary and replication keys, as well as selection metadata.
- Return type:
None
- compare_start_date(value, start_date_value)[source]¶
Compare a bookmark value to a start date and return the most recent value.
If the replication key is a datetime-formatted string, this method will parse the value and compare it to the start date. Otherwise, the bookmark value is returned.
If the tap uses a non-datetime replication key (e.g. a UNIX timestamp), the developer is encouraged to override this method to provide custom logic for comparing the bookmark value to the start date.
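As an illustration of the override suggested above, here is a minimal standalone sketch for an integer UNIX-timestamp replication key. It assumes the bookmark is a number of seconds since the epoch and that the start date is an ISO 8601 string. The function name compare_start_date_unix is hypothetical; in a tap, the same logic would live in a compare_start_date override on the Stream subclass.

```python
from datetime import datetime, timezone


def compare_start_date_unix(value: int, start_date_value: str) -> int:
    """Return the more recent of a UNIX-timestamp bookmark and a start date.

    Assumes `value` is seconds since the epoch and `start_date_value`
    is an ISO 8601 string (treated as UTC when it carries no offset).
    """
    start_dt = datetime.fromisoformat(start_date_value)
    if start_dt.tzinfo is None:
        # Assumption: naive start dates are interpreted as UTC.
        start_dt = start_dt.replace(tzinfo=timezone.utc)
    start_epoch = int(start_dt.timestamp())
    # The later of the two values is the safe point to resume from.
    return max(int(value), start_epoch)
```

With this sketch, a bookmark older than the start date is replaced by the start date, and a newer bookmark is returned unchanged.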
- finalize_state_progress_markers(state=None)[source]¶
Reset progress markers and emit state message if necessary.
This method is internal to the SDK and should not need to be overridden.
- Parameters:
state (dict | None) – State object to promote progress markers with.
- Return type:
None
- generate_child_contexts(record, context)[source]¶
Generate child contexts.
- Parameters:
record (types.Record) – Individual record in the stream.
context (types.Context | None) – Stream partition or context dictionary.
- Yields:
A child context for each child stream.
- Return type:
t.Iterable[types.Context | None]
- get_batch_config(config)[source]¶
Return the batch config for this stream.
- Parameters:
config (Mapping) – Tap configuration dictionary.
- Returns:
Batch config for this stream.
- Return type:
BatchConfig | None
- get_batches(batch_config, context=None)[source]¶
Batch generator function.
Developers are encouraged to override this method to customize batching behavior for databases, bulk APIs, etc.
- get_child_context(record, context)[source]¶
Return a child context object from the record and optional provided context.
By default, will return context if provided and otherwise the record dict.
Developers may override this behavior to send specific information to child streams for context.
Return None if no child streams should be synced, for example if the parent record was deleted and the child records can no longer be synced.
- Parameters:
record (types.Record) – Individual record in the stream.
context (types.Context | None) – Stream partition or context dictionary.
- Returns:
A dictionary with context values for a child stream, or None if no child streams should be synced.
- Raises:
NotImplementedError – If the stream has children but this method is not overridden.
- Return type:
types.Context | None
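The behavior described for get_child_context can be sketched as a plain function. The field names id and is_deleted and the context key project_id are hypothetical; in a tap, this would be a method on the parent Stream subclass with self as the first argument.

```python
from typing import Optional


def get_child_context(record: dict, context: Optional[dict]) -> Optional[dict]:
    """Build the context handed to child streams for one parent record."""
    if record.get("is_deleted"):
        # Returning None signals that child streams should be skipped
        # for this parent record.
        return None
    # Pass only the key the child stream needs, not the whole record.
    return {"project_id": record["id"]}
```

Returning a small dictionary rather than the full record keeps the child context limited to the values the child stream actually uses.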
- get_context_state(context)[source]¶
Return a writable state dict for the given context.
Gives a partitioned context state if applicable; else returns stream state. A blank state will be created if none exists.
This method is internal to the SDK and should not need to be overridden. Developers may call this method, but this is not recommended except in advanced use cases. Instead, developers should access the latest stream replication key values using get_starting_timestamp() for timestamp keys, or get_starting_replication_key_value() for non-timestamp keys.
Partition level may be overridden by state_partitioning_keys if set.
- Parameters:
context (types.Context | None) – Stream partition or context dictionary.
- Returns:
A partitioned context state if applicable; else returns stream state. A blank state will be created if none exists.
- Return type:
- abstract get_records(context)[source]¶
Abstract record generator function. Must be overridden by the child class.
Each record emitted should be a dictionary of property names to their values. Returns either a record dict or a tuple: (record_dict, child_context).
This method should retrieve data from the source and return records incrementally using the Python yield statement.
Only custom stream types need to define this method. REST and GraphQL streams should instead use the class-specific methods for REST or GraphQL, respectively.
This method takes an optional context argument, which can be safely ignored unless the stream is a child stream or requires partitioning. More info: Stream Partitioning.
Parent streams can optionally return a tuple, in which case the second item in the tuple is a child_context dictionary for the child streams’ context.
If the child context object in the tuple is None, the child streams will be skipped. This is useful for cases where the parent record was deleted and the child records can no longer be synced.
More info: Parent-Child Streams
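A minimal sketch of the generator pattern described above, written as a standalone function over an in-memory list. SOURCE_ROWS, the archived flag, and the project_id context key are all hypothetical stand-ins for a real source; in a tap, this would be the get_records method of a custom Stream subclass.

```python
from typing import Iterator, Optional, Tuple

# Hypothetical in-memory source standing in for an API or database.
SOURCE_ROWS = [
    {"id": 1, "name": "alpha", "archived": False},
    {"id": 2, "name": "beta", "archived": True},
]


def get_records(context: Optional[dict]) -> Iterator[Tuple[dict, Optional[dict]]]:
    """Yield (record, child_context) tuples one at a time."""
    for row in SOURCE_ROWS:
        if row["archived"]:
            # A None child context means child streams are skipped
            # for this parent record.
            yield row, None
        else:
            yield row, {"project_id": row["id"]}
```

A stream without children could yield each row on its own instead of a tuple.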
- get_replication_key_signpost(context)[source]¶
Get the replication signpost.
For timestamp-based replication keys, this defaults to utc_now(). For non-timestamp replication keys, this defaults to None. For consistency in subsequent calls, the value will be frozen (cached) at the value from the first call, per partition argument if applicable.
Developers may optionally override this method in advanced use cases such as unsorted incremental streams or complex hierarchical stream scenarios. For more info: Stream State
- Parameters:
context (types.Context | None) – Stream partition or context dictionary.
- Returns:
Max allowable bookmark value for this stream’s replication key.
- Return type:
datetime.datetime | t.Any | None
- get_starting_replication_key_value(context)[source]¶
Get starting replication key.
Will return the value of the stream’s replication key when --state is passed. If no prior state exists, will return None.
Developers should use this method to seed incremental processing for non-datetime replication keys. For datetime and date replication keys, use get_starting_timestamp().
- Parameters:
context (types.Context | None) – Stream partition or context dictionary.
- Returns:
Starting replication value.
- Return type:
t.Any | None
Note
This method requires replication_key to be set to a non-null value, indicating the stream should be synced incrementally.
- get_starting_timestamp(context)[source]¶
Get starting replication timestamp.
Will return the value of the stream’s replication key when --state is passed. If no state exists, will return start_date if set, or None if neither the stream state nor start_date is set.
Developers should use this method to seed incremental processing for date and datetime replication keys. For non-datetime replication keys, use get_starting_replication_key_value().
Note
This method requires replication_key to be set to a non-null value, indicating the stream should be synced incrementally.
- Parameters:
context (types.Context | None) – Stream partition or context dictionary.
- Returns:
start_date from config, or state value if using timestamp replication.
- Raises:
ValueError – If the replication value is not a valid timestamp.
- Return type:
datetime.datetime | None
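To show how a starting timestamp might seed incremental processing, the sketch below filters rows against a start value that may be None. The helper name records_since and the updated_at field are hypothetical; in a tap, the start value would come from get_starting_timestamp(context) inside get_records. The inclusive comparison is a choice made for this sketch, not something the documentation above prescribes.

```python
from datetime import datetime, timezone
from typing import Iterable, Iterator, Optional


def records_since(rows: Iterable[dict], start: Optional[datetime]) -> Iterator[dict]:
    """Yield rows updated at or after `start`; yield everything if start is None.

    Assumes each row has an ISO 8601 `updated_at` string with a UTC offset,
    so it can be compared against a timezone-aware `start`.
    """
    for row in rows:
        updated = datetime.fromisoformat(row["updated_at"])
        if start is None or updated >= start:
            yield row


# Example input: two rows and a start value between them.
rows = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00+00:00"},
    {"id": 2, "updated_at": "2024-03-01T00:00:00+00:00"},
]
start = datetime(2024, 2, 1, tzinfo=timezone.utc)
newer_ids = [row["id"] for row in records_since(rows, start)]
```

When no state and no start_date exist, the start value is None and the sketch falls back to a full sync.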
- log_sync_costs()[source]¶
Log a summary of sync costs.
The costs are calculated via calculate_sync_cost. This method can be overridden to log results in a custom format. It is only called once at the end of the life of the stream.
- Return type:
None
- post_process(row, context=None)[source]¶
As needed, append or transform raw data to match expected structure.
Optional. This method gives developers an opportunity to “clean up” the results prior to returning records to the downstream tap - for instance: cleaning, renaming, or appending properties to the raw record result returned from the API.
Developers may also return None from this method to filter out invalid or not-applicable records from the stream.
- Parameters:
row (types.Record) – Individual record in the stream.
context (types.Context | None) – Stream partition or context dictionary.
- Returns:
The resulting record dict, or None if the record should be excluded.
- Return type:
dict | None
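A minimal sketch of the clean-up and filtering described for post_process, written as a standalone function. The field names id, email, and createdAt are hypothetical; in a tap, this would be a method on the Stream subclass.

```python
from typing import Optional


def post_process(row: dict, context: Optional[dict] = None) -> Optional[dict]:
    """Normalize one raw record, or return None to exclude it."""
    if row.get("id") is None:
        # Records without an identifier are treated as invalid and dropped.
        return None
    cleaned = dict(row)  # Copy so the raw record is left untouched.
    # Normalize the email value; a missing or null email becomes "".
    cleaned["email"] = (cleaned.get("email") or "").strip().lower()
    # Rename a camelCase property to snake_case.
    if "createdAt" in cleaned:
        cleaned["created_at"] = cleaned.pop("createdAt")
    return cleaned
```

Copying the row before changing it is a design choice for this sketch; modifying the row in place would also satisfy the return contract described above.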
- reset_state_progress_markers(state=None)[source]¶
Reset progress markers. If all=True, all state contexts will be set.
This method is internal to the SDK and should not need to be overridden.
- Parameters:
state (dict | None) – State object to promote progress markers with.
- Return type:
None
- final sync(context=None)[source]¶
Sync this stream.
This method is internal to the SDK and should not need to be overridden.
- Parameters:
context (types.Context | None) – Stream partition or context dictionary.
- Return type:
None
- ABORT_AT_RECORD_COUNT: int | None = None[source]¶
If set, raise MaxRecordsLimitException if the limit is exceeded.
- TYPE_CONFORMANCE_LEVEL = 1[source]¶
Type conformance level for this stream.
Field types in the schema are used to convert record field values to the correct type.
Available options are:
TypeConformanceLevel.NONE: No conformance is performed.
TypeConformanceLevel.RECURSIVE: Conformance is performed recursively through all nested levels in the record.
TypeConformanceLevel.ROOT_ONLY: Conformance is performed only on the root level.
- property check_sorted: bool[source]¶
Check if stream is sorted.
This setting enables additional checks which may trigger InvalidStreamSortException if records are found which are unsorted.
- Returns:
True if sorting is checked. Defaults to True.
- property config: Mapping[str, Any][source]¶
Get stream configuration.
- Returns:
A frozen (read-only) config dictionary map.
- property descendent_streams: list[Stream][source]¶
Get child streams.
- Returns:
A list of all children, recursively.
- property has_selected_descendents: bool[source]¶
Check descendents.
- Returns:
True if any child streams are selected, recursively.
- property is_sorted: bool[source]¶
Expect stream to be sorted.
When True, incremental streams will attempt to resume if unexpectedly interrupted.
- Returns:
True if stream is sorted. Defaults to False.
- property is_timestamp_replication_key: bool[source]¶
Check if the replication key is a timestamp.
Developers can override to True in order to force this value, although this should not be required in most use cases since the type can generally be accurately detected from the JSON Schema.
- Returns:
True if the stream uses a timestamp-based replication key.
- Raises:
InvalidReplicationKeyException – If the schema does not contain the replication key.
- property mask: SelectionMask[source]¶
Get a boolean mask for stream and property selection.
- Returns:
A mapping of breadcrumbs to boolean values, representing stream and field selection.
- property metadata: MetadataMapping[source]¶
Get stream metadata.
Metadata attributes (inclusion, selected, etc.) are part of the Singer spec.
Metadata from an input catalog will override standard metadata.
- Returns:
A mapping from property breadcrumbs to metadata objects.
- parent_stream_type: type[Stream] | None = None[source]¶
Parent stream type for this stream. If this stream is a child stream, this should be set to the parent stream class.
- property partitions: list[dict] | None[source]¶
Get stream partitions.
Developers may override this property to provide a default partitions list.
By default, this property returns a list of any partitions which are already defined in state, otherwise None.
- Returns:
A list of partition key dicts (if applicable), otherwise None.
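As a sketch of what a default partitions list might look like, the helper below builds one partition dict per configured value. The helper name build_partitions, the regions config key, and the region partition key are hypothetical; in a tap, similar logic would sit in an overridden partitions property that reads the stream's config.

```python
from typing import List, Optional


def build_partitions(config: dict) -> Optional[List[dict]]:
    """Return one partition key dict per configured region, or None."""
    regions = config.get("regions")
    if not regions:
        # No partitioning configured for this stream.
        return None
    return [{"region": region} for region in regions]
```

Each dict in the returned list corresponds to one context value for which the stream would be synced.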
- property primary_keys: Sequence[str] | None[source]¶
Get primary keys.
- Returns:
A list of primary key(s) for the stream.
- property replication_key: str | None[source]¶
Get replication key.
- Returns:
Replication key for the stream.
- property replication_method: str[source]¶
Get replication method.
- Returns:
Replication method to be used for this stream.
- property schema_filepath: Path | Traversable | None[source]¶
Get path to schema file.
- Returns:
Path to a schema file for the stream or None if n/a.
- property selected: bool[source]¶
Check if stream is selected.
- Returns:
True if the stream is selected.
- selected_by_default: bool = True[source]¶
Whether this stream is selected by default in the catalog.
- property state_partitioning_keys: list[str] | None[source]¶
Get state partition keys.
If not set, a default partitioning will be inherited from the stream’s context. If an empty list is set ([]), state will be held in one bookmark per stream.
- Returns:
Partition keys for the stream state bookmarks.
- property stream_maps: list[StreamMap][source]¶
Get stream transformation maps.
The 0th item is the primary stream map. List should not be empty.
- Returns:
A list of one or more map transformations for this stream.
- property stream_state: dict[source]¶
Get writable state.
This method is internal to the SDK and should not need to be overridden. Developers may access this property but this is not recommended except in advanced use cases. Instead, developers should access the latest stream replication key values using get_starting_timestamp() for timestamp keys, or get_starting_replication_key_value() for non-timestamp keys.
A blank state entry will be created if one doesn’t already exist.
- Returns:
A writable state dict for this stream.
- property tap_state: dict[source]¶
Return a writeable state dict for the entire tap.
Note: This dictionary is shared (and writable) across all streams.
This method is internal to the SDK and should not need to be overridden. Developers may access this property but this is not recommended except in advanced use cases. Instead, developers should access the latest stream replication key values using get_starting_timestamp() for timestamp keys, or get_starting_replication_key_value() for non-timestamp keys.
- Returns:
A writeable state dict for the entire tap.