Skip to content

Commit a7a9688

Browse files
authored
refactor(source-mixpanel): migrate to CDK v3 (#41969)
1 parent 42f7d56 commit a7a9688

File tree

12 files changed

+832
-187
lines changed

12 files changed

+832
-187
lines changed

airbyte-integrations/connectors/source-mixpanel/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: api
1212
connectorType: source
1313
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
14-
dockerImageTag: 3.3.3
14+
dockerImageTag: 3.4.0
1515
dockerRepository: airbyte/source-mixpanel
1616
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
1717
githubIssueLabel: source-mixpanel

airbyte-integrations/connectors/source-mixpanel/poetry.lock

+627-76
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-integrations/connectors/source-mixpanel/pyproject.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "3.3.3"
6+
version = "3.4.0"
77
name = "source-mixpanel"
88
description = "Source implementation for Mixpanel."
99
authors = ["Airbyte <[email protected]>"]
@@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
1616
include = "source_mixpanel"
1717

1818
[tool.poetry.dependencies]
19-
python = "^3.9,<3.12"
20-
airbyte-cdk = "0.80.0"
19+
python = "^3.10,<3.12"
20+
airbyte-cdk = "^4"
2121

2222
[tool.poetry.scripts]
2323
source-mixpanel = "source_mixpanel.run:run"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Any, Optional, Union
6+
7+
import requests
8+
from airbyte_cdk import BackoffStrategy
9+
from airbyte_cdk.sources.streams.http import HttpStream
10+
11+
12+
class MixpanelStreamBackoffStrategy(BackoffStrategy):
13+
def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
14+
self.stream = stream
15+
super().__init__(**kwargs)
16+
17+
def backoff_time(
18+
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs: Any
19+
) -> Optional[float]:
20+
if isinstance(response_or_exception, requests.Response):
21+
retry_after = response_or_exception.headers.get("Retry-After")
22+
if retry_after:
23+
self._logger.debug(f"API responded with `Retry-After` header: {retry_after}")
24+
return float(retry_after)
25+
return None

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/components.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ class EngagePaginationStrategy(PageIncrement):
277277

278278
_total = 0
279279

280-
def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
280+
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
281281
"""
282282
Determines page and subpage numbers for the `items` stream
283283
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from .base_errors_handler import MixpanelStreamErrorHandler
2+
from .export_errors_handler import ExportErrorHandler
3+
4+
__all__ = [
5+
"MixpanelStreamErrorHandler",
6+
"ExportErrorHandler",
7+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import re
6+
from typing import Optional, Union
7+
8+
import requests
9+
from airbyte_cdk.sources.streams.http import HttpStream
10+
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, HttpStatusErrorHandler, ResponseAction
11+
from airbyte_protocol.models import FailureType
12+
13+
14+
class MixpanelStreamErrorHandler(HttpStatusErrorHandler):
15+
"""
16+
Custom error handler for Mixpanel stream that interprets responses and exceptions.
17+
18+
This handler specifically addresses:
19+
- 400 status code with "Unable to authenticate request" message, indicating potential credential expiration.
20+
- 402 status code, indicating a payment required error.
21+
22+
If the response does not match these specific cases, the handler defers to the parent class's implementation.
23+
"""
24+
25+
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
26+
if isinstance(response_or_exception, requests.Response):
27+
if response_or_exception.status_code == 400 and "Unable to authenticate request" in response_or_exception.text:
28+
message = (
29+
f"Your credentials might have expired. Please update your config with valid credentials."
30+
f" See more details: {response_or_exception.text}"
31+
)
32+
return ErrorResolution(
33+
response_action=ResponseAction.FAIL,
34+
failure_type=FailureType.config_error,
35+
error_message=message,
36+
)
37+
elif response_or_exception.status_code == 402:
38+
message = f"Unable to perform a request. Payment Required: {response_or_exception.json()['error']}"
39+
return ErrorResolution(
40+
response_action=ResponseAction.FAIL,
41+
failure_type=FailureType.transient_error,
42+
error_message=message,
43+
)
44+
return super().interpret_response(response_or_exception)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Optional, Union
6+
7+
import requests
8+
from airbyte_cdk.sources.streams.http import HttpStream
9+
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, HttpStatusErrorHandler, ResponseAction
10+
from airbyte_protocol.models import FailureType
11+
12+
from .base_errors_handler import MixpanelStreamErrorHandler
13+
14+
15+
class ExportErrorHandler(MixpanelStreamErrorHandler):
16+
"""
17+
Custom error handler for handling export errors specific to Mixpanel streams.
18+
19+
This handler addresses:
20+
- 400 status code with "to_date cannot be later than today" message, indicating a potential timezone mismatch.
21+
- ConnectionResetError during response parsing, indicating a need to retry the request.
22+
23+
If the response does not match these specific cases, the handler defers to the parent class's implementation.
24+
25+
Attributes:
26+
stream (HttpStream): The HTTP stream associated with this error handler.
27+
"""
28+
29+
def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
30+
self.stream = stream
31+
super().__init__(**kwargs)
32+
33+
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
34+
if isinstance(response_or_exception, requests.Response):
35+
if (
36+
response_or_exception.status_code == requests.codes.bad_request
37+
and "to_date cannot be later than today" in response_or_exception.text
38+
):
39+
self.stream._timezone_mismatch = True
40+
message = "Your project timezone must be misconfigured. Please set it to the one defined in your Mixpanel project settings. Stopping current stream sync."
41+
return ErrorResolution(
42+
response_action=ResponseAction.IGNORE,
43+
failure_type=FailureType.config_error,
44+
error_message=message,
45+
)
46+
try:
47+
# trying to parse response to avoid ConnectionResetError and retry if it occurs
48+
self.stream.iter_dicts(response_or_exception.iter_lines(decode_unicode=True))
49+
except ConnectionResetError:
50+
return ErrorResolution(
51+
response_action=ResponseAction.RETRY,
52+
failure_type=FailureType.transient_error,
53+
error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
54+
)
55+
return super().interpret_response(response_or_exception)

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ definitions:
3939
action: FAIL
4040
error_message: Unable to perform a request. Payment Required.
4141
- predicate: "{{ 'Retry-After' in headers }}"
42-
action: RETRY
42+
action: RATE_LIMITED
4343
error_message: Query rate limit exceeded.
4444
- error_message_contains: "Query rate limit exceeded"
45-
action: RETRY
45+
action: RATE_LIMITED
4646
error_message: Query rate limit exceeded.
4747
- http_codes: [500]
4848
error_message_contains: "unknown error"

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py

+9-44
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99

1010
import pendulum
1111
import requests
12-
from airbyte_cdk.models import FailureType
12+
from airbyte_cdk import BackoffStrategy
1313
from airbyte_cdk.sources.streams.http import HttpStream
14-
from airbyte_cdk.utils import AirbyteTracedException
14+
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
1515
from pendulum import Date
1616
from requests.auth import AuthBase
17+
from source_mixpanel.backoff_strategy import MixpanelStreamBackoffStrategy
18+
from source_mixpanel.errors_handlers import MixpanelStreamErrorHandler
1719
from source_mixpanel.utils import fix_date_time
1820

1921

@@ -69,7 +71,6 @@ def __init__(
6971
self.region = region
7072
self.project_timezone = project_timezone
7173
self.project_id = project_id
72-
self.retries = 0
7374
self._reqs_per_hour_limit = reqs_per_hour_limit
7475
super().__init__(authenticator=authenticator)
7576

@@ -110,42 +111,18 @@ def parse_response(
110111
self.logger.info(f"Sleep for {3600 / self.reqs_per_hour_limit} seconds to match API limitations after reading from {self.name}")
111112
time.sleep(3600 / self.reqs_per_hour_limit)
112113

113-
@property
114-
def max_retries(self) -> Union[int, None]:
115-
# we want to limit the max sleeping time by 2^3 * 60 = 8 minutes
116-
return 3
114+
def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
115+
return MixpanelStreamBackoffStrategy(stream=self)
117116

118-
def backoff_time(self, response: requests.Response) -> float:
119-
"""
120-
Some API endpoints do not return "Retry-After" header.
121-
"""
122-
123-
retry_after = response.headers.get("Retry-After")
124-
if retry_after:
125-
self.logger.debug(f"API responded with `Retry-After` header: {retry_after}")
126-
return float(retry_after)
127-
128-
self.retries += 1
129-
return 2**self.retries * 60
130-
131-
def should_retry(self, response: requests.Response) -> bool:
132-
if response.status_code == 402:
133-
self.logger.warning(f"Unable to perform a request. Payment Required: {response.json()['error']}")
134-
return False
135-
if response.status_code == 400 and "Unable to authenticate request" in response.text:
136-
message = (
137-
f"Your credentials might have expired. Please update your config with valid credentials."
138-
f" See more details: {response.text}"
139-
)
140-
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
141-
return super().should_retry(response)
117+
def get_error_handler(self) -> Optional[ErrorHandler]:
118+
return MixpanelStreamErrorHandler(logger=self.logger)
142119

143120
def get_stream_params(self) -> Mapping[str, Any]:
144121
"""
145122
Fetch required parameters in a given stream. Used to create sub-streams
146123
"""
147124
params = {
148-
"authenticator": self._session.auth,
125+
"authenticator": self._http_client._session.auth,
149126
"region": self.region,
150127
"project_timezone": self.project_timezone,
151128
"reqs_per_hour_limit": self.reqs_per_hour_limit,
@@ -168,18 +145,6 @@ def request_params(
168145
class DateSlicesMixin:
169146
raise_on_http_errors = True
170147

171-
def should_retry(self, response: requests.Response) -> bool:
172-
if response.status_code == requests.codes.bad_request:
173-
if "to_date cannot be later than today" in response.text:
174-
self._timezone_mismatch = True
175-
self.logger.warning(
176-
"Your project timezone must be misconfigured. Please set it to the one defined in your Mixpanel project settings. "
177-
"Stopping current stream sync."
178-
)
179-
setattr(self, "raise_on_http_errors", False)
180-
return False
181-
return super().should_retry(response)
182-
183148
def __init__(self, *args, **kwargs):
184149
super().__init__(*args, **kwargs)
185150
self._timezone_mismatch = False

airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py

+6-9
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44

55
import json
66
from functools import cache
7-
from typing import Any, Iterable, Mapping, MutableMapping
7+
from typing import Any, Iterable, Mapping, MutableMapping, Optional
88

99
import pendulum
1010
import requests
1111
from airbyte_cdk.models import SyncMode
12+
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
1213
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
14+
from source_mixpanel.errors_handlers import ExportErrorHandler
15+
from source_mixpanel.property_transformation import transform_property_names
1316

14-
from ..property_transformation import transform_property_names
1517
from .base import DateSlicesMixin, IncrementalMixpanelStream, MixpanelStream
1618

1719

@@ -88,13 +90,8 @@ def url_base(self):
8890
def path(self, **kwargs) -> str:
8991
return "export"
9092

91-
def should_retry(self, response: requests.Response) -> bool:
92-
try:
93-
# trying to parse response to avoid ConnectionResetError and retry if it occurs
94-
self.iter_dicts(response.iter_lines(decode_unicode=True))
95-
except ConnectionResetError:
96-
return True
97-
return super().should_retry(response)
93+
def get_error_handler(self) -> Optional[ErrorHandler]:
94+
return ExportErrorHandler(logger=self.logger, stream=self)
9895

9996
def iter_dicts(self, lines):
10097
"""

0 commit comments

Comments
 (0)