diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 0ce5c5c1e2cd0..fcd79be087f8d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -776,6 +776,13 @@ icon: retently.svg sourceType: api releaseStage: alpha +- name: RKI Covid + sourceDefinitionId: d78e5de0-aa44-4744-aa4f-74c818ccfe19 + dockerRepository: airbyte/source-rki-covid + dockerImageTag: 0.1.1 + documentationUrl: https://docs.airbyte.io/integrations/sources/rki-covid + sourceType: api + releaseStage: alpha - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 22bbe8ce76bb2..389c0929d5049 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7148,6 +7148,26 @@ path_in_connector_config: - "credentials" - "client_secret" +- dockerImage: "airbyte/source-rki-covid:0.1.1" + spec: + documentationUrl: "https://docs.airbyte.com/integrations/sources/rki-covid" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "RKI Covid Spec" + type: "object" + required: + - "start_date" + additionalProperties: false + properties: + start_date: + type: "string" + title: "Start Date" + description: "UTC date in the format 2017-01-25. Any data before this date\ + \ will not be replicated." 
+ order: 1 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-s3:0.1.14" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/s3" diff --git a/airbyte-integrations/connectors/source-rki-covid/.dockerignore b/airbyte-integrations/connectors/source-rki-covid/.dockerignore new file mode 100644 index 0000000000000..0e23ce64d7c7a --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_rki_covid +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-rki-covid/Dockerfile b/airbyte-integrations/connectors/source-rki-covid/Dockerfile new file mode 100644 index 0000000000000..f226dc7cd918f --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/Dockerfile @@ -0,0 +1,34 @@ +FROM python:3.9.11-alpine3.15 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . 
+ +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# copy payload code only +COPY main.py ./ +COPY source_rki_covid ./source_rki_covid + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.name=airbyte/source-rki-covid diff --git a/airbyte-integrations/connectors/source-rki-covid/README.md b/airbyte-integrations/connectors/source-rki-covid/README.md new file mode 100644 index 0000000000000..833f6bc05c896 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/README.md @@ -0,0 +1,144 @@ +# RKI Covid Source + +This is the repository for the RKI (Robert Koch-Institut - by Marlon Lückert) Covid-19 source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/rki-covid). + +## Local development +### Developed Streams (Endpoints) +``` +Germany: + 1. /germany + 2. /germany/age-groups + 3. /germany/history/cases/:days + 4. /germany/history/incidence/:days + 5. /germany/history/deaths/:days + 6. /germany/history/recovered/:days + 7. /germany/history/frozen-incidence/:days + 8. /germany/history/hospitalization/:days +``` + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`.
Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-rki-covid:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/rki-covid) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_rki_covid/spec.json` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source rki-covid test creds` +and place them into `secrets/config.json`. 
+ +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-rki-covid:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-rki-covid:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-rki-covid:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-rki-covid:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-rki-covid:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-rki-covid:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). 
+#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector needs to create or destroy resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker, run `./acceptance-test-docker.sh` from the connector root. + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-rki-covid:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-rki-covid:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1.
Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-rki-covid/acceptance-test-config.yml b/airbyte-integrations/connectors/source-rki-covid/acceptance-test-config.yml new file mode 100644 index 0000000000000..ad3b3adbb56de --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/acceptance-test-config.yml @@ -0,0 +1,28 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-rki-covid:dev +tests: + spec: + - spec_path: "source_rki_covid/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [ "germany", "germany_age_groups", "german_history_frozen_incidence"] +# TODO uncomment this block to specify that the tests should assert the connector outputs the records provided in the input file a file +# expect_records: +# path: "integration_tests/expected_records.txt" +# extra_fields: no +# exact_order: no +# extra_records: yes + incremental: # TODO + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-rki-covid/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-rki-covid/acceptance-test-docker.sh new file mode 100755 index 0000000000000..c51577d10690c --- /dev/null +++ 
b/airbyte-integrations/connectors/source-rki-covid/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-rki-covid/bootstrap.md b/airbyte-integrations/connectors/source-rki-covid/bootstrap.md new file mode 100644 index 0000000000000..5061ac55ce925 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/bootstrap.md @@ -0,0 +1,21 @@ +The RKI (Robert Koch-Institut - by Marlon Lückert) Covid-19 API is [a REST based API](https://api.corona-zahlen.org/). +The connector is implemented with [Airbyte CDK](https://docs.airbyte.io/connector-development/cdk-python). + +## Cases In Germany Covid api stream +The basic entry stream is 'germany'. All other streams are extended versions of the base stream, and passing parameters also results in sliced data. +For production, every developer application can view multiple streams.
+ +## Endpoints +* [Provides covid cases and other information in Germany.](https://api.corona-zahlen.org/germany) \(Non-Incremental, Entry-Stream\) +* [Provides covid cases and other information in Germany, grouped by age.](https://api.corona-zahlen.org/germany/age-groups) \(Non-Incremental\) +* [Provides cases in Germany based on days.](https://api.corona-zahlen.org/germany/history/cases/:days) \(Incremental\) +* [Provides incidence rate of covid in Germany based on days.](https://api.corona-zahlen.org/germany/history/incidence/:days) \(Incremental\) +* [Provides death rate in Germany over days.](https://api.corona-zahlen.org/germany/history/deaths/:days) \(Incremental\) +* [Provides recovery rate in Germany over days.](https://api.corona-zahlen.org/germany/history/recovered/:days) \(Incremental\) +* [Provides frozen incidence in Germany over days.](https://api.corona-zahlen.org/germany/history/frozen-incidence/:days) \(Incremental\) +* [Provides hospitalization rate in Germany over days.](https://api.corona-zahlen.org/germany/history/hospitalization/:days) \(Incremental\) + + + +Incremental streams have a required parameter `start_date`. Without passing `start_date` as a parameter, a full refresh occurs.
\ No newline at end of file diff --git a/airbyte-integrations/connectors/source-rki-covid/build.gradle b/airbyte-integrations/connectors/source-rki-covid/build.gradle new file mode 100644 index 0000000000000..eb34765a96db5 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_rki_covid' +} diff --git a/airbyte-integrations/connectors/source-rki-covid/integration_tests/__init__.py b/airbyte-integrations/connectors/source-rki-covid/integration_tests/__init__.py new file mode 100644 index 0000000000000..46b7376756ec6 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-rki-covid/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-rki-covid/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..f78d22b27ce40 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/integration_tests/abnormal_state.json @@ -0,0 +1,20 @@ +{ + "germany_history_cases": { + "date": "2024-04-06T00:00:00.000Z" + }, + "german_history_incidence": { + "date": "2024-04-06T00:00:00.000Z" + }, + "german_history_deaths": { + "date": "2024-04-06T00:00:00.000Z" + }, + "german_history_recovered": { + "date": "2024-04-06T00:00:00.000Z" + }, + "german_history_hospitalization": { + "date": "2024-04-07T00:00:00.000Z" + }, + "german_history_frozen_incidence": { + "date": "2024-01-01T00:00:00.000Z" + } +} diff --git a/airbyte-integrations/connectors/source-rki-covid/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-rki-covid/integration_tests/acceptance.py new file mode 100644 index 0000000000000..950b53b59d416 --- /dev/null +++ 
b/airbyte-integrations/connectors/source-rki-covid/integration_tests/acceptance.py @@ -0,0 +1,14 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-rki-covid/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-rki-covid/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..de447b2a146bf --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/integration_tests/configured_catalog.json @@ -0,0 +1,81 @@ +{ + "streams": [ + { + "stream": { + "name": "germany_age_groups", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "germany_history_cases", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "cursor_field": ["date"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "german_history_incidence", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "german_history_deaths", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "cursor_field": ["date"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "german_history_recovered", + "json_schema": {}, + 
"supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "german_history_frozen_incidence", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "german_history_hospitalization", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["date"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-rki-covid/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..badf3b47a14e5 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/integration_tests/invalid_config.json @@ -0,0 +1,3 @@ +{ + "start_date": "20220426" +} diff --git a/airbyte-integrations/connectors/source-rki-covid/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-rki-covid/integration_tests/sample_config.json new file mode 100644 index 0000000000000..f82b6cc40f313 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/integration_tests/sample_config.json @@ -0,0 +1,3 @@ +{ + "start_date": "2022-01-01" +} diff --git a/airbyte-integrations/connectors/source-rki-covid/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-rki-covid/integration_tests/sample_state.json new file mode 100644 index 0000000000000..937df032816cb --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/integration_tests/sample_state.json @@ -0,0 +1,18 @@ +{ + "germany_history_cases": { + 
"date": "2024-01-01T00:00:00.000Z" + }, + "german_history_incidence": { + "date": "2024-01-01T00:00:00.000Z" + }, + "german_history_deaths": { + "date": "2024-01-01T00:00:00.000Z" + }, + "german_history_recovered": { + "date": "2024-01-01T00:00:00.000Z" + }, + "german_history_hospitalization": { + "date": "2024-01-01T00:00:00.000Z" + }, + "german_history_frozen_incidence": {} +} diff --git a/airbyte-integrations/connectors/source-rki-covid/main.py b/airbyte-integrations/connectors/source-rki-covid/main.py new file mode 100644 index 0000000000000..ef743f3120e2c --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_rki_covid import SourceRkiCovid + +if __name__ == "__main__": + source = SourceRkiCovid() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-rki-covid/requirements.txt b/airbyte-integrations/connectors/source-rki-covid/requirements.txt new file mode 100644 index 0000000000000..0411042aa0911 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-rki-covid/setup.py b/airbyte-integrations/connectors/source-rki-covid/setup.py new file mode 100644 index 0000000000000..cb44a13582883 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/setup.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk", +] + +TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1", "source-acceptance-test", "airbyte-cdk"] + +setup( + name="source_rki_covid", + description="Source implementation for Rki Covid.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/__init__.py b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/__init__.py new file mode 100644 index 0000000000000..13bc66fa24171 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceRkiCovid + +__all__ = ["SourceRkiCovid"] diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/TODO.md b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/TODO.md new file mode 100644 index 0000000000000..cf1efadb3c9c9 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/TODO.md @@ -0,0 +1,25 @@ +# TODO: Define your stream schemas +Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org). + +The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it. + +The schema of a stream is the return value of `Stream.get_json_schema`. 
+ +## Static schemas +By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need. + +Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files. + +## Dynamic schemas +If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org). + +## Dynamically modifying static schemas +Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value: +``` +def get_json_schema(self): + schema = super().get_json_schema() + schema['dynamically_determined_property'] = "property" + return schema +``` + +Delete this file once you're done. Or don't. 
Up to you :) diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_deaths.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_deaths.json new file mode 100644 index 0000000000000..a5f0eb9fd58d0 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_deaths.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "deaths": { + "type": "integer" + }, + "date": { + "type": "string" + } + }, + "required": ["date", "deaths"] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_frozen_incidence.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_frozen_incidence.json new file mode 100644 index 0000000000000..092c761412741 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_frozen_incidence.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "weekIncidence": { + "type": "number" + }, + "date": { + "type": "string" + } + }, + "required": ["date", "weekIncidence"] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_hospitalization.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_hospitalization.json new file mode 100644 index 0000000000000..2af5e74b4ec0f --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_hospitalization.json @@ -0,0 +1,60 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "cases7Days": { + "type": "integer" + }, + "incidence7Days": { + "type": ["number", "null"] + }, + "date": { + "type": ["string", "null"] + }, + "fixedCases7Days": { + "type": ["integer", 
"null"] + }, + "updatedCases7Days": { + "type": ["integer", "null"] + }, + "adjustedLowerCases7Days": { + "type": ["integer", "null"] + }, + "adjustedCases7Days": { + "type": ["integer", "null"] + }, + "adjustedUpperCases7Days": { + "type": ["integer", "null"] + }, + "fixedIncidence7Days": { + "type": ["number", "null"] + }, + "updatedIncidence7Days": { + "type": ["number", "null"] + }, + "adjustedLowerIncidence7Days": { + "type": ["number", "null"] + }, + "adjustedIncidence7Days": { + "type": ["number", "null"] + }, + "adjustedUpperIncidence7Days": { + "type": ["number", "null"] + } + }, + "required": [ + "adjustedCases7Days", + "adjustedIncidence7Days", + "adjustedLowerCases7Days", + "adjustedLowerIncidence7Days", + "adjustedUpperCases7Days", + "adjustedUpperIncidence7Days", + "cases7Days", + "date", + "fixedCases7Days", + "fixedIncidence7Days", + "incidence7Days", + "updatedCases7Days", + "updatedIncidence7Days" + ] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_incidence.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_incidence.json new file mode 100644 index 0000000000000..092c761412741 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_incidence.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "weekIncidence": { + "type": "number" + }, + "date": { + "type": "string" + } + }, + "required": ["date", "weekIncidence"] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_recovered.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_recovered.json new file mode 100644 index 0000000000000..2e3015740a70c --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/german_history_recovered.json @@ -0,0 +1,13 @@ +{ + 
"$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "recovered": { + "type": "integer" + }, + "date": { + "type": "string" + } + }, + "required": ["date", "recovered"] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany.json new file mode 100644 index 0000000000000..3cd32ad514842 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany.json @@ -0,0 +1,132 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "cases": { + "type": "integer" + }, + "deaths": { + "type": "integer" + }, + "recovered": { + "type": "integer" + }, + "weekIncidence": { + "type": "number" + }, + "casesPer100k": { + "type": "number" + }, + "casesPerWeek": { + "type": "integer" + }, + "delta": { + "type": "object", + "properties": { + "cases": { + "type": "integer" + }, + "deaths": { + "type": "integer" + }, + "recovered": { + "type": "integer" + } + }, + "required": ["cases", "deaths", "recovered"] + }, + "r": { + "type": "object", + "properties": { + "value": { + "type": "number" + }, + "rValue4Days": { + "type": "object", + "properties": { + "value": { + "type": "number" + }, + "date": { + "type": "string" + } + }, + "required": ["date", "value"] + }, + "rValue7Days": { + "type": "object", + "properties": { + "value": { + "type": "number" + }, + "date": { + "type": "string" + } + }, + "required": ["date", "value"] + }, + "lastUpdate": { + "type": "string" + } + }, + "required": ["lastUpdate", "rValue4Days", "rValue7Days", "value"] + }, + "hospitalization": { + "type": "object", + "properties": { + "cases7Days": { + "type": "integer" + }, + "incidence7Days": { + "type": "number" + }, + "date": { + "type": "string" + }, + "lastUpdate": { + "type": "string" + } + }, + "required": ["cases7Days", "date", "incidence7Days", "lastUpdate"] + }, + 
"meta": { + "type": "object", + "properties": { + "source": { + "type": "string" + }, + "contact": { + "type": "string" + }, + "info": { + "type": "string" + }, + "lastUpdate": { + "type": "string" + }, + "lastCheckedForUpdate": { + "type": "string" + } + }, + "required": [ + "contact", + "info", + "lastCheckedForUpdate", + "lastUpdate", + "source" + ] + } + }, + "required": [ + "cases", + "casesPer100k", + "casesPerWeek", + "deaths", + "delta", + "hospitalization", + "meta", + "r", + "recovered", + "weekIncidence" + ] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany_age_groups.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany_age_groups.json new file mode 100644 index 0000000000000..4b61518a0a673 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany_age_groups.json @@ -0,0 +1,337 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "A00-A04": { + "type": "object", + "properties": { + "casesMale": { + "type": ["number", "integer"] + }, + "casesFemale": { + "type": ["number", "integer"] + }, + "deathsMale": { + "type": ["number", "integer"] + }, + "deathsFemale": { + "type": ["number", "integer"] + }, + "casesMalePer100k": { + "type": ["number", "integer"] + }, + "casesFemalePer100k": { + "type": ["number", "integer"] + }, + "deathsMalePer100k": { + "type": ["number", "integer"] + }, + "deathsFemalePer100k": { + "type": ["number", "integer"] + }, + "hospitalization": { + "type": "object", + "properties": { + "cases7Days": { + "type": ["number", "integer"] + }, + "incidence7Days": { + "type": ["number", "integer"] + }, + "date": { + "type": "string" + } + }, + "required": ["cases7Days", "date", "incidence7Days"] + } + }, + "required": [ + "casesFemale", + "casesFemalePer100k", + "casesMale", + "casesMalePer100k", + "deathsFemale", + "deathsFemalePer100k", + "deathsMale", + "deathsMalePer100k", 
+ "hospitalization" + ] + }, + "A05-A14": { + "type": "object", + "properties": { + "casesMale": { + "type": ["number", "integer"] + }, + "casesFemale": { + "type": ["number", "integer"] + }, + "deathsMale": { + "type": ["number", "integer"] + }, + "deathsFemale": { + "type": ["number", "integer"] + }, + "casesMalePer100k": { + "type": ["number", "integer"] + }, + "casesFemalePer100k": { + "type": ["number", "integer"] + }, + "deathsMalePer100k": { + "type": ["number", "integer"] + }, + "deathsFemalePer100k": { + "type": ["number", "integer"] + }, + "hospitalization": { + "type": "object", + "properties": { + "cases7Days": { + "type": ["number", "integer"] + }, + "incidence7Days": { + "type": ["number", "integer"] + }, + "date": { + "type": "string" + } + }, + "required": ["cases7Days", "date", "incidence7Days"] + } + }, + "required": [ + "casesFemale", + "casesFemalePer100k", + "casesMale", + "casesMalePer100k", + "deathsFemale", + "deathsFemalePer100k", + "deathsMale", + "deathsMalePer100k", + "hospitalization" + ] + }, + "A15-A34": { + "type": "object", + "properties": { + "casesMale": { + "type": ["number", "integer"] + }, + "casesFemale": { + "type": ["number", "integer"] + }, + "deathsMale": { + "type": ["number", "integer"] + }, + "deathsFemale": { + "type": ["number", "integer"] + }, + "casesMalePer100k": { + "type": ["number", "integer"] + }, + "casesFemalePer100k": { + "type": ["number", "integer"] + }, + "deathsMalePer100k": { + "type": ["number", "integer"] + }, + "deathsFemalePer100k": { + "type": ["number", "integer"] + }, + "hospitalization": { + "type": "object", + "properties": { + "cases7Days": { + "type": ["number", "integer"] + }, + "incidence7Days": { + "type": ["number", "integer"] + }, + "date": { + "type": "string" + } + }, + "required": ["cases7Days", "date", "incidence7Days"] + } + }, + "required": [ + "casesFemale", + "casesFemalePer100k", + "casesMale", + "casesMalePer100k", + "deathsFemale", + "deathsFemalePer100k", + "deathsMale", + 
"deathsMalePer100k", + "hospitalization" + ] + }, + "A35-A59": { + "type": "object", + "properties": { + "casesMale": { + "type": ["number", "integer"] + }, + "casesFemale": { + "type": ["number", "integer"] + }, + "deathsMale": { + "type": ["number", "integer"] + }, + "deathsFemale": { + "type": ["number", "integer"] + }, + "casesMalePer100k": { + "type": ["number", "integer"] + }, + "casesFemalePer100k": { + "type": ["number", "integer"] + }, + "deathsMalePer100k": { + "type": ["number", "integer"] + }, + "deathsFemalePer100k": { + "type": ["number", "integer"] + }, + "hospitalization": { + "type": "object", + "properties": { + "cases7Days": { + "type": ["number", "integer"] + }, + "incidence7Days": { + "type": ["number", "integer"] + }, + "date": { + "type": "string" + } + }, + "required": ["cases7Days", "date", "incidence7Days"] + } + }, + "required": [ + "casesFemale", + "casesFemalePer100k", + "casesMale", + "casesMalePer100k", + "deathsFemale", + "deathsFemalePer100k", + "deathsMale", + "deathsMalePer100k", + "hospitalization" + ] + }, + "A60-A79": { + "type": "object", + "properties": { + "casesMale": { + "type": ["number", "integer"] + }, + "casesFemale": { + "type": ["number", "integer"] + }, + "deathsMale": { + "type": ["number", "integer"] + }, + "deathsFemale": { + "type": ["number", "integer"] + }, + "casesMalePer100k": { + "type": ["number", "integer"] + }, + "casesFemalePer100k": { + "type": ["number", "integer"] + }, + "deathsMalePer100k": { + "type": ["number", "integer"] + }, + "deathsFemalePer100k": { + "type": ["number", "integer"] + }, + "hospitalization": { + "type": "object", + "properties": { + "cases7Days": { + "type": ["number", "integer"] + }, + "incidence7Days": { + "type": ["number", "integer"] + }, + "date": { + "type": "string" + } + }, + "required": ["cases7Days", "date", "incidence7Days"] + } + }, + "required": [ + "casesFemale", + "casesFemalePer100k", + "casesMale", + "casesMalePer100k", + "deathsFemale", + "deathsFemalePer100k", 
+ "deathsMale", + "deathsMalePer100k", + "hospitalization" + ] + }, + "A80+": { + "type": "object", + "properties": { + "casesMale": { + "type": ["number", "integer"] + }, + "casesFemale": { + "type": ["number", "integer"] + }, + "deathsMale": { + "type": ["number", "integer"] + }, + "deathsFemale": { + "type": ["number", "integer"] + }, + "casesMalePer100k": { + "type": ["number", "integer"] + }, + "casesFemalePer100k": { + "type": ["number", "integer"] + }, + "deathsMalePer100k": { + "type": ["number", "integer"] + }, + "deathsFemalePer100k": { + "type": ["number", "integer"] + }, + "hospitalization": { + "type": "object", + "properties": { + "cases7Days": { + "type": ["number", "integer"] + }, + "incidence7Days": { + "type": ["number", "integer"] + }, + "date": { + "type": "string" + } + }, + "required": ["cases7Days", "date", "incidence7Days"] + } + }, + "required": [ + "casesFemale", + "casesFemalePer100k", + "casesMale", + "casesMalePer100k", + "deathsFemale", + "deathsFemalePer100k", + "deathsMale", + "deathsMalePer100k", + "hospitalization" + ] + } + }, + "required": ["A00-A04", "A05-A14", "A15-A34", "A35-A59", "A60-A79", "A80+"] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany_history_cases.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany_history_cases.json new file mode 100644 index 0000000000000..a13bb77774de2 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/schemas/germany_history_cases.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/schema#", + "type": "object", + "properties": { + "cases": { + "type": "integer" + }, + "date": { + "type": "string" + } + }, + "required": ["cases", "date"] +} diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/source.py b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/source.py new file mode 100644 index 0000000000000..88b07ae5b3d1b --- 
/dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/source.py
@@ -0,0 +1,232 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from abc import ABC
+from datetime import datetime
+from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
+
+import requests
+from airbyte_cdk.sources import AbstractSource
+from airbyte_cdk.sources.streams import Stream
+from airbyte_cdk.sources.streams.http import HttpStream
+
+
+# Basic full refresh stream
+class RkiCovidStream(HttpStream, ABC):
+    """Common base of every RKI Covid stream: one unpaginated GET against api.corona-zahlen.org."""
+
+    url_base = "https://api.corona-zahlen.org/"
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        # The streams never paginate, so there is never a next page to ask for.
+        return None
+
+    def request_params(
+        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+    ) -> MutableMapping[str, Any]:
+        """No query parameters are sent; the history streams put their day window into the URL path instead."""
+        return {}
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        """Emit the complete JSON body of the response as a single record."""
+        yield response.json()
+
+
+# class that contains main source germany | full-refresh
+class Germany(RkiCovidStream):
+    """Docs: https://api.corona-zahlen.org/germany"""
+
+    primary_key = None
+
+    def path(
+        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        return "germany/"
+
+
+# class that contains source age-groups in germany. | full-refresh
+class GermanyAgeGroups(RkiCovidStream):
+    """Docs: https://api.corona-zahlen.org/germany/age-groups"""
+
+    primary_key = None
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        """Emit the `data` object of the response as one record; emit nothing when the key is absent."""
+        data = response.json().get("data")
+        if data is not None:
+            yield data
+
+    def path(
+        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        return "germany/age-groups"
+
+
+# Basic incremental stream
+class IncrementalRkiCovidStream(RkiCovidStream, ABC):
+    """Base of the incremental streams; concrete streams override the cursor field and the state update."""
+
+    state_checkpoint_interval = None
+
+    @property
+    def cursor_field(self) -> Union[str, List[str]]:
+        """
+        Name of the record field used as cursor. This base returns an empty list, i.e. it names no field;
+        the history streams below override it with "date".
+        """
+        return []
+
+    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
+        """Return the state to persist after `latest_record` was read; this base implementation keeps no state."""
+        return {}
+
+
+# Shared implementation of germany/history/<endpoint>/:days | Incremental
+class _GermanyHistoryStream(IncrementalRkiCovidStream, ABC):
+    """
+    Behaviour common to all germany/history/* streams. A subclass only names its endpoint through
+    `history_endpoint`; the URL, the client-side filtering on the cursor and the state update live here.
+    """
+
+    primary_key = None
+    # Last path segment before the day count, e.g. "cases" for germany/history/cases/:days.
+    history_endpoint = ""
+
+    def __init__(self, config, **kwargs):
+        super().__init__(**kwargs)
+        # Expected as YYYY-MM-DD (see spec.json); stays None when the config carries no start date.
+        self.start_date = config.get("start_date")
+
+    @property
+    def source_defined_cursor(self) -> bool:
+        return False
+
+    @property
+    def cursor_field(self) -> str:
+        return "date"
+
+    def date_to_int(self, start_date) -> int:
+        """
+        Turn `start_date` (YYYY-MM-DD) into the number of days of history to request.
+
+        The result is never smaller than 1, so a start date of today or in the future asks for a
+        one-day window instead of putting 0 or a negative number into the URL.
+        """
+        diff = datetime.now() - datetime.strptime(start_date, "%Y-%m-%d")
+        return max(diff.days, 1)
+
+    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
+        """Keep the greater of the stored cursor (or the start date on the first sync) and the cursor of `latest_record`."""
+        stored_cursor = (current_stream_state or {}).get(self.cursor_field) or self.start_date or ""
+        latest_cursor = latest_record.get(self.cursor_field) or ""
+        # Plain string comparison of the two cursor values.
+        return {self.cursor_field: max(latest_cursor, stored_cursor)}
+
+    def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
+        """Read the window and, when `stream_state` holds a cursor, drop the records that are not newer than it."""
+        records = super().read_records(stream_state=stream_state, **kwargs)
+        stored_cursor = (stream_state or {}).get(self.cursor_field)
+        if not stored_cursor:
+            # First sync, or a state without a cursor value: nothing to compare against.
+            yield from records
+            return
+        for record in records:
+            if record[self.cursor_field] > stored_cursor:
+                yield record
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        """Return the records under `data`; an absent or empty `data` gives an empty list instead of None."""
+        return response.json().get("data") or []
+
+    def path(
+        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        endpoint = f"germany/history/{self.history_endpoint}/"
+        if self.start_date:
+            return endpoint + str(self.date_to_int(self.start_date))
+        return endpoint
+
+
+# source: germany/history/cases/:days | Incremental
+class GermanyHistoryCases(_GermanyHistoryStream):
+    """Docs: https://api.corona-zahlen.org/germany/history/cases/:days"""
+
+    history_endpoint = "cases"
+
+
+# source: germany/history/incidence/:days | Incremental
+class GermanHistoryIncidence(_GermanyHistoryStream):
+    """Docs: https://api.corona-zahlen.org/germany/history/incidence/:days"""
+
+    history_endpoint = "incidence"
+
+
+# source: germany/history/deaths/:days | Incremental
+class GermanHistoryDeaths(_GermanyHistoryStream):
+    """Docs: https://api.corona-zahlen.org/germany/history/deaths/:days"""
+
+    history_endpoint = "deaths"
+
+
+# source: germany/history/recovered/:days | Incremental
+class GermanHistoryRecovered(_GermanyHistoryStream):
+    """Docs: https://api.corona-zahlen.org/germany/history/recovered/:days"""
+
+    history_endpoint = "recovered"
+
+
+# source: germany/history/frozen-incidence/:days | Incremental
+class GermanHistoryFrozenIncidence(_GermanyHistoryStream):
+    """Docs: https://api.corona-zahlen.org/germany/history/frozen-incidence/:days"""
+
+    history_endpoint = "frozen-incidence"
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        """Return the records under `data.history`; a missing `data` or `history` gives an empty list."""
+        return (response.json().get("data") or {}).get("history") or []
+
+
+# source: germany/history/hospitalization/:days | Incremental
+class GermanHistoryHospitalization(_GermanyHistoryStream):
+    """Docs: https://api.corona-zahlen.org/germany/history/hospitalization/:days"""
+
+    history_endpoint = "hospitalization"
+
+
+# Source
+class SourceRkiCovid(AbstractSource):
+    def check_connection(self, logger, config) -> Tuple[bool, Any]:
+        """
+        Testing connection availability for the connector.
+
+        :param config: the user-input config object conforming to the connector's spec.json
+        :param logger: logger object
+        :return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
+        """
+        try:
+            # The timeout keeps the check from hanging forever when the API does not answer.
+            req = requests.get(RkiCovidStream.url_base + "germany", timeout=30)
+            if req.status_code == 200:
+                return True, None
+            return False, req.reason
+        except Exception as error:
+            # Keep the original message and append the cause so that a failed check can be diagnosed.
+            return False, f"There is a problem in source check connection. {error!r}"
+
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+        """
+        :param config: A Mapping of the user input configuration as defined in the connector spec.
+        """
+
+        return [
+            Germany(),
+            GermanyAgeGroups(),
+            GermanyHistoryCases(config=config),
+            GermanHistoryIncidence(config=config),
+            GermanHistoryDeaths(config=config),
+            GermanHistoryRecovered(config=config),
+            GermanHistoryFrozenIncidence(config=config),
+            GermanHistoryHospitalization(config=config),
+        ]
diff --git a/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/spec.json b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/spec.json
new file mode 100644
index 0000000000000..eac69f9467bd2
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/source_rki_covid/spec.json
@@ -0,0 +1,18 @@
+{
+  "documentationUrl": "https://docs.airbyte.com/integrations/sources/rki-covid",
+  "connectionSpecification": {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "title": "RKI Covid Spec",
+    "type": "object",
+    "required": ["start_date"],
+    "additionalProperties": false,
+    "properties": {
+      "start_date": {
+        "type": "string",
+        "title": "Start Date",
+        "description": "UTC date in the format 2017-01-25. Any data before this date will not be replicated.",
+        "order": 1
+      }
+    }
+  }
+}
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/__init__.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/__init__.py
new file mode 100644
index 0000000000000..46b7376756ec6
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/__init__.py
@@ -0,0 +1,3 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_cached_stream_state.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_cached_stream_state.py
new file mode 100644
index 0000000000000..afff7653e69c5
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_cached_stream_state.py
@@ -0,0 +1,5 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+#
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistorycases.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistorycases.py
new file mode 100644
index 0000000000000..24cb6450bbd50
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistorycases.py
@@ -0,0 +1,55 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from datetime import datetime
+
+import requests
+from pytest import fixture
+from source_rki_covid.source import GermanyHistoryCases
+
+
+@fixture
+def patch_incremental_german_history_cases(mocker):
+    # Mock abstract methods to enable instantiating abstract class
+    mocker.patch.object(GermanyHistoryCases, "primary_key", None)
+
+
+def test_cursor_field(patch_incremental_german_history_cases):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanyHistoryCases(config)
+    expected_cursor_field = "date"
+    assert stream.cursor_field == expected_cursor_field
+
+
+def test_parse_response(patch_incremental_german_history_cases):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanyHistoryCases(config)
+    # Canned payload instead of a live API call, so the test also passes without network access.
+    response = requests.Response()
+    response.status_code = 200
+    response._content = b'{"data": [{"cases": 1304, "date": "2022-04-27T00:00:00.000Z"}], "meta": {}}'
+    expected_response = response.json().get("data")
+    assert stream.parse_response(response) == expected_response
+
+
+def check_diff(start_date):
+    diff = datetime.now() - datetime.strptime(start_date, "%Y-%m-%d")
+    if diff.days == 0:
+        return str(1)
+    return str(diff.days)
+
+
+def test_parse_with_cases(patch_incremental_german_history_cases):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanyHistoryCases(config)
+    expected_stream_path = "germany/history/cases/" + check_diff(config.get("start_date"))
+    assert stream.path() == expected_stream_path
+
+
+def test_parse_without_cases(patch_incremental_german_history_cases):
+    config = {}
+    stream = GermanyHistoryCases(config)
+    expected_stream_path = "germany/history/cases/"
+    assert stream.path() == expected_stream_path
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistorydeaths.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistorydeaths.py
new file mode 100644
index 0000000000000..7c73f7d31d3f7
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistorydeaths.py
@@ -0,0 +1,65 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from datetime import datetime, timedelta
+
+import requests
+from pytest import fixture
+from source_rki_covid.source import GermanHistoryDeaths
+
+
+@fixture
+def patch_incremental_german_history_deaths(mocker):
+    # Mock abstract methods to enable instantiating abstract class
+    mocker.patch.object(GermanHistoryDeaths, "primary_key", None)
+
+
+def test_cursor_field(patch_incremental_german_history_deaths):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryDeaths(config)
+    expected_cursor_field = "date"
+    assert stream.cursor_field == expected_cursor_field
+
+
+def test_get_updated_state(patch_incremental_german_history_deaths):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryDeaths(config)
+    d = datetime.date(datetime.today()) - timedelta(days=1)
+    date = {stream.cursor_field: str(d)}
+    inputs = {"current_stream_state": date, "latest_record": date}
+    expected_state = {stream.cursor_field: str(d)}
+    assert stream.get_updated_state(**inputs) == expected_state
+
+
+def test_parse_response(patch_incremental_german_history_deaths):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryDeaths(config)
+    # Canned payload instead of a live API call, so the test also passes without network access.
+    response = requests.Response()
+    response.status_code = 200
+    response._content = b'{"data": [{"deaths": 215, "date": "2022-04-27T00:00:00.000Z"}], "meta": {}}'
+    expected_response = response.json().get("data")
+    assert stream.parse_response(response) == expected_response
+
+
+def check_diff(start_date):
+    diff = datetime.now() - datetime.strptime(start_date, "%Y-%m-%d")
+    if diff.days == 0:
+        return str(1)
+    return str(diff.days)
+
+
+def test_parse_with_cases(patch_incremental_german_history_deaths):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryDeaths(config)
+    expected_stream_path = "germany/history/deaths/" + check_diff(config.get("start_date"))
+    assert stream.path() == expected_stream_path
+
+
+def test_parse_without_cases(patch_incremental_german_history_deaths):
+    config = {}
+    stream = GermanHistoryDeaths(config)
+    expected_stream_path = "germany/history/deaths/"
+    assert stream.path() == expected_stream_path
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryfrozenIncidence.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryfrozenIncidence.py
new file mode 100644
index 0000000000000..3d59f4617ba6d
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryfrozenIncidence.py
@@ -0,0 +1,65 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from datetime import datetime, timedelta
+
+import requests
+from pytest import fixture
+from source_rki_covid.source import GermanHistoryFrozenIncidence
+
+
+@fixture
+def patch_incremental_german_history_frozenInc(mocker):
+    # Mock abstract methods to enable instantiating abstract class
+    mocker.patch.object(GermanHistoryFrozenIncidence, "primary_key", None)
+
+
+def test_cursor_field(patch_incremental_german_history_frozenInc):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryFrozenIncidence(config)
+    expected_cursor_field = "date"
+    assert stream.cursor_field == expected_cursor_field
+
+
+def test_get_updated_state(patch_incremental_german_history_frozenInc):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryFrozenIncidence(config)
+    d = datetime.date(datetime.today()) - timedelta(days=1)
+    date = {stream.cursor_field: str(d)}
+    inputs = {"current_stream_state": date, "latest_record": date}
+    expected_state = {stream.cursor_field: str(d)}
+    assert stream.get_updated_state(**inputs) == expected_state
+
+
+def test_parse_response(patch_incremental_german_history_frozenInc):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryFrozenIncidence(config)
+    # Canned payload instead of a live API call, so the test also passes without network access.
+    response = requests.Response()
+    response.status_code = 200
+    response._content = b'{"data": {"history": [{"weekIncidence": 826.0, "date": "2022-04-27T00:00:00.000Z"}]}, "meta": {}}'
+    expected_response = response.json().get("data").get("history")
+    assert stream.parse_response(response) == expected_response
+
+
+def check_diff(start_date):
+    diff = datetime.now() - datetime.strptime(start_date, "%Y-%m-%d")
+    if diff.days == 0:
+        return str(1)
+    return str(diff.days)
+
+
+def test_parse_with_cases(patch_incremental_german_history_frozenInc):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryFrozenIncidence(config)
+    expected_stream_path = "germany/history/frozen-incidence/" + check_diff(config.get("start_date"))
+    assert stream.path() == expected_stream_path
+
+
+def test_parse_without_cases(patch_incremental_german_history_frozenInc):
+    config = {}
+    stream = GermanHistoryFrozenIncidence(config)
+    expected_stream_path = "germany/history/frozen-incidence/"
+    assert stream.path() == expected_stream_path
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryhospitalization.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryhospitalization.py
new file mode 100644
index 0000000000000..636c3cb6e8f92
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryhospitalization.py
@@ -0,0 +1,65 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from datetime import datetime, timedelta
+
+import requests
+from pytest import fixture
+from source_rki_covid.source import GermanHistoryHospitalization
+
+
+@fixture
+def patch_incremental_german_history_hospitalization(mocker):
+    # Mock abstract methods to enable instantiating abstract class
+    mocker.patch.object(GermanHistoryHospitalization, "primary_key", None)
+
+
+def test_cursor_field(patch_incremental_german_history_hospitalization):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryHospitalization(config)
+    expected_cursor_field = "date"
+    assert stream.cursor_field == expected_cursor_field
+
+
+def test_get_updated_state(patch_incremental_german_history_hospitalization):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryHospitalization(config)
+    d = datetime.date(datetime.today()) - timedelta(days=1)
+    date = {stream.cursor_field: str(d)}
+    inputs = {"current_stream_state": date, "latest_record": date}
+    expected_state = {stream.cursor_field: str(d)}
+    assert stream.get_updated_state(**inputs) == expected_state
+
+
+def test_parse_response(patch_incremental_german_history_hospitalization):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryHospitalization(config)
+    # Canned payload instead of a live API call, so the test also passes without network access.
+    response = requests.Response()
+    response.status_code = 200
+    response._content = b'{"data": [{"cases7Days": 4212, "incidence7Days": 5.06, "date": "2022-04-27T00:00:00.000Z"}], "meta": {}}'
+    expected_response = response.json().get("data")
+    assert stream.parse_response(response) == expected_response
+
+
+def check_diff(start_date):
+    diff = datetime.now() - datetime.strptime(start_date, "%Y-%m-%d")
+    if diff.days == 0:
+        return str(1)
+    return str(diff.days)
+
+
+def test_parse_with_cases(patch_incremental_german_history_hospitalization):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryHospitalization(config)
+    expected_stream_path = "germany/history/hospitalization/" + check_diff(config.get("start_date"))
+    assert stream.path() == expected_stream_path
+
+
+def test_parse_without_cases(patch_incremental_german_history_hospitalization):
+    config = {}
+    stream = GermanHistoryHospitalization(config)
+    expected_stream_path = "germany/history/hospitalization/"
+    assert stream.path() == expected_stream_path
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryincidence.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryincidence.py
new file mode 100644
index 0000000000000..25bb4f88e2d58
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryincidence.py
@@ -0,0 +1,65 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from datetime import datetime, timedelta
+
+import requests
+from pytest import fixture
+from source_rki_covid.source import GermanHistoryIncidence
+
+
+@fixture
+def patch_incremental_german_history_incidence(mocker):
+    # Mock abstract methods to enable instantiating abstract class
+    mocker.patch.object(GermanHistoryIncidence, "primary_key", None)
+
+
+def test_cursor_field(patch_incremental_german_history_incidence):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryIncidence(config)
+    expected_cursor_field = "date"
+    assert stream.cursor_field == expected_cursor_field
+
+
+def test_get_updated_state(patch_incremental_german_history_incidence):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryIncidence(config)
+    d = datetime.date(datetime.today()) - timedelta(days=1)
+    date = {stream.cursor_field: str(d)}
+    inputs = {"current_stream_state": date, "latest_record": date}
+    expected_state = {stream.cursor_field: str(d)}
+    assert stream.get_updated_state(**inputs) == expected_state
+
+
+def test_parse_response(patch_incremental_german_history_incidence):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryIncidence(config)
+    # Canned payload instead of a live API call, so the test also passes without network access.
+    response = requests.Response()
+    response.status_code = 200
+    response._content = b'{"data": [{"weekIncidence": 826.0, "date": "2022-04-27T00:00:00.000Z"}], "meta": {}}'
+    expected_response = response.json().get("data")
+    assert stream.parse_response(response) == expected_response
+
+
+def check_diff(start_date):
+    diff = datetime.now() - datetime.strptime(start_date, "%Y-%m-%d")
+    if diff.days == 0:
+        return str(1)
+    return str(diff.days)
+
+
+def test_parse_with_cases(patch_incremental_german_history_incidence):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryIncidence(config)
+    expected_stream_path = "germany/history/incidence/" + check_diff(config.get("start_date"))
+    assert stream.path() == expected_stream_path
+
+
+def test_parse_without_cases(patch_incremental_german_history_incidence):
+    config = {}
+    stream = GermanHistoryIncidence(config)
+    expected_stream_path = "germany/history/incidence/"
+    assert stream.path() == expected_stream_path
diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryrecovered.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryrecovered.py
new file mode 100644
index 0000000000000..20bd3aa3fbc6c
--- /dev/null
+++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_germanhistoryrecovered.py
@@ -0,0 +1,62 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from datetime import datetime, timedelta
+
+import requests
+from pytest import fixture
+from source_rki_covid.source import GermanHistoryRecovered
+
+
+@fixture
+def patch_incremental_german_history_recovered(mocker):
+    # Mock abstract methods to enable instantiating abstract class
+    mocker.patch.object(GermanHistoryRecovered, "primary_key", None)
+
+
+def test_cursor_field(patch_incremental_german_history_recovered):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryRecovered(config)
+    expected_cursor_field = "date"
+    assert stream.cursor_field == expected_cursor_field
+
+
+def test_get_updated_state(patch_incremental_german_history_recovered):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryRecovered(config)
+    d = datetime.date(datetime.today()) - timedelta(days=1)
+    date = {stream.cursor_field: str(d)}
+    inputs = {"current_stream_state": date, "latest_record": date}
+    expected_state = {stream.cursor_field: str(d)}
+    assert stream.get_updated_state(**inputs) == expected_state
+
+
+def test_parse_response(patch_incremental_german_history_recovered):
+    config = {"start_date": "2022-04-27"}
+    stream = GermanHistoryRecovered(config)
+    response = requests.get("https://api.corona-zahlen.org/germany/history/recovered/1")
+    expected_response = response.json().get("data")
+    assert stream.parse_response(response) == expected_response
+
+
+def check_diff(start_date):
+
diff = datetime.now() - datetime.strptime(start_date, "%Y-%m-%d") + if diff.days == 0: + return str(1) + return str(diff.days) + + +def test_parse_with_cases(patch_incremental_german_history_recovered): + config = {"start_date": "2022-04-27"} + stream = GermanHistoryRecovered(config) + expected_stream_path = "germany/history/recovered/" + check_diff(config.get("start_date")) + assert stream.path() == expected_stream_path + + +def test_parse_without_cases(patch_incremental_german_history_recovered): + config = {} + stream = GermanHistoryRecovered(config) + expected_stream_path = "germany/history/recovered/" + assert stream.path() == expected_stream_path diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_streams.py new file mode 100644 index 0000000000000..0484c3f204dcd --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_incremental_streams.py @@ -0,0 +1,52 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +from airbyte_cdk.models import SyncMode +from pytest import fixture +from source_rki_covid.source import IncrementalRkiCovidStream + + +@fixture +def patch_incremental_base_class(mocker): + mocker.patch.object(IncrementalRkiCovidStream, "path", "v0/example_endpoint") + mocker.patch.object(IncrementalRkiCovidStream, "primary_key", "test_primary_key") + mocker.patch.object(IncrementalRkiCovidStream, "__abstractmethods__", set()) + + +def test_cursor_field(patch_incremental_base_class): + stream = IncrementalRkiCovidStream() + expected_cursor_field = [] + assert stream.cursor_field == expected_cursor_field + + +def test_get_updated_state(patch_incremental_base_class): + stream = IncrementalRkiCovidStream() + inputs = {"current_stream_state": None, "latest_record": None} + expected_state = {} + assert stream.get_updated_state(**inputs) == expected_state + + +def test_stream_slices(patch_incremental_base_class): + stream = IncrementalRkiCovidStream() + inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} + expected_stream_slice = [None] + assert stream.stream_slices(**inputs) == expected_stream_slice + + +def test_supports_incremental(patch_incremental_base_class, mocker): + mocker.patch.object(IncrementalRkiCovidStream, "cursor_field", "dummy_field") + stream = IncrementalRkiCovidStream() + assert stream.supports_incremental + + +def test_source_defined_cursor(patch_incremental_base_class): + stream = IncrementalRkiCovidStream() + assert stream.source_defined_cursor + + +def test_stream_checkpoint_interval(patch_incremental_base_class): + stream = IncrementalRkiCovidStream() + expected_checkpoint_interval = None + assert stream.state_checkpoint_interval == expected_checkpoint_interval diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_source.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_source.py new file mode 100644 index 0000000000000..94366f8c7416e --- /dev/null +++ 
b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_source.py @@ -0,0 +1,21 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +from source_rki_covid.source import SourceRkiCovid + + +def test_check_connection(mocker): + source = SourceRkiCovid() + logger_mock, config_mock = MagicMock(), MagicMock() + assert source.check_connection(logger_mock, config_mock) == (True, None) + + +def test_streams(mocker): + source = SourceRkiCovid() + config_mock = MagicMock() + streams = source.streams(config_mock) + expected_streams_number = 8 + assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_stream_agegroup.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_stream_agegroup.py new file mode 100644 index 0000000000000..65b5958b61051 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_stream_agegroup.py @@ -0,0 +1,18 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest +from source_rki_covid.source import GermanyAgeGroups + + +@pytest.fixture +def patch_age_group(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(GermanyAgeGroups, "primary_key", None) + + +def test_path(patch_age_group): + stream = GermanyAgeGroups() + expected_params = {"path": "germany/age-groups"} + assert stream.path() == expected_params.get("path") diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_stream_germany.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_stream_germany.py new file mode 100644 index 0000000000000..89622d2c1e26f --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_stream_germany.py @@ -0,0 +1,18 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import pytest +from source_rki_covid.source import Germany + + +@pytest.fixture +def patch_germany_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(Germany, "primary_key", None) + + +def test_path(patch_germany_class): + stream = Germany() + expected_params = {"path": "germany/"} + assert stream.path() == expected_params.get("path") diff --git a/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_streams.py new file mode 100644 index 0000000000000..d25f2936639f7 --- /dev/null +++ b/airbyte-integrations/connectors/source-rki-covid/unit_tests/test_streams.py @@ -0,0 +1,76 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +from source_rki_covid.source import RkiCovidStream + + +@pytest.fixture +def patch_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(RkiCovidStream, "path", "v0/example_endpoint") + mocker.patch.object(RkiCovidStream, "primary_key", "test_primary_key") + mocker.patch.object(RkiCovidStream, "__abstractmethods__", set()) + + +def test_request_params(patch_base_class): + stream = RkiCovidStream() + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_params = {} + assert stream.request_params(**inputs) == expected_params + + +def test_next_page_token(patch_base_class): + stream = RkiCovidStream() + inputs = {"response": MagicMock()} + expected_token = None + assert stream.next_page_token(**inputs) == expected_token + + +# def test_parse_response(patch_base_class): +# stream = RkiCovidStream() +# # TODO: replace this with your input parameters +# inputs = {"response": MagicMock()} +# # TODO: replace this with your expected parced object +# expected_parsed_object = {} +# assert next(stream.parse_response(**inputs)) 
== expected_parsed_object + + +def test_request_headers(patch_base_class): + stream = RkiCovidStream() + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_headers = {} + assert stream.request_headers(**inputs) == expected_headers + + +def test_http_method(patch_base_class): + stream = RkiCovidStream() + expected_method = "GET" + assert stream.http_method == expected_method + + +@pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], +) +def test_should_retry(patch_base_class, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = RkiCovidStream() + assert stream.should_retry(response_mock) == should_retry + + +def test_backoff_time(patch_base_class): + response_mock = MagicMock() + stream = RkiCovidStream() + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time diff --git a/docs/integrations/sources/rki-covid.md b/docs/integrations/sources/rki-covid.md new file mode 100644 index 0000000000000..f887e22acde67 --- /dev/null +++ b/docs/integrations/sources/rki-covid.md @@ -0,0 +1,57 @@ +# Robert Koch-Institut Covid + +## Sync overview + +This source can sync data for the [Robert Koch-Institut Covid API](https://api.corona-zahlen.org/). It supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. 
+
+### Output schema
+
+This Source is capable of syncing the following core Streams (only for Germany cases):
+
+* Germany
+* Germany by age groups
+* Germany cases by days
+* Germany incidences by days
+* Germany deaths by days
+* Germany recovered by days
+* Germany frozen-incidence by days
+* Germany hospitalization by days
+
+### Data type mapping
+
+| Integration Type | Airbyte Type | Notes |
+| :--- | :--- | :--- |
+| `string` | `string` | |
+| `integer` | `integer` | |
+| `number` | `number` | |
+| `array` | `array` | |
+| `object` | `object` | |
+
+### Features
+
+| Feature | Supported?\(Yes/No\) | Notes |
+| :--- | :--- | :--- |
+| Full Refresh Sync | Yes | |
+| Incremental Sync | Yes | |
+| Namespaces | No | |
+
+### Performance considerations
+
+The RKI Covid connector should not run into RKI Covid API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully.
+
+## Getting started
+
+### Requirements
+
+* Start Date
+
+### Setup guide
+
+Select a start date (UTC, in the format `2017-01-25`). Any data before this date will not be replicated.
+
+## Changelog
+
+| Version | Date | Pull Request | Subject |
+| :--- | :--- | :--- | :--- |
+| 0.1.1 | 2022-05-30 | [11732](https://github.com/airbytehq/airbyte/pull/11732) | Fix docs |
+| 0.1.0 | 2022-05-30 | [11732](https://github.com/airbytehq/airbyte/pull/11732) | Initial Release |