Skip to content

🎉 Source Amplitude: increase unit_tests coverage up to 90% #12479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,5 @@
#


import pytest

pytest_plugins = ("source_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: setup test dependencies if needed. otherwise remove the TODO comments
yield
# TODO: clean up test dependencies
def test_dummy_test():
assert True
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-amplitude/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"airbyte-cdk~=0.1",
]

TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1", "requests-mock"]

setup(
name="source_amplitude",
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import pendulum
import pytest
import requests
from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events


class MockRequest:
def __init__(self, status_code):
self.status_code = status_code


class TestFullRefreshStreams:
@pytest.mark.parametrize(
"stream_cls, data, expected",
[
(Cohorts, [{"key": "value"}], [{"key": "value"}]),
(Annotations, [{"key1": "value1"}], [{"key1": "value1"}]),
],
ids=["Cohorts", "Annotations"],
)
def test_parse_response(self, requests_mock, stream_cls, data, expected):
stream = stream_cls()
url = f"{stream.url_base}{stream.path()}"
data = {stream.data_field: data}
requests_mock.get(url, json=data)
response = requests.get(url)
assert list(stream.parse_response(response)) == expected

@pytest.mark.parametrize(
"stream_cls, expected",
[
(Cohorts, None),
(Annotations, None),
],
ids=["Cohorts", "Annotations"],
)
def test_next_page_token(self, requests_mock, stream_cls, expected):
stream = stream_cls()
url = f"{stream.url_base}{stream.path()}"
requests_mock.get(url, json={})
response = requests.get(url)
assert stream.next_page_token(response) == expected

@pytest.mark.parametrize(
"stream_cls, expected",
[
(Cohorts, "3/cohorts"),
(Annotations, "2/annotations"),
],
ids=["Cohorts", "Annotations"],
)
def test_path(self, stream_cls, expected):
stream = stream_cls()
assert stream.path() == expected


class TestIncrementalStreams:
@pytest.mark.parametrize(
"stream_cls, data, expected",
[
(
ActiveUsers,
{
"xValues": ["2021-01-01", "2021-01-02"],
"series": [[1, 5]],
"seriesCollapsed": [[0]],
"seriesLabels": [0],
"seriesMeta": [{"segmentIndex": 0}],
},
[{"date": "2021-01-01", "statistics": {0: 1}}, {"date": "2021-01-02", "statistics": {0: 5}}],
),
(
AverageSessionLength,
{
"xValues": ["2019-05-23", "2019-05-24"],
"series": [[2, 6]],
"seriesCollapsed": [[0]],
"seriesLabels": [0],
"seriesMeta": [{"segmentIndex": 0}],
},
[{"date": "2019-05-23", "length": 2}, {"date": "2019-05-24", "length": 6}],
),
],
ids=["ActiveUsers", "AverageSessionLength"],
)
def test_parse_response(self, requests_mock, stream_cls, data, expected):
stream = stream_cls("2021-01-01T00:00:00Z")
url = f"{stream.url_base}{stream.path()}"
data = {stream.data_field: data}
requests_mock.get(url, json=data)
response = requests.get(url)
result = list(stream.parse_response(response))
assert result == expected

@pytest.mark.parametrize(
"stream_cls, expected",
[
(ActiveUsers, "2/users"),
(AverageSessionLength, "2/sessions/average"),
(Events, "2/export"),
],
ids=["ActiveUsers", "AverageSessionLength", "Events"],
)
def test_path(self, stream_cls, expected):
stream = stream_cls(pendulum.now().isoformat())
assert stream.path() == expected

@pytest.mark.parametrize(
"stream_cls, expected",
[
(ActiveUsers, {"m": "active", "i": 1, "g": "country"}),
(AverageSessionLength, {}),
],
ids=["ActiveUsers", "AverageSessionLength"],
)
def test_request_params(self, stream_cls, expected):
now = pendulum.now()
stream = stream_cls(now.isoformat())
# update expected with valid start,end dates
expected.update(**{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)})
assert stream.request_params({}) == expected

@pytest.mark.parametrize(
"stream_cls, expected",
[
(ActiveUsers, {}),
(AverageSessionLength, {}),
(Events, {}),
],
ids=["ActiveUsers", "AverageSessionLength", "Events"],
)
def test_next_page_token(self, requests_mock, stream_cls, expected):
days_ago = pendulum.now().subtract(days=2)
stream = stream_cls(days_ago.isoformat())
start = days_ago.strftime(stream.date_template)
end = pendulum.yesterday().strftime(stream.date_template)
url = f"{stream.url_base}{stream.path()}?start={start}&end={end}"
# update expected with test values.
expected.update(
**{"start": pendulum.yesterday().strftime(stream.date_template), "end": pendulum.now().strftime(stream.date_template)}
)
requests_mock.get(url)
response = requests.get(url)
assert stream.next_page_token(response) == expected

@pytest.mark.parametrize(
"stream_cls, expected",
[
(ActiveUsers, ""),
(AverageSessionLength, ""),
(Events, ""),
],
ids=["ActiveUsers", "AverageSessionLength", "Events"],
)
def test_get_end_date(self, stream_cls, expected):
now = pendulum.now()
yesterday = pendulum.yesterday()
stream = stream_cls(yesterday.isoformat())
# update expected with test values.
expected = now.strftime(stream.date_template)
assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected

class TestEventsStream:
def test_parse_zip(self):
stream = Events(pendulum.now().isoformat())
expected = [{"id": 123}]
result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
assert expected == result

def test_stream_slices(self):
stream = Events(pendulum.now().isoformat())
now = pendulum.now()
expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
assert expected == stream.stream_slices()

def test_request_params(self):
stream = Events(pendulum.now().isoformat())
now = pendulum.now().subtract(hours=6)
slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
assert slice == stream.request_params(slice)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import pytest
import requests
from unittest.mock import patch
from airbyte_cdk import AirbyteLogger
from source_amplitude import SourceAmplitude
from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events


TEST_CONFIG: dict = {
"api_key": "test_api_key",
"secret_key": "test_secret_key",
"start_date": "2022-05-01T00:00:00Z"
}
TEST_INSTANCE: SourceAmplitude = SourceAmplitude()

class MockRequest:
def __init__(self, status_code):
self.status_code = status_code

def test_convert_auth_to_token():
expected = "dXNlcm5hbWU6cGFzc3dvcmQ="
actual = TEST_INSTANCE._convert_auth_to_token("username", "password")
assert actual == expected


@pytest.mark.parametrize(
"response, check_passed",
[
({"id": 123}, True),
(requests.HTTPError(), False),
],
ids=["Success", "Fail"],
)
def test_check(response, check_passed):
with patch.object(Cohorts, 'read_records', return_value=response) as mock_method:
result = TEST_INSTANCE.check_connection(logger=AirbyteLogger, config=TEST_CONFIG)
mock_method.assert_called()
assert check_passed == result[0]

@pytest.mark.parametrize(
"expected_stream_cls",
[
(Cohorts),
(Annotations),
(ActiveUsers),
(AverageSessionLength),
(Events),
],
ids=["Cohorts", "Annotations", "ActiveUsers", "AverageSessionLength", "Events"],
)
def test_streams(expected_stream_cls):
streams = TEST_INSTANCE.streams(config=TEST_CONFIG)
for stream in streams:
if expected_stream_cls in streams:
assert isinstance(stream, expected_stream_cls)