Code Samples

Below you will find a collection of code samples which can be used for inspiration.

Project Samples

Below are full project samples, contributed by members of the community. Use these for inspiration or to get more information on what an SDK-based tap or target will look like.

To add your project to this list, please submit an issue.

Reusable Code Snippets

These are code samples taken from other projects. Use these as a reference if you get stuck.

A simple Tap class definition with two streams

class TapCountries(Tap):
    """Sample tap for the Countries GraphQL API.

    This tap has no config options and does not require authentication.
    """

    name = "tap-countries"
    # No settings are accepted. PropertiesList takes Property objects as
    # positional arguments, so an empty schema is built by passing none.
    config_jsonschema = PropertiesList().to_dict()

    def discover_streams(self) -> List[Stream]:
        """Return a list containing the two stream types."""
        return [
            CountriesStream(tap=self),
            ContinentsStream(tap=self),
        ]

Define a simple GraphQL-based stream with schema defined in a file

class ContinentsStream(GraphQLStream):
    """GraphQL stream that reads continents from the Countries API."""

    # Stream identity
    name = "continents"
    primary_keys = ["code"]
    replication_key = None  # no incremental bookmark is used for this stream

    # The JSON Schema definition for this stream is read from a text file
    schema_filepath = SCHEMAS_DIR / "continents.json"

    # Where to send the request, and the GraphQL query text to run
    url_base = "https://countries.trevorblades.com/"
    query = """
        continents {
            code
            name
        }
        """

Define a REST-based stream with a JSONPath expression

class LOTRCharactersStream(RESTStream):
    """REST stream of characters from the Lord of the Rings 'The One' API."""

    # Stream identity and endpoint
    name = "characters"
    path = "/character"
    primary_keys = ["_id"]

    # API root that the endpoint path is appended to
    url_base = "https://the-one-api.dev/v2"

    # JSONPath expression locating the records in each response
    records_jsonpath = "$.docs[*]"

    @property
    def authenticator(self):
        """Build an authenticator that sends the configured API key as a bearer token."""
        api_key = self.config.get("api_key")
        bearer_headers = {"Authorization": f"Bearer {api_key}"}
        return SimpleAuthenticator(stream=self, auth_headers=bearer_headers)

Dynamically discovering schema for a stream

Here is an example which builds the schema from the header row of a CSV file:

# Inline CSV text used as sample input; note that it starts with a blank line.
FAKECSV = (
    "\n"
    "Header1,Header2,Header3\n"
    "val1,val2,val3\n"
    "val1,val2,val3\n"
    "val1,val2,val3\n"
)

class ParquetStream(Stream):
    """Sample stream whose schema is discovered from CSV header text."""

    @property
    def schema(self) -> dict:
        """Dynamically detect the JSON Schema for the stream.

        This is evaluated prior to any records being retrieved.

        Returns:
            A JSON Schema dictionary with one string-typed property per
            column named in the header row of ``FAKECSV``.
        """
        # FAKECSV starts with a newline, so strip surrounding whitespace
        # before taking the first line; otherwise the "header" would be "".
        header_row = FAKECSV.strip().splitlines()[0]

        # Assume string type for all fields
        properties: List[Property] = [
            Property(header, StringType()) for header in header_row.split(",")
        ]
        return PropertiesList(*properties).to_dict()

Here is another example from the Parquet tap. This sample uses a custom get_jsonschema_type() function to return the data type.

class ParquetStream(Stream):
    """Stream class for Parquet streams."""

    # ...

    @property
    def schema(self) -> dict:
        """Dynamically detect the JSON Schema for the stream.

        This is evaluated prior to any records being retrieved.

        Returns:
            A JSON Schema dictionary with one property per column of the
            Parquet file at ``self.filepath``.
        """
        # Get a schema object using the parquet and pyarrow libraries
        parquet_schema = pq.ParquetFile(self.filepath).schema_arrow

        # Walk column names and types in lockstep, translating each Parquet
        # type to a JSON Schema type via get_jsonschema_type().
        properties: List[Property] = [
            Property(name, get_jsonschema_type(str(parquet_type)))
            for name, parquet_type in zip(parquet_schema.names, parquet_schema.types)
        ]

        # Return the list as a JSON Schema dictionary object
        return PropertiesList(*properties).to_dict()

Initialize a collection of tap streams with differing types

class TapCountries(Tap):
    # ...
    def discover_streams(self) -> List[Stream]:
        """Instantiate one stream of each of the two stream types."""
        countries = CountriesStream(tap=self)
        continents = ContinentsStream(tap=self)
        return [countries, continents]

Or equivalently:


# Keep the stream classes in a single list, declared at the top of the file
STREAM_TYPES = [CountriesStream, ContinentsStream]

class TapCountries(Tap):
    # ...
    def discover_streams(self) -> List[Stream]:
        """Instantiate every stream class listed in STREAM_TYPES."""
        return [stream_class(tap=self) for stream_class in STREAM_TYPES]

Run the standard built-in tap tests

# Import the tests
from singer_sdk.testing import get_standard_tap_tests

# Import our tap class
from tap_parquet.tap import TapParquet

# Tap configuration handed to get_standard_tap_tests() below; fill in the
# settings your tap needs.
SAMPLE_CONFIG = {
    # ...
}

def test_sdk_standard_tap_tests():
    """Execute each of the SDK's built-in tap tests against TapParquet."""
    for run_test in get_standard_tap_tests(TapParquet, config=SAMPLE_CONFIG):
        run_test()

Make all streams reuse the same authenticator instance

from singer_sdk.authenticators import OAuthAuthenticator, SingletonMeta
from singer_sdk.streams import RESTStream

class SingletonAuthenticator(OAuthAuthenticator, metaclass=SingletonMeta):
    """An OAuth authenticator that uses ``SingletonMeta`` as its metaclass.

    Intended to let all streams reuse the same authenticator instance.
    """

class SingletonAuthStream(RESTStream):
    """A REST stream that authenticates through SingletonAuthenticator."""

    @property
    def authenticator(self) -> SingletonAuthenticator:
        """Return the authenticator for this stream."""
        shared_authenticator = SingletonAuthenticator(stream=self)
        return shared_authenticator

Make a stream reuse the same authenticator instance for all requests

from memoization import cached

from singer_sdk.authenticators import APIAuthenticatorBase
from singer_sdk.streams import RESTStream

class CachedAuthStream(RESTStream):
    """A stream whose authenticator is cached and reused for all requests."""

    @property
    @cached
    def authenticator(self) -> APIAuthenticatorBase:
        """Return the authenticator for this stream."""
        stream_authenticator = APIAuthenticatorBase(stream=self)
        return stream_authenticator

Additional Resources

More links, resources, and example solutions are available from community members in the #singer-tap-development and #singer-target-development channels on Meltano Slack.