singer_sdk.RESTStream#

class singer_sdk.RESTStream[source]#

Abstract base class for REST API streams.

__init__(tap, name=None, schema=None, path=None)[source]#

Initialize the REST stream.

Parameters:
  • tap (Tap) – Singer Tap this stream belongs to.

  • schema (dict[str, t.Any] | Schema | None) – JSON schema for records in this stream.

  • name (str | None) – Name of this stream.

  • path (str | None) – URL path for this entity stream.

Return type:

None

backoff_handler(details)[source]#

Adds additional behaviour prior to retry.

By default, this logs the backoff details. Developers can override it to extend or change this behaviour.

Parameters:

details (Details) – Backoff invocation details. See https://github.com/litl/backoff#event-handlers

Return type:

None
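
As a hedged illustration of such an override, the sketch below logs one short line per retry. It assumes the details dict carries the "tries" and "wait" keys that backoff passes to its on_backoff handlers. The names QuietBackoffMixin, describe_backoff and the logger name are hypothetical; in a tap, the method would live on a RESTStream subclass.

```python
import logging

logger = logging.getLogger("my_tap")  # hypothetical logger name


def describe_backoff(details: dict) -> str:
    """Summarize a backoff details dict in one line."""
    # "tries" and "wait" are keys that backoff passes to on_backoff handlers.
    return f"retry #{details['tries']} in {details['wait']:.1f}s"


class QuietBackoffMixin:
    """Hypothetical mixin; combine with RESTStream in a real tap."""

    def backoff_handler(self, details: dict) -> None:
        # Log one short line instead of the full details dict.
        logger.warning("Backing off: %s", describe_backoff(details))
```

The mixin does not import the SDK, so the sketch runs on its own.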

backoff_jitter(value)[source]#

Amount of jitter to add.

For more information see https://github.com/litl/backoff/blob/master/backoff/_jitter.py

The default is random_jitter rather than full_jitter. It keeps some jitter to be “nice” to downstream APIs, while the resulting wait stays close to the value passed in, which makes life easier for tap developers.

Parameters:

value (float) – Base amount to wait in seconds

Returns:

Time in seconds to wait until the next request.

Return type:

float
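
To make the difference between the two jitter strategies concrete, the sketch below re-implements them with the standard library only. The functions are stand-ins written for illustration, not imports from backoff, and NoJitterMixin is a hypothetical override that disables jitter entirely.

```python
import random


def random_jitter(value: float) -> float:
    """Stand-in for backoff.random_jitter: add up to one second."""
    return value + random.random()


def full_jitter(value: float) -> float:
    """Stand-in for backoff.full_jitter: pick uniformly in [0, value]."""
    return random.uniform(0, value)


class NoJitterMixin:
    """Hypothetical mixin; combine with RESTStream in a real tap."""

    def backoff_jitter(self, value: float) -> float:
        # Wait exactly the base amount, with no randomness.
        return value
```

With a base wait of 8 seconds, the first stand-in returns a value between 8 and 9, while the second can return anything between 0 and 8.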

backoff_max_tries()[source]#

The number of attempts before giving up when retrying requests.

Returns:

Number of max retries.

Return type:

int

backoff_runtime(*, value)[source]#

Optional backoff wait generator that can replace the default backoff.expo.

It is based on parsing the thrown exception of the decorated method, making it possible for response values to be in scope.

You may want to review backoff_jitter() if you’re overriding this function.

Parameters:

value (Callable[[Any], int]) – a callable which takes as input the decorated function’s thrown exception and determines how long to wait.

Yields:

The wait time computed by value from the thrown exception.

Return type:

Generator[int, None, None]
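
The following sketch shows the send-based generator protocol that a runtime wait generator relies on. It is a stand-alone illustration under stated assumptions, not the SDK's source: runtime_wait and RateLimited are hypothetical names, and the retry_after attribute stands in for a value parsed from a real response.

```python
from typing import Any, Callable, Generator


def runtime_wait(*, value: Callable[[Any], int]) -> Generator[int, Any, None]:
    """Yield a wait time computed from each exception sent in."""
    exception = yield  # priming step: the first next() yields None
    while True:
        exception = yield value(exception)


class RateLimited(Exception):
    """Hypothetical exception carrying a server-provided delay."""

    def __init__(self, retry_after: int) -> None:
        super().__init__(f"retry after {retry_after}s")
        self.retry_after = retry_after


waits = runtime_wait(value=lambda exc: exc.retry_after)
next(waits)  # prime the generator
first_wait = waits.send(RateLimited(7))
second_wait = waits.send(RateLimited(30))
```

Each exception sent into the generator produces the wait for the next attempt, so here first_wait is 7 and second_wait is 30.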

backoff_wait_generator()[source]#

The wait generator used by the backoff decorator on request failure.

For the available options, see https://github.com/litl/backoff/blob/master/backoff/_wait_gen.py

For examples, see Code Samples.

Returns:

The wait generator

Return type:

Generator[float, None, None]
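
For a feel of what an exponential wait generator produces, here is a minimal standard-library sketch. The function expo_waits is a hypothetical stand-in for illustration; its parameters are modelled on backoff.expo, and the factor and max_value used below are arbitrary example values.

```python
from itertools import islice
from typing import Iterator, Optional


def expo_waits(
    base: float = 2,
    factor: float = 1,
    max_value: Optional[float] = None,
) -> Iterator[float]:
    """Yield factor * base**n for n = 0, 1, 2, ..., capped at max_value."""
    n = 0
    while True:
        wait = factor * base**n
        if max_value is not None and wait >= max_value:
            # Once the cap is reached, keep yielding the cap.
            yield max_value
        else:
            yield wait
            n += 1


first_six = list(islice(expo_waits(factor=2, max_value=20), 6))
# first_six == [2, 4, 8, 16, 20, 20]
```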

build_prepared_request(*args, **kwargs)[source]#

Build a generic but authenticated request.

Uses the authenticator instance to mutate the request with authentication.

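Parameters:
  • *args (Any) – Arguments to pass to requests.Request.

  • **kwargs (Any) – Keyword arguments to pass to requests.Request.

Returns: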

A requests.PreparedRequest object.

Return type:

PreparedRequest

calculate_sync_cost(request, response, context)[source]#

Calculate the cost of the last API call made.

This method can optionally be implemented in streams to calculate the costs (in arbitrary units to be defined by the tap developer) associated with a single API/network call. The request and response objects are available in the callback, as well as the context.

The method returns a dict where the keys are arbitrary cost dimensions and the values are the cost along each dimension for this one call. For instance: {"rest": 0, "graphql": 42} for a call to GitHub’s GraphQL API. All keys should be present in the dict.

This method can be overridden by tap streams. By default it won’t do anything.

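Parameters:
  • request (PreparedRequest) – The API request that was just sent.

  • response (Response) – The response received for that request.

  • context (dict | None) – Stream partition or context dictionary.

Returns: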

A dict of accumulated costs whose keys are the “cost domains”.

Return type:

dict[str, int]
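
A minimal sketch of such an override follows. It assumes the API reports the cost of a call in a response header; the header name "X-Request-Cost", the class CostTrackingMixin and the fake response object are all hypothetical.

```python
from types import SimpleNamespace


class CostTrackingMixin:
    """Hypothetical mixin; combine with RESTStream in a real tap."""

    def calculate_sync_cost(self, request, response, context) -> dict[str, int]:
        # "X-Request-Cost" is an assumed header name, not a real standard.
        graphql_cost = int(response.headers.get("X-Request-Cost", 0))
        # Every cost dimension is present, even when its cost is zero.
        return {"rest": 0, "graphql": graphql_cost}


# A stand-in exposing only the attribute the sketch reads.
fake_response = SimpleNamespace(headers={"X-Request-Cost": "42"})
costs = CostTrackingMixin().calculate_sync_cost(None, fake_response, None)
# costs == {"rest": 0, "graphql": 42}
```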

get_new_paginator()[source]#

Get a fresh paginator for this API endpoint.

Returns:

A paginator instance.

Return type:

BaseAPIPaginator

get_records(context)[source]#

Return a generator of record-type dictionary objects.

Each record emitted should be a dictionary of property names to their values.

Parameters:

context (dict | None) – Stream partition or context dictionary.

Yields:

One item per (possibly processed) record in the API.

Return type:

Iterable[dict[str, Any]]

get_url(context)[source]#

Get stream entity URL.

Developers override this method to perform dynamic URL generation.

Parameters:

context (dict | None) – Stream partition or context dictionary.

Returns:

A URL, optionally targeted to a specific partition or context.

Return type:

str
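
One common form of dynamic URL generation is filling placeholders in the path from the context. The sketch below illustrates the idea with the standard library only; build_url is a hypothetical helper, and the placeholder syntax and example values are assumptions rather than a description of the SDK's implementation.

```python
from typing import Optional
from urllib.parse import quote


def build_url(url_base: str, path: str, context: Optional[dict]) -> str:
    """Join base and path, then fill {placeholders} from the context."""
    url = url_base + path
    for key, val in (context or {}).items():
        placeholder = "{" + key + "}"
        if placeholder in url:
            # Percent-encode the value so it is safe inside a URL path.
            url = url.replace(placeholder, quote(str(val), safe=""))
    return url


url = build_url(
    "https://api.mysite.com/v3",
    "/orgs/{org}/repos",
    {"org": "my org"},
)
# url == "https://api.mysite.com/v3/orgs/my%20org/repos"
```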

get_url_params(context, next_page_token)[source]#

Return a dictionary or string of URL query parameters.

If paging is supported, developers may override with specific paging logic.

If your source needs special handling and, for example, parentheses should not be encoded, you can return a string constructed with urllib.parse.urlencode():

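from urllib.parse import urlencode

from singer_sdk import RESTStream


class MyStream(RESTStream):
    def get_url_params(self, context, next_page_token):
        # Keep parentheses unescaped; other reserved characters are still encoded.
        params = {"key": "(a,b,c)"}
        return urlencode(params, safe="()")

Parameters: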
  • context (dict | None) – Stream partition or context dictionary.

  • next_page_token (_TToken | None) – Token, page number or any request argument to request the next page of data.

Returns:

Dictionary or encoded string with URL query parameters to use in the request.

Return type:

dict[str, Any] | str
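
To see what the safe argument changes, this short standard-library comparison encodes the same parameters with and without it. The variable names are illustrative only.

```python
from urllib.parse import urlencode

params = {"key": "(a,b,c)"}

# Default behaviour: parentheses and commas are percent-encoded.
default_encoding = urlencode(params)
# default_encoding == "key=%28a%2Cb%2Cc%29"

# With safe="()": parentheses pass through, commas are still encoded.
keep_parentheses = urlencode(params, safe="()")
# keep_parentheses == "key=(a%2Cb%2Cc)"
```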

parse_response(response)[source]#

Parse the response and return an iterator of result records.

Parameters:

response (Response) – A raw requests.Response

Yields:

One item for every item found in the response.

Return type:

Iterable[dict]
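
As a sketch of a custom parser, the override below assumes the API wraps its records in a top-level "data" list. FakeResponse and EnvelopeMixin are hypothetical names; FakeResponse only mimics the json() method of requests.Response so the example runs without any HTTP call.

```python
import json
from typing import Iterable


class FakeResponse:
    """Hypothetical stand-in exposing the json() method of a response."""

    def __init__(self, body: str) -> None:
        self._body = body

    def json(self):
        return json.loads(self._body)


class EnvelopeMixin:
    """Hypothetical mixin; combine with RESTStream in a real tap."""

    def parse_response(self, response) -> Iterable[dict]:
        # Assumes the API wraps its records in a top-level "data" list.
        yield from response.json().get("data", [])


body = '{"data": [{"id": 1}, {"id": 2}]}'
records = list(EnvelopeMixin().parse_response(FakeResponse(body)))
# records == [{"id": 1}, {"id": 2}]
```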

prepare_request(context, next_page_token)[source]#

Prepare a request object for this stream.

If partitioning is supported, the context object will contain the partition definitions. Pagination information can be parsed from next_page_token if next_page_token is not None.

Parameters:
  • context (dict | None) – Stream partition or context dictionary.

  • next_page_token (_TToken | None) – Token, page number or any request argument to request the next page of data.

Returns:

A prepared request built with the stream’s URL, path, query parameters, HTTP headers and authenticator.

Return type:

PreparedRequest

prepare_request_payload(context, next_page_token)[source]#

Prepare the data payload for the REST API request.

By default, no payload will be sent (return None).

Developers may override this method if the API requires a custom payload along with the request. (This is generally not required for APIs which use the HTTP ‘GET’ method.)

Parameters:
  • context (dict | None) – Stream partition or context dictionary.

  • next_page_token (_TToken | None) – Token, page number or any request argument to request the next page of data.

Return type:

dict | None
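
For an API that expects a POST body, an override might look like the sketch below. The field names "limit" and "cursor" and the class SearchPayloadMixin are assumptions chosen for illustration; a real API defines its own payload shape.

```python
from typing import Any, Optional


class SearchPayloadMixin:
    """Hypothetical mixin; combine with RESTStream in a real tap."""

    rest_method = "POST"

    def prepare_request_payload(
        self,
        context: Optional[dict],
        next_page_token: Optional[Any],
    ) -> Optional[dict]:
        payload: dict = {"limit": 100}  # assumed page-size field
        if next_page_token is not None:
            # Assumed cursor field; only sent after the first page.
            payload["cursor"] = next_page_token
        return payload


stream = SearchPayloadMixin()
first_page = stream.prepare_request_payload(None, None)
next_page = stream.prepare_request_payload(None, "abc")
```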

request_decorator(func)[source]#

Instantiate a decorator for handling request failures.

Uses a wait generator defined in backoff_wait_generator to determine backoff behaviour. Try limit is defined in backoff_max_tries, and will trigger the event defined in backoff_handler before retrying. Developers may override one or all of these methods to provide custom backoff or retry handling.

Parameters:

func (Callable) – Function to decorate.

Returns:

A decorated method.

Return type:

Callable

request_records(context)[source]#

Request records from REST endpoint(s), returning response records.

If pagination is detected, pages will be iterated over automatically.

Parameters:

context (dict | None) – Stream partition or context dictionary.

Yields:

An item for every record in the response.

Return type:

Iterable[dict]

response_error_message(response)[source]#

Build error message for invalid HTTP statuses.

WARNING: Override this method when the URL path may contain secrets or PII.

Parameters:

response (Response) – A requests.Response object.

Returns:

The error message

Return type:

str
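
A hedged sketch of an override that keeps the URL out of the message is shown below. RedactedErrorMixin and the fake response are hypothetical, and the message format is an illustrative choice rather than the SDK's default wording.

```python
from types import SimpleNamespace


class RedactedErrorMixin:
    """Hypothetical mixin; combine with RESTStream in a real tap."""

    def response_error_message(self, response) -> str:
        # Classify the status by range: 4xx is client, anything else server.
        kind = "Client" if 400 <= response.status_code < 500 else "Server"
        # Deliberately omit response.url, which may embed a token or PII.
        return f"{response.status_code} {kind} Error: {response.reason}"


# A stand-in exposing only the attributes the sketch reads.
fake = SimpleNamespace(
    status_code=404,
    reason="Not Found",
    url="https://api.example.com/users/alice@example.com",
)
message = RedactedErrorMixin().response_error_message(fake)
# message == "404 Client Error: Not Found"
```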

update_sync_costs(request, response, context)[source]#

Update internal calculation of sync costs.

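Parameters:
  • request (PreparedRequest) – The API request that was just sent.

  • response (Response) – The response received for that request.

  • context (dict | None) – Stream partition or context dictionary.

Returns: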

A dict of costs (for the single request) whose keys are the “cost domains”. See calculate_sync_cost for details.

Return type:

dict[str, int]

validate_response(response)[source]#

Validate HTTP response.

Checks for error status codes and whether they are fatal or retriable.

If an error is deemed transient and can be safely retried, this method should raise a singer_sdk.exceptions.RetriableAPIError. By default, this applies to 5xx error codes, along with the values set in extra_retry_statuses.

If an error is unrecoverable, this method should raise a singer_sdk.exceptions.FatalAPIError. By default, this applies to 4xx errors, excluding the values found in extra_retry_statuses.

Tap developers are encouraged to override this method if their APIs use HTTP status codes in non-conventional ways, or if they communicate errors differently (e.g. in the response body).

Parameters:

response (Response) – A requests.Response object.

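Raises:
  • FatalAPIError – If the request is not retriable.

  • RetriableAPIError – If the request is retriable.

Return type: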

None
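
The default classification described above can be sketched with the standard library as follows. The exception classes are local stand-ins for the SDK's RetriableAPIError and FatalAPIError, and validate and outcome are hypothetical helpers; this illustrates the documented rules and is not the SDK's source.

```python
from http import HTTPStatus
from types import SimpleNamespace


class RetriableError(Exception):
    """Stand-in for singer_sdk.exceptions.RetriableAPIError."""


class FatalError(Exception):
    """Stand-in for singer_sdk.exceptions.FatalAPIError."""


EXTRA_RETRY_STATUSES = [HTTPStatus.TOO_MANY_REQUESTS]


def validate(response) -> None:
    """Raise for error statuses, distinguishing retriable from fatal."""
    status = response.status_code
    # 5xx codes and any extra retry statuses are treated as transient.
    if status in EXTRA_RETRY_STATUSES or status >= 500:
        raise RetriableError(f"{status}: {response.reason}")
    # Remaining 4xx codes are treated as unrecoverable.
    if 400 <= status < 500:
        raise FatalError(f"{status}: {response.reason}")


def outcome(status: int, reason: str = "") -> str:
    """Hypothetical helper: report which branch a status code takes."""
    try:
        validate(SimpleNamespace(status_code=status, reason=reason))
    except RetriableError:
        return "retry"
    except FatalError:
        return "fatal"
    return "ok"
```

Under these rules, 429 and 503 are retried, 404 is fatal, and 200 passes validation.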

property authenticator: Callable[[PreparedRequest], PreparedRequest][source]#

Return or set the authenticator for managing HTTP auth headers.

If an authenticator is not specified, REST-based taps will simply pass http_headers as defined in the stream class.

Returns:

Authenticator instance that will be used to authenticate all outgoing requests.

extra_retry_statuses: t.Sequence[int] = [HTTPStatus.TOO_MANY_REQUESTS][source]#

Response code reference for rate limit retries

property http_headers: dict[source]#

Return headers dict to be used for HTTP requests.

If an authenticator is also specified, the authenticator’s headers will be combined with http_headers when making HTTP requests.

Returns:

Dictionary of HTTP headers to use as a base for every request.

next_page_token_jsonpath: str | None = None[source]#

Optional JSONPath expression to extract a pagination token from the API response. Example: “$.next_page”

records_jsonpath: str = '$[*]'[source]#

JSONPath expression to extract records from the API response.

property requests_session: Session[source]#

Get requests session.

Returns:

The requests.Session object for HTTP requests.

rest_method = 'GET'[source]#

HTTP method to use for requests. Defaults to “GET”.

property timeout: int[source]#

Return the request timeout limit in seconds.

The default timeout is 300 seconds, or as defined by DEFAULT_REQUEST_TIMEOUT.

Returns:

The request timeout limit as number of seconds.

abstract property url_base: str[source]#

Return the base url, e.g. https://api.mysite.com/v3/.