Skip to content

Source Mixpanel: "export" stream make line parsing more robust #18846

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.version=0.1.29
LABEL io.airbyte.name=airbyte/source-mixpanel
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-cdk~=0.2",
]

TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "pytest-mock~=3.6", "requests_mock~=1.8"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,32 @@ def url_base(self):
def path(self, **kwargs) -> str:
return "export"

def iter_dicts(self, lines):
"""
The incoming stream has to be JSON lines format.
From time to time for some reason, the one record can be split into multiple lines.
We try to combine such split parts into one record only if parts go nearby.
"""
parts = []
for record_line in lines:
if record_line == "terminated early":
self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
return
try:
yield json.loads(record_line)
except ValueError:
parts.append(record_line)
else:
parts = []

if len(parts) > 1:
try:
yield json.loads("".join(parts))
except ValueError:
pass
else:
parts = []

def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""Export API return response in JSONL format but each line is a valid JSON object
Raw item example:
Expand All @@ -106,11 +132,7 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
"""

# We prefer response.iter_lines() to response.text.split_lines() as the later can missparse text properties embeding linebreaks
for record_line in response.iter_lines(decode_unicode=True):
if record_line == "terminated early":
self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
break
record = json.loads(record_line)
for record in self.iter_dicts(response.iter_lines(decode_unicode=True)):
# transform record into flat dict structure
item = {"event": record["event"]}
properties = record["properties"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
from datetime import timedelta
from unittest.mock import MagicMock

Expand Down Expand Up @@ -456,3 +457,14 @@ def test_export_terminated_early(requests_mock, config):
stream = Export(authenticator=MagicMock(), **config)
requests_mock.register_uri("GET", get_url_to_mock(stream), text="terminated early\n")
assert list(read_full_refresh(stream)) == []


def test_export_iter_dicts(config):
stream = Export(authenticator=MagicMock(), **config)
record = {"key1": "value1", "key2": "value2"}
record_string = json.dumps(record)
assert list(stream.iter_dicts([record_string, record_string])) == [record, record]
# combine record from 2 standing nearby parts
assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, record, record]
# drop record parts because they are not standing nearby
assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you do it using pytest.mark.parametrize so that the parameters would be more human readable?

3 changes: 2 additions & 1 deletion docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------------------------------------------------- |
| 0.1.28 | 2022-10-06 | [17699](https://github.com/airbytehq/airbyte/pull/17699) | Fix discover step issue cursor field None |
| 0.1.29 | 2022-11-02 | [18846](https://github.com/airbytehq/airbyte/pull/18846) | For "export" stream make line parsing more robust |
| 0.1.28 | 2022-10-06 | [17699](https://github.com/airbytehq/airbyte/pull/17699) | Fix discover step issue cursor field None |
| 0.1.27 | 2022-09-29 | [17415](https://github.com/airbytehq/airbyte/pull/17415) | Disable stream "cohort_members" on discover if not access |
| 0.1.26 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream states. |
| 0.1.25 | 2022-09-27 | [17145](https://github.com/airbytehq/airbyte/pull/17145) | Disable streams "export", "engage" on discover if not access |
Expand Down