Skip to content

🎉 Source mixpanel: add streaming for export stream :) #16843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@
- name: Mixpanel
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.22
dockerImageTag: 0.1.23
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
icon: mixpanel.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6171,7 +6171,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mixpanel:0.1.22"
- dockerImage: "airbyte/source-mixpanel:0.1.23"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.22
LABEL io.airbyte.version=0.1.23
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,12 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
}
}
"""
if response.text == "terminated early\n":
# no data available
self.logger.warn(f"Couldn't fetch data from Export API. Response: {response.text}")
return []

# We prefer response.iter_lines() to response.text.splitlines() as the latter can misparse text properties embedding line breaks
for record_line in response.iter_lines():
for record_line in response.iter_lines(decode_unicode=True):
if record_line == "terminated early":
self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
break
record = json.loads(record_line)
# transform record into flat dict structure
item = {"event": record["event"]}
Expand Down Expand Up @@ -157,3 +156,8 @@ def request_params(
if stream_state and "date" in stream_state:
mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})"
return mapping

def request_kwargs(
    self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
    """Return the extra keyword arguments for the outgoing request.

    The result is always ``{"stream": True}``; none of the arguments are
    consulted.

    NOTE(review): presumably these kwargs are forwarded to the HTTP client so
    the response body is streamed rather than loaded at once -- confirm
    against the base class.
    """
    streaming_options = {"stream": True}
    return streaming_options
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
Revenue,
)

from .utils import get_url_to_mock, setup_response
from .utils import get_url_to_mock, read_full_refresh, setup_response

logger = AirbyteLogger()

Expand Down Expand Up @@ -450,3 +450,9 @@ def test_export_stream_request_params(config):
assert "where" in request_params
timestamp = int(datetime.datetime.fromisoformat("2021-06-16T17:00:00").timestamp())
assert request_params.get("where") == f'properties["$time"]>=datetime({timestamp})'


def test_export_terminated_early(requests_mock, config):
    """A full-refresh read yields no records when the body is "terminated early"."""
    export_stream = Export(authenticator=MagicMock(), **config)
    mocked_url = get_url_to_mock(export_stream)
    # The mocked endpoint answers with the bare marker line instead of records.
    requests_mock.register_uri("GET", mocked_url, text="terminated early\n")
    records = list(read_full_refresh(export_stream))
    assert records == []
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,21 @@

import urllib.parse

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream


def setup_response(status, body):
    """Wrap ``body`` and ``status`` into a single-element list of response dicts.

    The one dict in the list carries ``body`` under ``"json"`` and ``status``
    under ``"status_code"``.
    """
    mocked_response = {"json": body, "status_code": status}
    return [mocked_response]


def get_url_to_mock(stream):
    """Join the stream's ``url_base`` with the result of its ``path()`` call."""
    base, relative_path = stream.url_base, stream.path()
    return urllib.parse.urljoin(base, relative_path)


def read_full_refresh(stream_instance: Stream):
    """Lazily yield every record of ``stream_instance`` in full-refresh mode.

    Slices come from ``stream_slices`` and are read one after another with
    ``read_records``; records are yielded in the order they are produced.
    """
    for current_slice in stream_instance.stream_slices(sync_mode=SyncMode.full_refresh):
        yield from stream_instance.read_records(stream_slice=current_slice, sync_mode=SyncMode.full_refresh)
1 change: 1 addition & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------|
| 0.1.23 | 2022-09-18 | [16843](https://github.com/airbytehq/airbyte/pull/16843) | Add stream=True for `export` stream |
| 0.1.22 | 2022-09-15 | [16770](https://github.com/airbytehq/airbyte/pull/16770) | Use "Retry-After" header for backoff |
| 0.1.21 | 2022-09-11 | [16191](https://github.com/airbytehq/airbyte/pull/16191) | Improved connector's input configuration validation |
| 0.1.20 | 2022-08-22 | [15091](https://github.com/airbytehq/airbyte/pull/15091) | Improve `export` stream cursor support |
Expand Down