🎉 Source Amplitude: increase unit_tests coverage up to 90% #12479

Merged
merged 4 commits on May 4, 2022

@@ -3,14 +3,5 @@
#


import pytest

pytest_plugins = ("source_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: setup test dependencies if needed. otherwise remove the TODO comments
yield
# TODO: clean up test dependencies
def test_dummy_test():
assert True
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-amplitude/setup.py
@@ -9,7 +9,7 @@
"airbyte-cdk~=0.1",
]

TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1", "requests-mock"]

setup(
name="source_amplitude",
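The added `requests-mock` dependency provides the `requests_mock` pytest fixture that the new unit tests below rely on to intercept outgoing HTTP calls. A minimal standalone sketch of the pattern (the URL and payload here are illustrative only, not part of this PR):

```python
import requests


def test_intercepted_request(requests_mock):
    # requests-mock registers this fixture automatically once installed.
    url = "https://amplitude.com/api/3/cohorts"  # illustrative endpoint, not taken from the connector code
    requests_mock.get(url, json={"cohorts": [{"key": "value"}]})

    response = requests.get(url)

    # The call never leaves the test process; the canned payload is returned.
    assert response.json() == {"cohorts": [{"key": "value"}]}
```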
Binary file not shown.
@@ -0,0 +1,185 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import pendulum
import pytest
import requests
from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events


class MockRequest:
    def __init__(self, status_code):
        self.status_code = status_code


class TestFullRefreshStreams:
    @pytest.mark.parametrize(
        "stream_cls, data, expected",
        [
            (Cohorts, [{"key": "value"}], [{"key": "value"}]),
            (Annotations, [{"key1": "value1"}], [{"key1": "value1"}]),
        ],
        ids=["Cohorts", "Annotations"],
    )
    def test_parse_response(self, requests_mock, stream_cls, data, expected):
        stream = stream_cls()
        url = f"{stream.url_base}{stream.path()}"
        data = {stream.data_field: data}
        requests_mock.get(url, json=data)
        response = requests.get(url)
        assert list(stream.parse_response(response)) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (Cohorts, None),
            (Annotations, None),
        ],
        ids=["Cohorts", "Annotations"],
    )
    def test_next_page_token(self, requests_mock, stream_cls, expected):
        stream = stream_cls()
        url = f"{stream.url_base}{stream.path()}"
        requests_mock.get(url, json={})
        response = requests.get(url)
        assert stream.next_page_token(response) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (Cohorts, "3/cohorts"),
            (Annotations, "2/annotations"),
        ],
        ids=["Cohorts", "Annotations"],
    )
    def test_path(self, stream_cls, expected):
        stream = stream_cls()
        assert stream.path() == expected


class TestIncrementalStreams:
    @pytest.mark.parametrize(
        "stream_cls, data, expected",
        [
            (
                ActiveUsers,
                {
                    "xValues": ["2021-01-01", "2021-01-02"],
                    "series": [[1, 5]],
                    "seriesCollapsed": [[0]],
                    "seriesLabels": [0],
                    "seriesMeta": [{"segmentIndex": 0}],
                },
                [{"date": "2021-01-01", "statistics": {0: 1}}, {"date": "2021-01-02", "statistics": {0: 5}}],
            ),
            (
                AverageSessionLength,
                {
                    "xValues": ["2019-05-23", "2019-05-24"],
                    "series": [[2, 6]],
                    "seriesCollapsed": [[0]],
                    "seriesLabels": [0],
                    "seriesMeta": [{"segmentIndex": 0}],
                },
                [{"date": "2019-05-23", "length": 2}, {"date": "2019-05-24", "length": 6}],
            ),
        ],
        ids=["ActiveUsers", "AverageSessionLength"],
    )
    def test_parse_response(self, requests_mock, stream_cls, data, expected):
        stream = stream_cls("2021-01-01T00:00:00Z")
        url = f"{stream.url_base}{stream.path()}"
        data = {stream.data_field: data}
        requests_mock.get(url, json=data)
        response = requests.get(url)
        result = list(stream.parse_response(response))
        assert result == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, "2/users"),
            (AverageSessionLength, "2/sessions/average"),
            (Events, "2/export"),
        ],
        ids=["ActiveUsers", "AverageSessionLength", "Events"],
    )
    def test_path(self, stream_cls, expected):
        stream = stream_cls(pendulum.now().isoformat())
        assert stream.path() == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, {"m": "active", "i": 1, "g": "country"}),
            (AverageSessionLength, {}),
        ],
        ids=["ActiveUsers", "AverageSessionLength"],
    )
    def test_request_params(self, stream_cls, expected):
        now = pendulum.now()
        stream = stream_cls(now.isoformat())
        # update expected with valid start, end dates
        expected.update(**{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)})
        assert stream.request_params({}) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, {}),
            (AverageSessionLength, {}),
            (Events, {}),
        ],
        ids=["ActiveUsers", "AverageSessionLength", "Events"],
    )
    def test_next_page_token(self, requests_mock, stream_cls, expected):
        days_ago = pendulum.now().subtract(days=2)
        stream = stream_cls(days_ago.isoformat())
        start = days_ago.strftime(stream.date_template)
        end = pendulum.yesterday().strftime(stream.date_template)
        url = f"{stream.url_base}{stream.path()}?start={start}&end={end}"
        # update expected with test values
        expected.update(
            **{"start": pendulum.yesterday().strftime(stream.date_template), "end": pendulum.now().strftime(stream.date_template)}
        )
        requests_mock.get(url)
        response = requests.get(url)
        assert stream.next_page_token(response) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, ""),
            (AverageSessionLength, ""),
            (Events, ""),
        ],
        ids=["ActiveUsers", "AverageSessionLength", "Events"],
    )
    def test_get_end_date(self, stream_cls, expected):
        now = pendulum.now()
        yesterday = pendulum.yesterday()
        stream = stream_cls(yesterday.isoformat())
        # update expected with test values
        expected = now.strftime(stream.date_template)
        assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected


class TestEventsStream:
    def test_parse_zip(self):
        stream = Events(pendulum.now().isoformat())
        expected = [{"id": 123}]
        result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
        assert expected == result

    def test_stream_slices(self):
        stream = Events(pendulum.now().isoformat())
        now = pendulum.now()
        expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
        assert expected == stream.stream_slices()

    def test_request_params(self):
        stream = Events(pendulum.now().isoformat())
        now = pendulum.now().subtract(hours=6)
        slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
        assert slice == stream.request_params(slice)
@@ -0,0 +1,60 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import pytest
import requests
from unittest.mock import patch
from airbyte_cdk import AirbyteLogger
from source_amplitude import SourceAmplitude
from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events


TEST_CONFIG: dict = {
    "api_key": "test_api_key",
    "secret_key": "test_secret_key",
    "start_date": "2022-05-01T00:00:00Z",
}
TEST_INSTANCE: SourceAmplitude = SourceAmplitude()


class MockRequest:
    def __init__(self, status_code):
        self.status_code = status_code


def test_convert_auth_to_token():
    # base64 encoding of "username:password"
    expected = "dXNlcm5hbWU6cGFzc3dvcmQ="
    actual = TEST_INSTANCE._convert_auth_to_token("username", "password")
    assert actual == expected


@pytest.mark.parametrize(
    "response, check_passed",
    [
        ({"id": 123}, True),
        (requests.HTTPError(), False),
    ],
    ids=["Success", "Fail"],
)
def test_check(response, check_passed):
    with patch.object(Cohorts, "read_records", return_value=response) as mock_method:
        result = TEST_INSTANCE.check_connection(logger=AirbyteLogger, config=TEST_CONFIG)
        mock_method.assert_called()
        assert check_passed == result[0]


@pytest.mark.parametrize(
    "expected_stream_cls",
    [
        (Cohorts),
        (Annotations),
        (ActiveUsers),
        (AverageSessionLength),
        (Events),
    ],
    ids=["Cohorts", "Annotations", "ActiveUsers", "AverageSessionLength", "Events"],
)
def test_streams(expected_stream_cls):
    streams = TEST_INSTANCE.streams(config=TEST_CONFIG)
    # The returned list should contain an instance of each expected stream class.
    assert any(isinstance(stream, expected_stream_cls) for stream in streams)
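The PR title targets roughly 90% unit-test coverage; a hedged sketch of how that figure might be checked locally, assuming `pytest-cov` is installed in addition to the TEST_REQUIREMENTS listed above:

```python
# Minimal sketch: run the connector's unit tests with a coverage report.
# Assumes the working directory is airbyte-integrations/connectors/source-amplitude
# and that pytest-cov is available (it is not part of TEST_REQUIREMENTS in this PR).
import sys

import pytest

if __name__ == "__main__":
    sys.exit(pytest.main(["unit_tests", "--cov=source_amplitude", "--cov-report=term-missing", "-q"]))
```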
48 changes: 31 additions & 17 deletions docs/integrations/sources/amplitude.md
@@ -1,12 +1,36 @@
# Amplitude

## Overview
This page contains the setup guide and reference information for the `Amplitude` source connector.
This source can sync data for the [Amplitude API](https://developers.amplitude.com/docs/http-api-v2).

The Amplitude source supports full refresh and incremental sync.
## Prerequisites

This source can sync data for the [Amplitude API](https://developers.amplitude.com/docs/http-api-v2).
Before you begin replicating data from `Amplitude`, follow [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information) to obtain your credentials.
Once you have your credentials, you can use them to set up the connection in Airbyte.

## Setup guide
### Requirements
* Amplitude API Key
* Amplitude Secret Key
* Start Date

Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information) before you set up the Airbyte connection.

### For OSS Airbyte:
1. In the left navigation bar, click **Sources**. In the top-right corner, click **+ new source**.
2. On the **Set up the source** page, enter a name for the `Amplitude` connector and select **Amplitude** from the Source type dropdown.
3. Enter your `API Key` and `Secret Key` into the corresponding fields.
4. Enter the `Start Date` as the starting point for your data replication.
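For reference, these credentials map onto a source configuration like the sketch below; the field names mirror `TEST_CONFIG` from this PR's unit tests, the values are placeholders, and the same fields apply to the Airbyte Cloud flow that follows:

```python
# Placeholder values only; field names follow TEST_CONFIG in this PR's unit tests.
sample_amplitude_config = {
    "api_key": "<your Amplitude API key>",
    "secret_key": "<your Amplitude secret key>",
    "start_date": "2022-05-01T00:00:00Z",  # replication starts from this timestamp
}
```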

### For Airbyte Cloud:

### Output schema
1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces) account.
2. In the left navigation bar, click **Sources**. In the top-right corner, click **+ new source**.
3. On the **Set up the source** page, enter a name for the `Amplitude` connector and select **Amplitude** from the Source type dropdown.
4. Enter your `API Key` and `Secret Key` into the corresponding fields.
5. Enter the `Start Date` as the starting point for your data replication.

## Supported Streams

Several output streams are available from this source:

@@ -18,29 +42,19 @@ Several output streams are available from this source:

If there are more endpoints you'd like Airbyte to support, please [create an issue](https://github.com/airbytehq/airbyte/issues/new/choose).

### Features
## Supported sync modes

The `Amplitude` source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes):

| Feature | Supported? |
| :--- | :--- |
| Full Refresh Sync | Yes |
| Incremental Sync | Yes |
| SSL connection | Yes |

### Performance considerations

The Amplitude connector should gracefully handle Amplitude API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully.

## Getting started

### Requirements

* Amplitude API Key
* Amplitude Secret Key

### Setup guide
<!-- markdown-link-check-disable-next-line -->
Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information).

## Changelog

| Version | Date | Pull Request | Subject |