diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index ecd980b862288..20f3a6e44717b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -760,7 +760,7 @@ - name: Mixpanel sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a dockerRepository: airbyte/source-mixpanel - dockerImageTag: 0.1.28 + dockerImageTag: 0.1.29 documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel icon: mixpanel.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 7e9a42bd61132..723d8ed258f6d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7182,7 +7182,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-mixpanel:0.1.28" +- dockerImage: "airbyte/source-mixpanel:0.1.29" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/mixpanel" connectionSpecification: @@ -7191,7 +7191,7 @@ type: "object" properties: credentials: - title: "Authentication" + title: "Authentication *" description: "Choose how to authenticate to Mixpanel" type: "object" order: 0 diff --git a/airbyte-integrations/connectors/source-mixpanel/Dockerfile b/airbyte-integrations/connectors/source-mixpanel/Dockerfile index 3d23ac5d7499c..cd44abfc08fe8 100644 --- a/airbyte-integrations/connectors/source-mixpanel/Dockerfile +++ b/airbyte-integrations/connectors/source-mixpanel/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.28 +LABEL io.airbyte.version=0.1.29 LABEL io.airbyte.name=airbyte/source-mixpanel diff 
def iter_dicts(self, lines):
    """
    Yield one dict per record from an iterable of JSON-lines text.

    Each incoming line is expected to be a complete JSON object, but a single
    record occasionally arrives split over several consecutive lines. A line
    that is not a JSON object on its own is buffered, and the buffer is
    re-parsed as a whole every time it grows, so adjacent fragments are glued
    back together (joined without any separator) into one record.

    Fragments are only combined when they are adjacent: if a complete record
    shows up while fragments are still pending, the pending fragments are
    dropped (with a warning) and the complete record is yielded.

    The literal line "terminated early" is logged as a warning and stops the
    iteration.

    :param lines: iterable of already decoded text lines
    :return: generator of dicts, one per successfully parsed record
    """

    def parse_object(text):
        # Only a JSON *object* counts as a record. A fragment such as `123`
        # or `null` is valid JSON by itself but must be treated as a piece
        # of a split record, not yielded to a caller that expects a dict.
        try:
            parsed = json.loads(text)
        except ValueError:
            return None
        return parsed if isinstance(parsed, dict) else None

    parts = []
    for record_line in lines:
        if record_line == "terminated early":
            self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
            break

        if not record_line:
            # An empty line adds nothing to a joined record; skipping it
            # keeps it from being reported as a lost fragment.
            continue

        record = parse_object(record_line)
        if record is not None:
            if parts:
                # Fragments interrupted by a complete record can never be
                # completed; say so instead of losing data silently.
                self.logger.warning(f"Export API: dropped {len(parts)} line fragment(s) that do not form a JSON record")
                parts = []
            # The yield is deliberately outside any try block so that an
            # exception raised by the consumer is never mistaken for a
            # parse failure.
            yield record
            continue

        parts.append(record_line)
        if len(parts) > 1:
            record = parse_object("".join(parts))
            if record is not None:
                parts = []
                yield record

    if parts:
        # The stream ended (or was terminated) with an incomplete record.
        self.logger.warning(f"Export API: dropped {len(parts)} line fragment(s) that do not form a JSON record")
def test_export_iter_dicts(config):
    """Export.iter_dicts parses JSON lines and re-joins only adjacent fragments."""
    export_stream = Export(authenticator=MagicMock(), **config)
    expected = {"key1": "value1", "key2": "value2"}
    line = json.dumps(expected)
    # the same record cut into two pieces that are not valid JSON on their own
    head, tail = line[:2], line[2:]

    # every line is already a complete record
    assert list(export_stream.iter_dicts([line, line])) == [expected] * 2
    # two fragments standing next to each other are combined into one record
    assert list(export_stream.iter_dicts([line, head, tail, line])) == [expected] * 3
    # fragments separated by a complete record are not adjacent, so they are dropped
    assert list(export_stream.iter_dicts([line, head, line, tail])) == [expected] * 2
Disable stream "cohort_members" on discover if not access | | 0.1.26 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream states. | | 0.1.25 | 2022-09-27 | [17145](https://github.com/airbytehq/airbyte/pull/17145) | Disable streams "export", "engage" on discover if not access |