diff --git a/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py
index 056971f954502..e946df03c937c 100644
--- a/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py
+++ b/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py
@@ -3,14 +3,5 @@
 #
 
 
-import pytest
-
-pytest_plugins = ("source_acceptance_test.plugin",)
-
-
-@pytest.fixture(scope="session", autouse=True)
-def connector_setup():
-    """This fixture is a placeholder for external resources that acceptance test might require."""
-    # TODO: setup test dependencies if needed. otherwise remove the TODO comments
-    yield
-    # TODO: clean up test dependencies
+def test_dummy_test():
+    assert True
diff --git a/airbyte-integrations/connectors/source-amplitude/setup.py b/airbyte-integrations/connectors/source-amplitude/setup.py
index 45bcc35b471da..5a0c75710ea0a 100644
--- a/airbyte-integrations/connectors/source-amplitude/setup.py
+++ b/airbyte-integrations/connectors/source-amplitude/setup.py
@@ -9,7 +9,7 @@
     "airbyte-cdk~=0.1",
 ]
 
-TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1"]
+TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1", "requests-mock"]
 
 setup(
     name="source_amplitude",
diff --git a/airbyte-integrations/connectors/source-amplitude/unit_tests/api_data/zipped.json b/airbyte-integrations/connectors/source-amplitude/unit_tests/api_data/zipped.json
new file mode 100644
index 0000000000000..b4028712857e1
Binary files /dev/null and b/airbyte-integrations/connectors/source-amplitude/unit_tests/api_data/zipped.json differ
diff --git a/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py
new file mode 100644
index 0000000000000..ef3bfd2844e43
--- /dev/null
+++ b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py
@@ -0,0 +1,185 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
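+# Unit tests for the Amplitude stream classes (Cohorts, Annotations, ActiveUsers, AverageSessionLength, Events) defined in source_amplitude/api.py.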
+#
+
+
+import pendulum
+import pytest
+import requests
+from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events
+
+
+class MockRequest:
+    def __init__(self, status_code):
+        self.status_code = status_code
+
+
+class TestFullRefreshStreams:
+    @pytest.mark.parametrize(
+        "stream_cls, data, expected",
+        [
+            (Cohorts, [{"key": "value"}], [{"key": "value"}]),
+            (Annotations, [{"key1": "value1"}], [{"key1": "value1"}]),
+        ],
+        ids=["Cohorts", "Annotations"],
+    )
+    def test_parse_response(self, requests_mock, stream_cls, data, expected):
+        stream = stream_cls()
+        url = f"{stream.url_base}{stream.path()}"
+        data = {stream.data_field: data}
+        requests_mock.get(url, json=data)
+        response = requests.get(url)
+        assert list(stream.parse_response(response)) == expected
+
+    @pytest.mark.parametrize(
+        "stream_cls, expected",
+        [
+            (Cohorts, None),
+            (Annotations, None),
+        ],
+        ids=["Cohorts", "Annotations"],
+    )
+    def test_next_page_token(self, requests_mock, stream_cls, expected):
+        stream = stream_cls()
+        url = f"{stream.url_base}{stream.path()}"
+        requests_mock.get(url, json={})
+        response = requests.get(url)
+        assert stream.next_page_token(response) == expected
+
+    @pytest.mark.parametrize(
+        "stream_cls, expected",
+        [
+            (Cohorts, "3/cohorts"),
+            (Annotations, "2/annotations"),
+        ],
+        ids=["Cohorts", "Annotations"],
+    )
+    def test_path(self, stream_cls, expected):
+        stream = stream_cls()
+        assert stream.path() == expected
+
+
+class TestIncrementalStreams:
+    @pytest.mark.parametrize(
+        "stream_cls, data, expected",
+        [
+            (
+                ActiveUsers,
+                {
+                    "xValues": ["2021-01-01", "2021-01-02"],
+                    "series": [[1, 5]],
+                    "seriesCollapsed": [[0]],
+                    "seriesLabels": [0],
+                    "seriesMeta": [{"segmentIndex": 0}],
+                },
+                [{"date": "2021-01-01", "statistics": {0: 1}}, {"date": "2021-01-02", "statistics": {0: 5}}],
+            ),
+            (
+                AverageSessionLength,
+                {
+                    "xValues": ["2019-05-23", "2019-05-24"],
+                    "series": [[2, 6]],
+                    "seriesCollapsed": [[0]],
+                    "seriesLabels": [0],
+                    "seriesMeta": [{"segmentIndex": 0}],
+                },
+                [{"date": "2019-05-23", "length": 2}, {"date": "2019-05-24", "length": 6}],
+            ),
+        ],
+        ids=["ActiveUsers", "AverageSessionLength"],
+    )
+    def test_parse_response(self, requests_mock, stream_cls, data, expected):
+        stream = stream_cls("2021-01-01T00:00:00Z")
+        url = f"{stream.url_base}{stream.path()}"
+        data = {stream.data_field: data}
+        requests_mock.get(url, json=data)
+        response = requests.get(url)
+        result = list(stream.parse_response(response))
+        assert result == expected
+
+    @pytest.mark.parametrize(
+        "stream_cls, expected",
+        [
+            (ActiveUsers, "2/users"),
+            (AverageSessionLength, "2/sessions/average"),
+            (Events, "2/export"),
+        ],
+        ids=["ActiveUsers", "AverageSessionLength", "Events"],
+    )
+    def test_path(self, stream_cls, expected):
+        stream = stream_cls(pendulum.now().isoformat())
+        assert stream.path() == expected
+
+    @pytest.mark.parametrize(
+        "stream_cls, expected",
+        [
+            (ActiveUsers, {"m": "active", "i": 1, "g": "country"}),
+            (AverageSessionLength, {}),
+        ],
+        ids=["ActiveUsers", "AverageSessionLength"],
+    )
+    def test_request_params(self, stream_cls, expected):
+        now = pendulum.now()
+        stream = stream_cls(now.isoformat())
+        # update expected with valid start,end dates
+        expected.update(**{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)})
+        assert stream.request_params({}) == expected
+
+    @pytest.mark.parametrize(
+        "stream_cls, expected",
+        [
+            (ActiveUsers, {}),
+            (AverageSessionLength, {}),
+            (Events, {}),
+        ],
+        ids=["ActiveUsers", "AverageSessionLength", "Events"],
+    )
+    def test_next_page_token(self, requests_mock, stream_cls, expected):
+        days_ago = pendulum.now().subtract(days=2)
+        stream = stream_cls(days_ago.isoformat())
+        start = days_ago.strftime(stream.date_template)
+        end = pendulum.yesterday().strftime(stream.date_template)
+        url = f"{stream.url_base}{stream.path()}?start={start}&end={end}"
+        # update expected with test values.
+        expected.update(
+            **{"start": pendulum.yesterday().strftime(stream.date_template), "end": pendulum.now().strftime(stream.date_template)}
+        )
+        requests_mock.get(url)
+        response = requests.get(url)
+        assert stream.next_page_token(response) == expected
+
+    @pytest.mark.parametrize(
+        "stream_cls, expected",
+        [
+            (ActiveUsers, ""),
+            (AverageSessionLength, ""),
+            (Events, ""),
+        ],
+        ids=["ActiveUsers", "AverageSessionLength", "Events"],
+    )
+    def test_get_end_date(self, stream_cls, expected):
+        now = pendulum.now()
+        yesterday = pendulum.yesterday()
+        stream = stream_cls(yesterday.isoformat())
+        # update expected with test values.
+        expected = now.strftime(stream.date_template)
+        assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected
+
+    class TestEventsStream:
+        def test_parse_zip(self):
+            stream = Events(pendulum.now().isoformat())
+            expected = [{"id": 123}]
+            result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
+            assert expected == result
+
+        def test_stream_slices(self):
+            stream = Events(pendulum.now().isoformat())
+            now = pendulum.now()
+            expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
+            assert expected == stream.stream_slices()
+
+        def test_request_params(self):
+            stream = Events(pendulum.now().isoformat())
+            now = pendulum.now().subtract(hours=6)
+            slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
+            assert slice == stream.request_params(slice)
diff --git a/airbyte-integrations/connectors/source-amplitude/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_errors.py
similarity index 100%
rename from airbyte-integrations/connectors/source-amplitude/unit_tests/unit_test.py
rename to airbyte-integrations/connectors/source-amplitude/unit_tests/test_errors.py
diff --git a/airbyte-integrations/connectors/source-amplitude/unit_tests/test_source.py b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_source.py
new file mode 100644
index 0000000000000..8c27b4971ec9a
--- /dev/null
+++ b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_source.py
@@ -0,0 +1,60 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
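+# Unit tests for the SourceAmplitude entrypoint: auth token encoding, check_connection, and stream instantiation.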
+#
+
+
+import pytest
+import requests
+from unittest.mock import patch
+from airbyte_cdk import AirbyteLogger
+from source_amplitude import SourceAmplitude
+from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events
+
+
+TEST_CONFIG: dict = {
+    "api_key": "test_api_key",
+    "secret_key": "test_secret_key",
+    "start_date": "2022-05-01T00:00:00Z"
+}
+TEST_INSTANCE: SourceAmplitude = SourceAmplitude()
+
+class MockRequest:
+    def __init__(self, status_code):
+        self.status_code = status_code
+
+def test_convert_auth_to_token():
+    expected = "dXNlcm5hbWU6cGFzc3dvcmQ="
+    actual = TEST_INSTANCE._convert_auth_to_token("username", "password")
+    assert actual == expected
+
+
+@pytest.mark.parametrize(
+    "response, check_passed",
+    [
+        ({"id": 123}, True),
+        (requests.HTTPError(), False),
+    ],
+    ids=["Success", "Fail"],
+)
+def test_check(response, check_passed):
+    with patch.object(Cohorts, 'read_records', return_value=response) as mock_method:
+        result = TEST_INSTANCE.check_connection(logger=AirbyteLogger, config=TEST_CONFIG)
+        mock_method.assert_called()
+        assert check_passed == result[0]
+
+@pytest.mark.parametrize(
+    "expected_stream_cls",
+    [
+        (Cohorts),
+        (Annotations),
+        (ActiveUsers),
+        (AverageSessionLength),
+        (Events),
+    ],
+    ids=["Cohorts", "Annotations", "ActiveUsers", "AverageSessionLength", "Events"],
+)
+def test_streams(expected_stream_cls):
+    streams = TEST_INSTANCE.streams(config=TEST_CONFIG)
+    for stream in streams:
+        if expected_stream_cls in streams:
+            assert isinstance(stream, expected_stream_cls)
diff --git a/docs/integrations/sources/amplitude.md b/docs/integrations/sources/amplitude.md
index 93384fd00613d..9b82e37914adf 100644
--- a/docs/integrations/sources/amplitude.md
+++ b/docs/integrations/sources/amplitude.md
@@ -1,12 +1,36 @@
 # Amplitude
 
-## Overview
+This page contains the setup guide and reference information for the `Amplitude` source connector.
+This source can sync data for the [Amplitude API](https://developers.amplitude.com/docs/http-api-v2).
 
-The Amplitude supports full refresh and incremental sync.
+## Prerequisites
 
-This source can sync data for the [Amplitude API](https://developers.amplitude.com/docs/http-api-v2).
+Before you begin replicating data from `Amplitude`, follow this guide to obtain your credentials: [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information).
+Once you have your credentials, you can use them to set up the connection in Airbyte.
+
+## Setup guide
+### Requirements
+* Amplitude API Key
+* Amplitude Secret Key
+* Start Date
+
+Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information) before you begin setting up the Airbyte connection.
+
+### For OSS Airbyte:
+1. In the left navigation bar, click **Sources**. In the top-right corner, click **+ new source**.
+2. On the **Set up the source** page, enter a name for the `Amplitude` connector and select **Amplitude** from the Source type dropdown.
+3. Enter your `API Key` and `Secret Key` in the corresponding fields.
+4. Enter the `Start Date` as the starting point for your data replication.
+
+### For Airbyte Cloud:
 
-### Output schema
+1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces) account.
+2. In the left navigation bar, click **Sources**. In the top-right corner, click **+ new source**.
+3. On the **Set up the source** page, enter a name for the `Amplitude` connector and select **Amplitude** from the Source type dropdown.
+4. Enter your `API Key` and `Secret Key` in the corresponding fields.
+5. Enter the `Start Date` as the starting point for your data replication.
+
+## Supported Streams
 
 Several output streams are available from this source:
 
@@ -18,29 +42,19 @@ Several output streams are available from this source:
 
 If there are more endpoints you'd like Airbyte to support, please [create an issue.](https://github.com/airbytehq/airbyte/issues/new/choose)
 
-### Features
+## Supported sync modes
+
+The `Amplitude` source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes):
 
 | Feature | Supported? |
 | :--- | :--- |
 | Full Refresh Sync | Yes |
 | Incremental Sync | Yes |
-| SSL connection | Yes |
 
 ### Performance considerations
 
 The Amplitude connector should gracefully handle Amplitude API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully.
 
-## Getting started
-
-### Requirements
-
-* Amplitude API Key
-* Amplitude Secret Key
-
-### Setup guide
-
-Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information).
-
 ## Changelog
 
 | Version | Date | Pull Request | Subject |