SDK Code Samples

Below you will find a collection of code samples which can be used for inspiration.

Project Samples

Below are full project samples, contributed by members of the community. Use these for inspiration or to get more information on what an SDK-based tap or target will look like.

To add your project to this list, please submit an issue.

Reusable Code Snippets

These are code samples taken from other projects. Use these as a reference if you get stuck.

A simple Tap class definition with two streams

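from typing import List

from singer_sdk import Stream, Tap
from singer_sdk import typing as th  # JSON Schema helper classes

# CountriesStream and ContinentsStream are defined elsewhere in the tap package.

class TapCountries(Tap):
    """Sample tap for Countries GraphQL API. This tap has no
    config options and does not require authentication.
    """
    name = "tap-countries"
    # An empty properties list: the tap accepts no config settings.
    config_jsonschema = th.PropertiesList().to_dict()

    def discover_streams(self) -> List[Stream]:
        """Return a list containing the two stream types."""
        return [
            CountriesStream(tap=self),
            ContinentsStream(tap=self),
        ]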

Define a simple GraphQL-based stream with schema defined in a file

from pathlib import Path

from singer_sdk.streams import GraphQLStream

# Assumed layout: JSON Schema files live in a "schemas" folder next to this module.
SCHEMAS_DIR = Path(__file__).parent / "schemas"

class ContinentsStream(GraphQLStream):
    """Continents stream from the Countries API."""

    name = "continents"
    primary_keys = ["code"]
    replication_key = None  # Incremental bookmarks not needed

    # Read JSON Schema definition from a text file:
    schema_filepath = SCHEMAS_DIR / "continents.json"

    # GraphQL API endpoint and query text:
    url_base = "https://countries.trevorblades.com/"
    query = """
        continents {
            code
            name
        }
        """

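GraphQLStream takes care of turning query into an HTTP request. Our understanding is that a query which does not already start with "query" is wrapped in "query { ... }" and sent as a JSON body with "query" and "variables" keys. The stdlib sketch below models that for the continents query; build_graphql_payload is a hypothetical helper, not part of the SDK, and the exact whitespace handling is an assumption.

```python
import json
from typing import Any, Dict, Optional


def build_graphql_payload(query: str, variables: Optional[Dict[str, Any]] = None) -> str:
    """Turn a bare GraphQL selection into a JSON request body.

    Assumption: text that does not already start with "query" is wrapped in
    "query { ... }", and the query is collapsed onto one line.
    """
    text = query.strip()
    if not text.startswith("query"):
        text = "query { " + text + " }"
    # Join the stripped, non-blank lines with single spaces.
    one_line = " ".join(line.strip() for line in text.splitlines() if line.strip())
    return json.dumps({"query": one_line, "variables": variables or {}})


CONTINENTS_QUERY = """
    continents {
        code
        name
    }
    """
print(build_graphql_payload(CONTINENTS_QUERY))
# {"query": "query { continents { code name } }", "variables": {}}
```

Sending the selection as a single-line string keeps the payload compact; GraphQL servers ignore the extra whitespace either way.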
Define a REST-based stream with a JSONPath expression

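from singer_sdk.authenticators import SimpleAuthenticator
from singer_sdk.streams import RESTStream

class LOTRCharactersStream(RESTStream):
    """Characters stream from the Lord of the Rings 'The One' API."""

    # Base REST API configuration
    url_base = "https://the-one-api.dev/v2"
    primary_keys = ["_id"]

    # Endpoint configuration
    path = "/character"
    name = "characters"
    # Each element of the top-level "docs" array is emitted as one record
    records_jsonpath = "$.docs[*]"
    # (schema or schema_filepath omitted for brevity)

    @property
    def authenticator(self) -> SimpleAuthenticator:
        # Send the configured API key as a bearer token on every request
        return SimpleAuthenticator(
            stream=self,
            auth_headers={
                "Authorization": f"Bearer {self.config.get('api_key')}",
            },
        )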

Use a JSONPath expression to extract the next page URL from a HATEOAS response

from singer_sdk.streams import RESTStream

class MyStream(RESTStream):
    """A custom stream."""

    # Gets the href property from the links item where rel="next"
    next_page_token_jsonpath = "$.links[?(@.rel=='next')].href"
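To see what that JSONPath expression selects, here is a plain-Python equivalent run against a made-up HATEOAS payload (next_page_url and the example.com URLs are illustrative). We assume, as with the expression, that the first match is used and that no match means there are no more pages.

```python
from typing import Optional


def next_page_url(payload: dict) -> Optional[str]:
    """Return the href of the first link whose rel is "next", or None.

    Plain-Python equivalent of $.links[?(@.rel=='next')].href,
    taking the first match.
    """
    for link in payload.get("links", []):
        if link.get("rel") == "next":
            return link.get("href")
    return None  # No "next" link: treat this as the last page.


response_body = {
    "data": [{"id": 1}, {"id": 2}],
    "links": [
        {"rel": "self", "href": "https://api.example.com/items?page=1"},
        {"rel": "next", "href": "https://api.example.com/items?page=2"},
    ],
}
print(next_page_url(response_body))  # https://api.example.com/items?page=2
```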

Dynamically discovering schema for a stream

Here is an example that builds the schema from the header row of a CSV string, typing every column as a string:

from typing import List

from singer_sdk import typing as th
from singer_sdk.streams import Stream

FAKECSV = """
Header1,Header2,Header3
val1,val2,val3
val1,val2,val3
val1,val2,val3
"""

class CSVStream(Stream):
    @property
    def schema(self) -> dict:
        """Dynamically detect the json schema for the stream.
        This is evaluated prior to any records being retrieved.
        """
        properties: List[th.Property] = []
        # strip() drops the leading newline so that line 0 is the header row
        header_row = FAKECSV.strip().split("\n")[0]
        for header in header_row.split(","):
            # Assume string type for all fields
            properties.append(th.Property(header, th.StringType()))
        return th.PropertiesList(*properties).to_dict()
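For a quick look at the kind of dictionary this produces, here is a stdlib-only sketch that reads the header with the csv module and emits a JSON Schema object with nullable string columns. The helper name schema_from_csv_header is hypothetical, and the output shape only approximates what th.PropertiesList(...).to_dict() returns; it is not the SDK's code.

```python
import csv
import io
from typing import Any, Dict

FAKECSV = """
Header1,Header2,Header3
val1,val2,val3
"""


def schema_from_csv_header(text: str) -> Dict[str, Any]:
    """Build a JSON Schema object from the header row of CSV text.

    Every column is typed as a nullable string (an assumption, as above).
    """
    # strip() removes the leading newline, so the first row is the header.
    reader = csv.reader(io.StringIO(text.strip()))
    header = next(reader)
    return {
        "type": "object",
        "properties": {name: {"type": ["string", "null"]} for name in header},
    }


print(list(schema_from_csv_header(FAKECSV)["properties"]))
# ['Header1', 'Header2', 'Header3']
```

Using csv.reader rather than str.split(",") also copes with quoted column names that contain commas.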

Here is another example, from the Parquet tap. This sample uses a custom get_jsonschema_type() function, defined in that tap rather than in the SDK, to translate each Parquet column type into a JSON Schema type.

from typing import List

import pyarrow.parquet as pq
from singer_sdk import typing as th
from singer_sdk.streams import Stream

# get_jsonschema_type() is a helper defined elsewhere in the tap.

class ParquetStream(Stream):
    """Stream class for Parquet streams."""

    #...

    @property
    def schema(self) -> dict:
        """Dynamically detect the json schema for the stream.
        This is evaluated prior to any records being retrieved.
        """
        properties: List[th.Property] = []
        # Get a schema object using the parquet and pyarrow libraries
        parquet_schema = pq.ParquetFile(self.filepath).schema_arrow

        # Loop through each column name together with its Arrow data type
        for name, arrow_type in zip(parquet_schema.names, parquet_schema.types):
            # Translate from the Parquet type to a JSON Schema type
            dtype = get_jsonschema_type(str(arrow_type))

            # Add the new property to our list
            properties.append(th.Property(name, dtype))

        # Return the list as a JSON Schema dictionary object
        return th.PropertiesList(*properties).to_dict()
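The tap's own get_jsonschema_type() is not shown on this page. For a rough picture of what such a helper has to handle, here is a hypothetical stdlib sketch, arrow_type_to_jsonschema(), that maps Arrow type names (as we understand pyarrow's str() output, e.g. "int64", "double", "timestamp[ms]") to JSON Schema fragments. It returns plain dicts, whereas the example above passes the helper's result to th.Property, so the real helper presumably returns SDK type objects; treat the mapping table as an assumption to adapt.

```python
import copy
import re
from typing import Any, Dict, List, Tuple

# Hypothetical table: regex on the Arrow type name -> JSON Schema fragment.
# Order matters; the first matching pattern wins.
ARROW_TO_JSONSCHEMA: List[Tuple[str, Dict[str, Any]]] = [
    (r"^u?int(8|16|32|64)$", {"type": ["integer", "null"]}),
    (r"^(halffloat|float|double)$", {"type": ["number", "null"]}),
    (r"^bool$", {"type": ["boolean", "null"]}),
    (r"^date(32|64)\[", {"type": ["string", "null"], "format": "date"}),
    (r"^timestamp\[", {"type": ["string", "null"], "format": "date-time"}),
    (r"^(large_)?string$", {"type": ["string", "null"]}),
]


def arrow_type_to_jsonschema(arrow_type_name: str) -> Dict[str, Any]:
    """Map an Arrow type name such as "int64" or "timestamp[ms]" to JSON Schema."""
    for pattern, schema in ARROW_TO_JSONSCHEMA:
        if re.match(pattern, arrow_type_name):
            # Copy so callers cannot mutate the shared table entry.
            return copy.deepcopy(schema)
    # Lossy fallback for types not in the table (lists, structs, binary, ...).
    return {"type": ["string", "null"]}


for type_name in ["int64", "double", "timestamp[ms, tz=UTC]", "date32[day]", "binary"]:
    print(type_name, "->", arrow_type_to_jsonschema(type_name))
```

A regex table keeps parameterised types such as "timestamp[ms, tz=UTC]" from needing one entry per unit and time zone.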

Initialize a collection of tap streams with differing types

class TapCountries(Tap):
    # ...
    def discover_streams(self) -> List[Stream]:
        """Return a list containing one each of the two stream types."""
        return [
            CountriesStream(tap=self),
            ContinentsStream(tap=self),
        ]

Or equivalently:


# Declare list of types here at the top of the file
STREAM_TYPES = [
    CountriesStream,
    ContinentsStream,
]

class TapCountries(Tap):
    # ...
    def discover_streams(self) -> List[Stream]:
        """Return a list with one each of all defined stream types."""
        return [
            stream_type(tap=self)
            for stream_type in STREAM_TYPES
        ]

Run the standard built-in tap tests

# Import the tests
from singer_sdk.testing import get_standard_tap_tests

# Import our tap class
from tap_parquet.tap import TapParquet

SAMPLE_CONFIG = {
    # ...
}

def test_sdk_standard_tap_tests():
    """Run the built-in tap tests from the SDK."""
    tests = get_standard_tap_tests(TapParquet, config=SAMPLE_CONFIG)
    for test in tests:
        test()

Make all streams reuse the same authenticator instance

from singer_sdk.authenticators import OAuthAuthenticator, SingletonMeta
from singer_sdk.streams import RESTStream

class SingletonAuthenticator(OAuthAuthenticator, metaclass=SingletonMeta):
    """A singleton authenticator."""

class SingletonAuthStream(RESTStream):
    """A stream with singleton authenticator."""

    @property
    def authenticator(self) -> SingletonAuthenticator:
        """Stream authenticator."""
        return SingletonAuthenticator(stream=self)
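With a singleton metaclass, every call to SingletonAuthenticator(...), from any stream, returns the object created by the first call. The minimal stdlib sketch below is our own illustration (singer_sdk's SingletonMeta may differ in detail); DemoAuthenticator is a stand-in class. Note that in this sketch constructor arguments passed after the first call are ignored, so the instance keeps whatever it was given first.

```python
from typing import Any, Dict


class SingletonMeta(type):
    """Illustrative singleton metaclass (not the SDK's source).

    The first call to a class creates its instance; every later call
    returns that same object without running __init__ again.
    """

    _instances: Dict[type, Any] = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class DemoAuthenticator(metaclass=SingletonMeta):
    """Stand-in for an authenticator shared by every stream."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name


first = DemoAuthenticator("customers")
second = DemoAuthenticator("orders")  # Returns the existing instance.
print(first is second)  # True
print(second.stream_name)  # customers
```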

Make a stream reuse the same authenticator instance for all requests

from functools import cached_property

from singer_sdk.authenticators import APIAuthenticatorBase
from singer_sdk.streams import RESTStream

class CachedAuthStream(RESTStream):
    """A stream that reuses one cached authenticator instance."""

    @cached_property
    def authenticator(self) -> APIAuthenticatorBase:
        """Stream authenticator, created on first access and then reused."""
        return APIAuthenticatorBase(stream=self)
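Unlike the singleton approach, functools.cached_property caches per object: each stream instance builds its authenticator once and reuses it for every request, but two streams get two separate authenticators. A small stdlib demonstration, where DemoStream and the dict it returns are hypothetical stand-ins for a real stream and authenticator:

```python
from functools import cached_property
from itertools import count

_ids = count(1)


class DemoStream:
    """Stand-in for a stream with a cached authenticator."""

    @cached_property
    def authenticator(self) -> dict:
        # Runs once per stream instance; the result is stored on the instance.
        return {"auth_id": next(_ids)}


s1 = DemoStream()
s2 = DemoStream()
print(s1.authenticator is s1.authenticator)  # True: same object on every access
print(s1.authenticator is s2.authenticator)  # False: each stream builds its own
```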

Use one of requests’s built-in authenticators

from requests.auth import HTTPDigestAuth
from singer_sdk.streams import RESTStream

class DigestAuthStream(RESTStream):
    """A stream with digest authentication."""

    @property
    def authenticator(self) -> HTTPDigestAuth:
        """Stream authenticator."""
        return HTTPDigestAuth(
            username=self.config["username"],
            password=self.config["password"],
        )

HTTPBasicAuth and HTTPProxyAuth are also available in requests.auth. In addition to the requests.auth classes, the community has published a few packages with custom authenticator classes that are compatible with the SDK. For example:

  • requests-aws4auth: AWS v4 authentication

  • requests_auth: A collection of authenticators for various services and protocols including Azure, Okta and NTLM.

Custom response validation

Some APIs do not use HTTP status codes to report failures; for example, they may return 200 OK with an error status in the response body. For those cases, you can override RESTStream.validate_response() and raise FatalAPIError if an unrecoverable error is detected. If the API also has transient errors, either client-side (such as rate limits) or server-side (such as a temporary 5xx), you can raise RetriableAPIError and the request will be retried with back-off:

from enum import Enum

import requests
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from singer_sdk.streams.rest import RESTStream

class CustomResponseValidationStream(RESTStream):
    """Stream with non-conventional error response."""

    url_base = "https://badapi.test"
    name = "non_conventional"
    schema = {"type": "object", "properties": {}}
    path = "/dummy"

    class StatusMessage(str, Enum):
        """Possible status messages."""

        OK = "OK"
        ERROR = "ERROR"
        UNAVAILABLE = "UNAVAILABLE"

    def validate_response(self, response: requests.Response) -> None:
        # Still catch error status codes
        super().validate_response(response)

        # The failure signal is in the JSON body, not the HTTP status
        data = response.json()
        if data["status"] == self.StatusMessage.ERROR:
            raise FatalAPIError("Error message found :(")
        if data["status"] == self.StatusMessage.UNAVAILABLE:
            # Attach the response so backoff logic can inspect its headers
            raise RetriableAPIError("API is unavailable", response=response)
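Because the decision depends only on the response body, it can help to pull the rules into a plain function that you can unit-test without making HTTP calls. A stdlib sketch follows; FatalError and RetriableError are hypothetical stand-ins for the SDK exceptions, and check_body_status is an illustrative name.

```python
import json
from typing import Any, Dict


class FatalError(Exception):
    """Stand-in for singer_sdk.exceptions.FatalAPIError."""


class RetriableError(Exception):
    """Stand-in for singer_sdk.exceptions.RetriableAPIError."""


def check_body_status(body: str) -> Dict[str, Any]:
    """Classify a response body that reports errors in a "status" field.

    Applies the same two rules as validate_response above: "ERROR" is fatal
    and "UNAVAILABLE" is worth retrying. Unlike that example, a missing
    status is treated as success here and the parsed data is returned.
    """
    data = json.loads(body)
    status = data.get("status")
    if status == "ERROR":
        raise FatalError("Error message found")
    if status == "UNAVAILABLE":
        raise RetriableError("API is unavailable")
    return data


print(check_body_status('{"status": "OK", "items": [1, 2]}'))
```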

Custom Backoff

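Custom backoff and retry behaviour can be added by overriding these RESTStream methods:

  • backoff_wait_generator

  • backoff_max_tries

  • backoff_handler

  • backoff_runtime

  • backoff_jitter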

For example, to wait a constant 10 seconds between retries:

# A method on your RESTStream subclass; requires "import backoff"
# and "from typing import Generator".
def backoff_wait_generator(self) -> Generator[float, None, None]:
    return backoff.constant(interval=10)
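The wait generator only decides how long to sleep between attempts; how many attempts are made is capped separately. The simplified stdlib model below (constant_wait and call_with_retries are hypothetical names; the real retry loop lives in the backoff library) applies a constant 10-second policy to a call that fails twice before succeeding.

```python
import itertools
from typing import Callable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def constant_wait(interval: float) -> Iterator[float]:
    """Yield the same wait time forever, modelling backoff.constant's waits."""
    return itertools.repeat(interval)


def call_with_retries(
    func: Callable[[], T],
    waits: Iterator[float],
    max_tries: int,
    sleep: Callable[[float], None],
    retry_on: Tuple[Type[BaseException], ...] = (RuntimeError,),
) -> T:
    """Call func, sleeping next(waits) seconds after each retriable failure."""
    for attempt in range(1, max_tries + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_tries:
                raise  # Out of attempts: surface the last error.
            sleep(next(waits))
    raise ValueError("max_tries must be at least 1")


# Usage: a call that fails twice, then succeeds. Recording the requested
# sleeps instead of calling time.sleep keeps the demo instant.
attempts = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("temporary failure")
    return "ok"


slept = []
print(call_with_retries(flaky, constant_wait(10), max_tries=5, sleep=slept.append))  # ok
print(slept)  # [10, 10]
```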

To utilise a response header as the wait value, call backoff_runtime and pass it a function that takes the raised exception and returns the number of seconds to wait:

Note: By default, jitter makes the request wait slightly longer than the value provided. To disable jitter, override backoff_jitter. In SDK versions <= 0.21.0, the default jitter function stops the example below from working as you would expect unless jitter is disabled; in those versions, disable it by overriding request_decorator and passing jitter=None to backoff.on_exception.

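# A method on your RESTStream subclass; requires "from typing import Generator".
def backoff_wait_generator(self) -> Generator[float, None, None]:
    def _backoff_from_headers(retriable_api_error):
        # The exception carries the response whose headers hold the wait time
        response_headers = retriable_api_error.response.headers
        return int(response_headers.get("Retry-After", 0))

    return self.backoff_runtime(value=_backoff_from_headers)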
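The example above assumes Retry-After holds a number of seconds. HTTP also allows an HTTP-date there (for example "Wed, 21 Oct 2015 07:28:00 GMT"), which int() rejects with a ValueError. A more tolerant stdlib parser could look like this; retry_after_seconds is a hypothetical helper, and treating malformed values as "retry immediately" (0) is a design assumption.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: Optional[str], now: Optional[datetime] = None) -> int:
    """Convert a Retry-After header value into whole seconds to wait.

    Accepts delta-seconds ("120") or an HTTP-date
    ("Wed, 21 Oct 2015 07:28:00 GMT"). Missing, malformed, or past
    values yield 0.
    """
    if not value:
        return 0
    value = value.strip()
    if value.isdigit():
        return int(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return 0  # Neither an integer nor a parseable HTTP-date.
    if when.tzinfo is None:
        # A "-0000" zone comes back naive; HTTP-dates are always GMT.
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0, int((when - now).total_seconds()))


fixed_now = datetime(2015, 10, 21, 7, 26, tzinfo=timezone.utc)
print(retry_after_seconds("120"))  # 120
print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))  # 120
```

Inside _backoff_from_headers you could then return retry_after_seconds(retriable_api_error.response.headers.get("Retry-After")), after first checking that the exception's response is not None.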

Additional Resources

More links, resources, and example solutions are available from community members in the #singer-tap-development and #singer-target-development channels on Meltano Slack.