From 46d957750aaa5568c5bbec0b1ef358b40701afbe Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Sat, 21 May 2022 21:26:12 +0300 Subject: [PATCH 1/3] fixed --- .../connectors/source-amplitude/Dockerfile | 2 +- .../source-amplitude/source_amplitude/api.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-amplitude/Dockerfile b/airbyte-integrations/connectors/source-amplitude/Dockerfile index cef81e7daf796..047cbc6e5f399 100644 --- a/airbyte-integrations/connectors/source-amplitude/Dockerfile +++ b/airbyte-integrations/connectors/source-amplitude/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-amplitude diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index 0d2908608a5e3..e32aaddd9c00f 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -117,11 +117,14 @@ class Events(IncrementalAmplitudeStream): state_checkpoint_interval = 1000 time_interval = {"days": 3} - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]: + state = stream_state[self.cursor_field] if stream_state else self._start_date zip_file = zipfile.ZipFile(io.BytesIO(response.content)) for gzip_filename in zip_file.namelist(): with zip_file.open(gzip_filename) as file: - yield from self._parse_zip_file(file) + for record in self._parse_zip_file(file): + if record[self.cursor_field] >= state: + yield record def 
_parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]: with gzip.open(zip_file) as file: @@ -130,9 +133,7 @@ def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]: def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: slices = [] - start = self._start_date - if stream_state: - start = pendulum.parse(stream_state.get(self.cursor_field)) + start = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self._start_date end = pendulum.now() while start <= end: slices.append( @@ -152,8 +153,7 @@ def read_records( stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: stream_state = stream_state or {} - # API returns data only when requested with a difference between 'start' and 'end' of 6 or more hours. - start = pendulum.parse(stream_slice["start"]).add(hours=6) + start = pendulum.parse(stream_slice["start"]) end = pendulum.parse(stream_slice["end"]) if start > end: yield from [] @@ -163,7 +163,7 @@ def read_records( # https://developers.amplitude.com/docs/export-api#status-codes try: - self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%d')} - {end.strftime('%Y-%m-%d')}") + self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}") yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state) except requests.exceptions.HTTPError as error: status = error.response.status_code From 9265e31e4f3c80d5145daf1e35698bce60558081 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Sat, 21 May 2022 21:58:53 +0300 Subject: [PATCH 2/3] added changelog --- .../connectors/source-amplitude/source_amplitude/api.py | 5 +++-- docs/integrations/sources/amplitude.md | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py 
b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index e32aaddd9c00f..961cf638cf14a 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -113,17 +113,18 @@ def request_params( class Events(IncrementalAmplitudeStream): cursor_field = "event_time" date_template = "%Y%m%dT%H" + compare_date_template = '%Y-%m-%d %H:%M:%S' primary_key = "uuid" state_checkpoint_interval = 1000 time_interval = {"days": 3} def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]: - state = stream_state[self.cursor_field] if stream_state else self._start_date + state_value = stream_state[self.cursor_field] if stream_state else self._start_date.strftime(self.compare_date_template) zip_file = zipfile.ZipFile(io.BytesIO(response.content)) for gzip_filename in zip_file.namelist(): with zip_file.open(gzip_filename) as file: for record in self._parse_zip_file(file): - if record[self.cursor_field] >= state: + if record[self.cursor_field] >= state_value: yield record def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]: diff --git a/docs/integrations/sources/amplitude.md b/docs/integrations/sources/amplitude.md index 6334de4ead0d7..ee153d8c78afd 100644 --- a/docs/integrations/sources/amplitude.md +++ b/docs/integrations/sources/amplitude.md @@ -59,6 +59,7 @@ The Amplitude connector should gracefully handle Amplitude API limitations under | Version | Date | Pull Request | Subject | |:--------| :--------- | :----------------------------------------------------- | :------ | +| 0.1.7 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records | | 0.1.6 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy | | 0.1.5 | 2022-04-28 | 
[12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error | | 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications | From 727a7ec8adc1997b683070c414941ce97ce9e93f Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Mon, 23 May 2022 15:43:27 +0000 Subject: [PATCH 3/3] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-amplitude/source_amplitude/api.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 2a85be90bf6ad..7dee1c83de30d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -40,7 +40,7 @@ - name: Amplitude sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396 dockerRepository: airbyte/source-amplitude - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://docs.airbyte.io/integrations/sources/amplitude icon: amplitude.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e759d939df731..cbd3729d6c528 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -480,7 +480,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-amplitude:0.1.6" +- dockerImage: "airbyte/source-amplitude:0.1.7" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude" connectionSpecification: diff --git 
a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index 961cf638cf14a..0c1ba40d50047 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -113,7 +113,7 @@ def request_params( class Events(IncrementalAmplitudeStream): cursor_field = "event_time" date_template = "%Y%m%dT%H" - compare_date_template = '%Y-%m-%d %H:%M:%S' + compare_date_template = "%Y-%m-%d %H:%M:%S" primary_key = "uuid" state_checkpoint_interval = 1000 time_interval = {"days": 3}