Commit 94dc191

🐛 Source Amplitude: Fixed JSON Validator date-time validation (#13373)
1 parent 4fe65c6 commit 94dc191

8 files changed, +91 -32 lines

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1 -1

@@ -40,7 +40,7 @@
 - name: Amplitude
   sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
   dockerRepository: airbyte/source-amplitude
-  dockerImageTag: 0.1.7
+  dockerImageTag: 0.1.8
   documentationUrl: https://docs.airbyte.io/integrations/sources/amplitude
   icon: amplitude.svg
   sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1 -1

@@ -547,7 +547,7 @@
     supportsNormalization: false
     supportsDBT: false
     supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-amplitude:0.1.7"
+- dockerImage: "airbyte/source-amplitude:0.1.8"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude"
     connectionSpecification:

airbyte-integrations/connectors/source-amplitude/Dockerfile

+1 -1

@@ -12,5 +12,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.7
+LABEL io.airbyte.version=0.1.8
 LABEL io.airbyte.name=airbyte/source-amplitude

airbyte-integrations/connectors/source-amplitude/README.md

+2 -1

@@ -99,7 +99,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
 If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
 To run your integration tests with acceptance tests, from the connector root, run
 ```
-python -m pytest integration_tests -p integration_tests.acceptance
+docker build . --no-cache -t airbyte/source-amplitude:dev \
+&& python -m pytest -p source_acceptance_test.plugin
 ```
 To run your integration tests with docker

airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py

+30 -8

@@ -56,6 +56,7 @@ class IncrementalAmplitudeStream(AmplitudeStream, ABC):
     base_params = {}
     cursor_field = "date"
     date_template = "%Y%m%d"
+    compare_date_template = None

     def __init__(self, start_date: str, **kwargs):
         super().__init__(**kwargs)
@@ -69,16 +70,39 @@ def time_interval(self) -> dict:
         """
         pass

+    def _get_date_time_items_from_schema(self):
+        """
+        Get all properties from schema with format: 'date-time'
+        """
+        result = []
+        schema = self.get_json_schema()["properties"]
+        for key, value in schema.items():
+            if value.get("format") == "date-time":
+                result.append(key)
+        return result
+
+    def _date_time_to_rfc3339(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
+        """
+        Transform 'date-time' items to RFC3339 format
+        """
+        date_time_fields = self._get_date_time_items_from_schema()
+        for item in record:
+            if item in date_time_fields:
+                record[item] = pendulum.parse(record[item]).to_rfc3339_string()
+        return record
+
     def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.now()):
         if current_date.add(**self.time_interval).date() < end_date.date():
             end_date = current_date.add(**self.time_interval)
         return end_date

     def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
-        latest_benchmark = latest_record[self.cursor_field]
-        if current_stream_state.get(self.cursor_field):
-            return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
-        return {self.cursor_field: latest_benchmark}
+        # save state value in source native format
+        if self.compare_date_template:
+            latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template)
+        else:
+            latest_state = latest_record[self.cursor_field]
+        return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))}

     def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
         parsed = urlparse.urlparse(response.url)
@@ -113,7 +137,7 @@ def request_params(
 class Events(IncrementalAmplitudeStream):
     cursor_field = "event_time"
     date_template = "%Y%m%dT%H"
-    compare_date_template = "%Y-%m-%d %H:%M:%S"
+    compare_date_template = "%Y-%m-%d %H:%M:%S.%f"
     primary_key = "uuid"
     state_checkpoint_interval = 1000
     time_interval = {"days": 3}
@@ -125,7 +149,7 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
             with zip_file.open(gzip_filename) as file:
                 for record in self._parse_zip_file(file):
                     if record[self.cursor_field] >= state_value:
-                        yield record
+                        yield self._date_time_to_rfc3339(record)  # transform all `date-time` to RFC3339

     def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]:
         with gzip.open(zip_file) as file:
@@ -158,11 +182,9 @@ def read_records(
         end = pendulum.parse(stream_slice["end"])
         if start > end:
             yield from []
-
         # sometimes the API throws a 404 error for not obvious reasons, we have to handle it and log it.
         # for example, if there is no data from the specified time period, a 404 exception is thrown
         # https://developers.amplitude.com/docs/export-api#status-codes
-
         try:
             self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}")
             yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
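For reference, the net effect of the new `_date_time_to_rfc3339` helper together with `compare_date_template` can be sketched as follows; the values are taken from the new unit tests further down, and the snippet is an illustration rather than connector code:

```python
import pendulum

# Cursor value as it appears in the Export API data (see the new unit tests).
raw = "2021-05-27 11:59:53.710000"

# Records are now emitted with every `date-time` property converted to RFC 3339,
# which is the representation a `date-time` schema check expects.
rfc3339 = pendulum.parse(raw).to_rfc3339_string()
print(rfc3339)  # 2021-05-27T11:59:53.710000+00:00

# `get_updated_state` keeps the stream state in the source-native format by
# converting the (already transformed) cursor value back via `compare_date_template`.
compare_date_template = "%Y-%m-%d %H:%M:%S.%f"
print(pendulum.parse(rfc3339).strftime(compare_date_template))  # 2021-05-27 11:59:53.710000
```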

airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/annotations.json

+1 -1

@@ -4,7 +4,7 @@
   "properties": {
     "date": {
       "type": ["null", "string"],
-      "format": "date-time"
+      "format": "date"
     },
     "details": {
       "type": ["null", "string"]

airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py

+53 -18

@@ -165,21 +165,56 @@ def test_get_end_date(self, stream_cls, expected):
         expected = now.strftime(stream.date_template)
         assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected

-class TestEventsStream:
-    def test_parse_zip(self):
-        stream = Events(pendulum.now().isoformat())
-        expected = [{"id": 123}]
-        result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
-        assert expected == result
-
-    def test_stream_slices(self):
-        stream = Events(pendulum.now().isoformat())
-        now = pendulum.now()
-        expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
-        assert expected == stream.stream_slices()
-
-    def test_request_params(self):
-        stream = Events(pendulum.now().isoformat())
-        now = pendulum.now().subtract(hours=6)
-        slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
-        assert slice == stream.request_params(slice)
+
+class TestEventsStream:
+    def test_parse_zip(self):
+        stream = Events(pendulum.now().isoformat())
+        expected = [{"id": 123}]
+        result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
+        assert expected == result
+
+    def test_stream_slices(self):
+        stream = Events(pendulum.now().isoformat())
+        now = pendulum.now()
+        expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
+        assert expected == stream.stream_slices()
+
+    def test_request_params(self):
+        stream = Events(pendulum.now().isoformat())
+        now = pendulum.now().subtract(hours=6)
+        slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
+        assert slice == stream.request_params(slice)
+
+    def test_get_updated_state(self):
+        stream = Events(pendulum.now().isoformat())
+        current_state = {"event_time": ""}
+        latest_record = {"event_time": "2021-05-27 11:59:53.710000"}
+        result = stream.get_updated_state(current_state, latest_record)
+        assert result == latest_record
+
+    def test_get_date_time_items_from_schema(self):
+        stream = Events(pendulum.now().isoformat())
+        expected = [
+            "server_received_time",
+            "event_time",
+            "processed_time",
+            "user_creation_time",
+            "client_upload_time",
+            "server_upload_time",
+            "client_event_time",
+        ]
+        result = stream._get_date_time_items_from_schema()
+        assert result == expected
+
+    @pytest.mark.parametrize(
+        "record, expected",
+        [
+            ({}, {}),
+            ({"event_time": "2021-05-27 11:59:53.710000"}, {"event_time": "2021-05-27T11:59:53.710000+00:00"}),
+        ],
+        ids=["empty_record", "transformed_record"],
+    )
+    def test_date_time_to_rfc3339(self, record, expected):
+        stream = Events(pendulum.now().isoformat())
+        result = stream._date_time_to_rfc3339(record)
+        assert result == expected

docs/integrations/sources/amplitude.md

+2 -1

@@ -59,7 +59,8 @@ The Amplitude connector should gracefully handle Amplitude API limitations under

 | Version | Date | Pull Request | Subject |
 |:--------| :--------- | :----------------------------------------------------- | :------ |
-| 0.1.6 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records |
+| 0.1.8 | 2022-06-01 | [13373](https://github.com/airbytehq/airbyte/pull/13373) | Fixed the issue where the JSON Validator produced errors on the `date-time` check |
+| 0.1.7 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records |
 | 0.1.6 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy |
 | 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error |
 | 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
