Skip to content

Commit a8c5656

Browse files
grubberrrobbinhan
authored andcommitted
🎉 Source mixpanel: add streaming for export stream :) (airbytehq#16843)
Signed-off-by: Sergey Chvalyuk <[email protected]>
1 parent 07bf9dc commit a8c5656

File tree

7 files changed

+31
-9
lines changed

7 files changed

+31
-9
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@
623623
- name: Mixpanel
624624
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
625625
dockerRepository: airbyte/source-mixpanel
626-
dockerImageTag: 0.1.22
626+
dockerImageTag: 0.1.23
627627
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
628628
icon: mixpanel.svg
629629
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6171,7 +6171,7 @@
61716171
path_in_connector_config:
61726172
- "credentials"
61736173
- "client_secret"
6174-
- dockerImage: "airbyte/source-mixpanel:0.1.22"
6174+
- dockerImage: "airbyte/source-mixpanel:0.1.23"
61756175
spec:
61766176
documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel"
61776177
connectionSpecification:

airbyte-integrations/connectors/source-mixpanel/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

1515

16-
LABEL io.airbyte.version=0.1.22
16+
LABEL io.airbyte.version=0.1.23
1717
LABEL io.airbyte.name=airbyte/source-mixpanel

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,12 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
103103
}
104104
}
105105
"""
106-
if response.text == "terminated early\n":
107-
# no data available
108-
self.logger.warn(f"Couldn't fetch data from Export API. Response: {response.text}")
109-
return []
110106

111107
# We prefer response.iter_lines() to response.text.split_lines() as the later can missparse text properties embeding linebreaks
112-
for record_line in response.iter_lines():
108+
for record_line in response.iter_lines(decode_unicode=True):
109+
if record_line == "terminated early":
110+
self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
111+
break
113112
record = json.loads(record_line)
114113
# transform record into flat dict structure
115114
item = {"event": record["event"]}
@@ -157,3 +156,8 @@ def request_params(
157156
if stream_state and "date" in stream_state:
158157
mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})"
159158
return mapping
159+
160+
def request_kwargs(
161+
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
162+
) -> Mapping[str, Any]:
163+
return {"stream": True}

airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
Revenue,
2626
)
2727

28-
from .utils import get_url_to_mock, setup_response
28+
from .utils import get_url_to_mock, read_full_refresh, setup_response
2929

3030
logger = AirbyteLogger()
3131

@@ -450,3 +450,9 @@ def test_export_stream_request_params(config):
450450
assert "where" in request_params
451451
timestamp = int(datetime.datetime.fromisoformat("2021-06-16T17:00:00").timestamp())
452452
assert request_params.get("where") == f'properties["$time"]>=datetime({timestamp})'
453+
454+
455+
def test_export_terminated_early(requests_mock, config):
456+
stream = Export(authenticator=MagicMock(), **config)
457+
requests_mock.register_uri("GET", get_url_to_mock(stream), text="terminated early\n")
458+
assert list(read_full_refresh(stream)) == []

airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,21 @@
44

55
import urllib.parse
66

7+
from airbyte_cdk.models import SyncMode
8+
from airbyte_cdk.sources.streams import Stream
9+
710

811
def setup_response(status, body):
912
return [{"json": body, "status_code": status}]
1013

1114

1215
def get_url_to_mock(stream):
1316
return urllib.parse.urljoin(stream.url_base, stream.path())
17+
18+
19+
def read_full_refresh(stream_instance: Stream):
20+
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
21+
for _slice in slices:
22+
records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
23+
for record in records:
24+
yield record

docs/integrations/sources/mixpanel.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t
6161

6262
| Version | Date | Pull Request | Subject |
6363
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------|
64+
| 0.1.23 | 2022-09-18 | [16843](https://github.com/airbytehq/airbyte/pull/16843) | Add stream=True for `export` stream |
6465
| 0.1.22 | 2022-09-15 | [16770](https://github.com/airbytehq/airbyte/pull/16770) | Use "Retry-After" header for backoff |
6566
| 0.1.21 | 2022-09-11 | [16191](https://github.com/airbytehq/airbyte/pull/16191) | Improved connector's input configuration validation |
6667
| 0.1.20 | 2022-08-22 | [15091](https://github.com/airbytehq/airbyte/pull/15091) | Improve `export` stream cursor support |

0 commit comments

Comments
 (0)