SDK Code Samples¶
Below you will find a collection of code samples which can be used for inspiration.
Project Samples¶
The following are full project samples, contributed by members of the community:
A REST Stream: MeltanoLabs/tap-pulumi-cloud
A SQL Target: MeltanoLabs/target-postgres
A SQL Tap: MeltanoLabs/tap-postgres
A Cloud Service: MeltanoLabs/tap-cloudwatch
A REST Stream with complex and varied auth options: MeltanoLabs/tap-github
There are many more examples available: go to Meltano Hub and type sdk
in the searchbar to see a list of taps and targets created with the Singer SDK.
To add your project to Meltano Hub, please submit an issue.
Reusable Code Snippets¶
These are code samples taken from other projects. Use these as a reference if you get stuck.
Define a simple GraphQL-based stream with schema defined in a file
Use a JSONPath expression to extract the next page URL from a HATEOAS response
Make a stream reuse the same authenticator instance for all requests
A simple Tap class definition with two streams¶
class TapCountries(Tap):
"""Sample tap for Countries GraphQL API. This tap has no
config options and does not require authentication.
"""
name = "tap-countries"
config_jsonschema = th.PropertiesList([]).to_dict()
def discover_streams(self) -> List[Stream]:
"""Return a list containing the two stream types."""
return [
CountriesStream(tap=self),
ContinentsStream(tap=self),
]
Define a simple GraphQL-based stream with schema defined in a file¶
class ContinentsStream(GraphQLStream):
"""Continents stream from the Countries API."""
name = "continents"
primary_keys = ["code"]
replication_key = None # Incremental bookmarks not needed
# Read JSON Schema definition from a text file:
schema_filepath = SCHEMAS_DIR / "continents.json"
# GraphQL API endpoint and query text:
url_base = "https://countries.trevorblades.com/"
query = """
continents {
code
name
}
"""
Define a REST-based stream with a JSONPath expression¶
class LOTRCharactersStream(RESTStream):
"""Characters stream from the Lord of the Rings 'The One' API."""
# Base REST API configuration
url_base = "https://the-one-api.dev/v2"
primary_keys = ["_id"]
# Endpoint configuration
path = "/character"
name = "characters"
records_jsonpath = "$.docs[*]"
@property
def authenticator(self):
return SimpleAuthenticator(
stream=self,
auth_headers={
"Authorization": f"Bearer {self.config.get('api_key')}",
},
)
Use a JSONPath expression to extract the next page URL from a HATEOAS response¶
class MyStream(RESTStream):
"""A custom stream."""
# Gets the href property from the links item where rel="next"
next_page_token_jsonpath = "$.links[?(@.rel=='next')].href"
Dynamically discovering schema
for a stream¶
Here is an example which parses schema from a CSV file:
FAKECSV = """
Header1,Header2,Header3
val1,val2,val3
val1,val2,val3
val1,val2,val3
"""
@property
class ParquetStream(Stream):
def schema(self):
"""Dynamically detect the json schema for the stream.
This is evaluated prior to any records being retrieved.
"""
properties: List[th.Property] = []
for header in FAKECSV.split("\n")[0].split(","):
# Assume string type for all fields
properties.append(th.Property(header, th.StringType()))
return th.PropertiesList(*properties).to_dict()
Here is another example from the Parquet tap. This sample uses a
custom get_jsonschema_type()
function to return the data type.
class ParquetStream(Stream):
"""Stream class for Parquet streams."""
#...
@property
def schema(self) -> dict:
"""Dynamically detect the json schema for the stream.
This is evaluated prior to any records being retrieved.
"""
properties: List[th.Property] = []
# Get a schema object using the parquet and pyarrow libraries
parquet_schema = pq.ParquetFile(self.filepath).schema_arrow
# Loop through each column in the schema object
for i in range(len(parquet_schema.names)):
# Get the column name
name = parquet_schema.names[i]
# Translate from the Parquet type to a JSON Schema type
dtype = get_jsonschema_type(str(parquet_schema.types[i]))
# Add the new property to our list
properties.append(th.Property(name, dtype))
# Return the list as a JSON Schema dictionary object
return th.PropertiesList(*properties).to_dict()
Initialize a collection of tap streams with differing types¶
class TapCountries(Tap):
# ...
def discover_streams(self) -> List[Stream]:
"""Return a list containing one each of the two stream types."""
return [
CountriesStream(tap=self),
ContinentsStream(tap=self),
]
Or equivalently:
# Declare list of types here at the top of the file
STREAM_TYPES = [
CountriesStream,
ContinentsStream,
]
class TapCountries(Tap):
# ...
def discover_streams(self) -> List[Stream]:
"""Return a list with one each of all defined stream types."""
return [
stream_type(tap=self)
for stream_type in STREAM_TYPES
]
Run the standard built-in tap tests¶
# Import the tests
from singer_sdk.testing import get_standard_tap_tests
# Import our tap class
from tap_parquet.tap import TapParquet
SAMPLE_CONFIG = {
# ...
}
def test_sdk_standard_tap_tests():
"""Run the built-in tap tests from the SDK."""
tests = get_standard_tap_tests(TapParquet, config=SAMPLE_CONFIG)
for test in tests:
test()
Make all streams reuse the same authenticator instance¶
from singer_sdk.authenticators import OAuthAuthenticator, SingletonMeta
from singer_sdk.streams import RESTStream
class SingletonAuthenticator(OAuthAuthenticator, metaclass=SingletonMeta):
"""A singleton authenticator."""
class SingletonAuthStream(RESTStream):
"""A stream with singleton authenticator."""
@property
def authenticator(self) -> SingletonAuthenticator:
"""Stream authenticator."""
return SingletonAuthenticator(stream=self)
Make a stream reuse the same authenticator instance for all requests¶
from functools import cached_property
from singer_sdk.authenticators import APIAuthenticatorBase
from singer_sdk.streams import RESTStream
class CachedAuthStream(RESTStream):
"""A stream with singleton authenticator."""
@cached_property
def authenticator(self) -> APIAuthenticatorBase:
"""Stream authenticator."""
return APIAuthenticatorBase(stream=self)
Use one of requests
’s built-in authenticators¶
from requests.auth import HTTPDigestAuth
from singer_sdk.streams import RESTStream
class DigestAuthStream(RESTStream):
"""A stream with digest authentication."""
@property
def authenticator(self) -> HTTPDigestAuth:
"""Stream authenticator."""
return HTTPDigestAuth(
username=self.config["username"],
password=self.config["password"],
)
HTTPBasicAuth
and
HTTPProxyAuth
are also available in requests.auth
. In addition to requests.auth
classes, the community
has published a few packages with custom authenticator classes, which are compatible with the SDK.
For example:
requests-aws4auth
: AWS v4 authenticationrequests_auth
: A collection of authenticators for various services and protocols including Azure, Okta and NTLM.
Custom response validation¶
Some APIs deviate from HTTP status codes to report failures. For those cases,
you can override RESTStream.validate_response()
and raise FatalAPIError
if an unrecoverable error is detected. If the API also has transient errors, either client-side
like rate limits, or server-side like temporary 5xx, you can raise
RetriableAPIError
and the request will be retried with back-off:
from enum import Enum
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from singer_sdk.streams.rest import RESTStream
class CustomResponseValidationStream(RESTStream):
"""Stream with non-conventional error response."""
url_base = "https://badapi.test"
name = "non_conventional"
schema = {"type": "object", "properties": {}}
path = "/dummy
class StatusMessage(str, Enum):
"""Possible status messages."""
OK = "OK"
ERROR = "ERROR"
UNAVAILABLE = "UNAVAILABLE"
def validate_response(self, response):
# Still catch error status codes
super().validate_response(response)
data = response.json()
if data["status"] == self.StatusMessage.ERROR:
raise FatalAPIError("Error message found :(")
if data["status"] == self.StatusMessage.UNAVAILABLE:
raise RetriableAPIError("API is unavailable")
Custom Backoff¶
Custom backoff and retry behaviour can be added by overriding the methods:
For example, to use a constant retry:
def backoff_wait_generator() -> Callable[..., Generator[int, Any, None]]:
return backoff.constant(interval=10)
To utilise a response header as a wait value you can use backoff_runtime
, and pass a method that returns a wait value:
Note: By default jitter makes this function wait a bit longer than the value provided.
To disable jitter override backoff_jitter
.
In sdk versions <=0.21.0 the default jitter function will make the function below not work as you would expect without disabling jitter,
(here for more information) to disable jitter override the request_decorator
and pass jitter=None
to the backoff.on_exception
function.
def backoff_wait_generator() -> Callable[..., Generator[int, Any, None]]:
def _backoff_from_headers(retriable_api_error):
response_headers = retriable_api_error.response.headers
return int(response_headers.get("Retry-After", 0))
return self.backoff_runtime(value=_backoff_from_headers)
Additional Resources¶
More links, resources, and example solutions are available from community
members in the #singer-tap-development
and #singer-target-development
channels on Meltano Slack.