Skip to content

refactor(source-mixpanel): migrate to CDK v3 #41969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
06f2e62
Flush buffer for each RATE_LIMITED message print
lazebnyi Jul 10, 2024
0a70afb
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 10, 2024
6d1eef7
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 11, 2024
f71dd1e
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 11, 2024
930ec65
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 12, 2024
7c2bf36
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 15, 2024
01884eb
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 15, 2024
38014eb
Bump CDK dependency version to 3.5.0
lazebnyi Jul 15, 2024
4d04cd6
Update PR number
lazebnyi Jul 15, 2024
30e0ac3
Move handlers to use it as a package
lazebnyi Jul 16, 2024
5b258d6
Remove handlers for utils
lazebnyi Jul 16, 2024
72ce2fc
Fix format
lazebnyi Jul 16, 2024
1da7bc1
Update to handle case when stream_slice is None
lazebnyi Jul 16, 2024
bd0b992
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 16, 2024
b892c64
Rollback typing
lazebnyi Jul 16, 2024
c99dd62
Rollback typing for components and utils
lazebnyi Jul 16, 2024
76e33a5
Fix typo
lazebnyi Jul 16, 2024
d49d743
Remove unusefull args
lazebnyi Jul 16, 2024
d4947aa
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 16, 2024
6b35f8c
Merge branch 'master' into lazebnyi/source-mixpanel-bump-CDK-3.5.0
lazebnyi Jul 16, 2024
fa91fb2
Bump CDK to 3.5.3
lazebnyi Jul 16, 2024
78cf2e2
Bump CDK version to 3.6.0
lazebnyi Jul 16, 2024
c0d80ee
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 16, 2024
15819f7
Merge branch 'master' into lazebnyi/source-mixpanel-bump-CDK-3.5.0
lazebnyi Jul 16, 2024
05f3ae0
Skip rate limit balancer
lazebnyi Jul 17, 2024
dd61e17
Fix formatting
lazebnyi Jul 17, 2024
eb19d6d
Merge branch 'master' into lazebnyi/source-mixpanel-bump-CDK-3.5.0
lazebnyi Jul 18, 2024
ca41a18
Update backoff strategy for export stream
lazebnyi Jul 18, 2024
6ddf28a
Bump CDK to 3.8.1
lazebnyi Jul 18, 2024
f09a615
Bump CDK to 3.8.2
lazebnyi Jul 18, 2024
d5b26af
Merge master to branch
lazebnyi Jul 29, 2024
f0237d3
Revert test chenages and add doc strings
lazebnyi Jul 29, 2024
88f114c
Fix formatting
lazebnyi Jul 30, 2024
567e85a
Merge master to branch
lazebnyi Aug 14, 2024
7a8b999
Rollback placeholder
lazebnyi Aug 14, 2024
0373508
Bump CDK to v4
lazebnyi Aug 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 3.3.1
dockerImageTag: 3.4.0
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel
Expand Down
515 changes: 443 additions & 72 deletions airbyte-integrations/connectors/source-mixpanel/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "3.3.1"
version = "3.4.0"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = ["Airbyte <[email protected]>"]
Expand All @@ -17,7 +17,7 @@ include = "source_mixpanel"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
airbyte-cdk = "^3"

[tool.poetry.scripts]
source-mixpanel = "source_mixpanel.run:run"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Optional, Union

import requests
from airbyte_cdk import BackoffStrategy
from airbyte_cdk.sources.streams.http import HttpStream


class MixpanelStreamBackoffStrategy(BackoffStrategy):
def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
self.stream = stream
super().__init__(**kwargs)

def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs: Any
) -> Optional[float]:
if isinstance(response_or_exception, requests.Response):
retry_after = response_or_exception.headers.get("Retry-After")
if retry_after:
self._logger.debug(f"API responded with `Retry-After` header: {retry_after}")
return float(retry_after)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class EngagePaginationStrategy(PageIncrement):

_total = 0

def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
"""
Determines page and subpage numbers for the `items` stream

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .base_errors_handler import MixpanelStreamErrorHandler
from .export_errors_handler import ExportErrorHandler

__all__ = [
"MixpanelStreamErrorHandler",
"ExportErrorHandler",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import re
from typing import Optional, Union

import requests
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, HttpStatusErrorHandler, ResponseAction
from airbyte_protocol.models import FailureType


class MixpanelStreamErrorHandler(HttpStatusErrorHandler):
"""
Custom error handler for Mixpanel stream that interprets responses and exceptions.

This handler specifically addresses:
- 400 status code with "Unable to authenticate request" message, indicating potential credential expiration.
- 402 status code, indicating a payment required error.

If the response does not match these specific cases, the handler defers to the parent class's implementation.
"""

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
if isinstance(response_or_exception, requests.Response):
if response_or_exception.status_code == 400 and "Unable to authenticate request" in response_or_exception.text:
message = (
f"Your credentials might have expired. Please update your config with valid credentials."
f" See more details: {response_or_exception.text}"
)
return ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message=message,
)
elif response_or_exception.status_code == 402:
message = f"Unable to perform a request. Payment Required: {response_or_exception.json()['error']}"
return ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.transient_error,
error_message=message,
)
return super().interpret_response(response_or_exception)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Optional, Union

import requests
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, HttpStatusErrorHandler, ResponseAction
from airbyte_protocol.models import FailureType

from .base_errors_handler import MixpanelStreamErrorHandler


class ExportErrorHandler(MixpanelStreamErrorHandler):
"""
Custom error handler for handling export errors specific to Mixpanel streams.

This handler addresses:
- 400 status code with "to_date cannot be later than today" message, indicating a potential timezone mismatch.
- ConnectionResetError during response parsing, indicating a need to retry the request.

If the response does not match these specific cases, the handler defers to the parent class's implementation.

Attributes:
stream (HttpStream): The HTTP stream associated with this error handler.
"""

def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
self.stream = stream
super().__init__(**kwargs)

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
if isinstance(response_or_exception, requests.Response):
if (
response_or_exception.status_code == requests.codes.bad_request
and "to_date cannot be later than today" in response_or_exception.text
):
self.stream._timezone_mismatch = True
message = "Your project timezone must be misconfigured. Please set it to the one defined in your Mixpanel project settings. Stopping current stream sync."
return ErrorResolution(
response_action=ResponseAction.IGNORE,
failure_type=FailureType.config_error,
error_message=message,
)
try:
# trying to parse response to avoid ConnectionResetError and retry if it occurs
self.stream.iter_dicts(response_or_exception.iter_lines(decode_unicode=True))
except ConnectionResetError:
return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
)
return super().interpret_response(response_or_exception)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ definitions:

default_error_handler:
type: DefaultErrorHandler
backoff_strategies:
- type: ConstantBackoffStrategy
backoff_time_in_seconds: "{{ None }}" # We need to use default backoff logic
response_filters:
- http_codes: [400]
action: FAIL
Expand All @@ -39,10 +42,10 @@ definitions:
action: FAIL
error_message: Unable to perform a request. Payment Required.
- predicate: "{{ 'Retry-After' in headers }}"
action: RETRY
action: RATE_LIMITED
error_message: Query rate limit exceeded.
- error_message_contains: "Query rate limit exceeded"
action: RETRY
action: RATE_LIMITED
error_message: Query rate limit exceeded.
- http_codes: [500]
error_message_contains: "unknown error"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@

import pendulum
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk import BackoffStrategy
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from pendulum import Date
from requests.auth import AuthBase
from source_mixpanel.backoff_strategy import MixpanelStreamBackoffStrategy
from source_mixpanel.errors_handlers import MixpanelStreamErrorHandler
from source_mixpanel.utils import fix_date_time


Expand Down Expand Up @@ -69,7 +71,6 @@ def __init__(
self.region = region
self.project_timezone = project_timezone
self.project_id = project_id
self.retries = 0
self._reqs_per_hour_limit = reqs_per_hour_limit
super().__init__(authenticator=authenticator)

Expand Down Expand Up @@ -110,42 +111,18 @@ def parse_response(
self.logger.info(f"Sleep for {3600 / self.reqs_per_hour_limit} seconds to match API limitations after reading from {self.name}")
time.sleep(3600 / self.reqs_per_hour_limit)

@property
def max_retries(self) -> Union[int, None]:
# we want to limit the max sleeping time by 2^3 * 60 = 8 minutes
return 3
def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
return MixpanelStreamBackoffStrategy(stream=self)

def backoff_time(self, response: requests.Response) -> float:
"""
Some API endpoints do not return "Retry-After" header.
"""

retry_after = response.headers.get("Retry-After")
if retry_after:
self.logger.debug(f"API responded with `Retry-After` header: {retry_after}")
return float(retry_after)

self.retries += 1
return 2**self.retries * 60

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 402:
self.logger.warning(f"Unable to perform a request. Payment Required: {response.json()['error']}")
return False
if response.status_code == 400 and "Unable to authenticate request" in response.text:
message = (
f"Your credentials might have expired. Please update your config with valid credentials."
f" See more details: {response.text}"
)
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
return super().should_retry(response)
def get_error_handler(self) -> Optional[ErrorHandler]:
return MixpanelStreamErrorHandler(logger=self.logger)

def get_stream_params(self) -> Mapping[str, Any]:
"""
Fetch required parameters in a given stream. Used to create sub-streams
"""
params = {
"authenticator": self._session.auth,
"authenticator": self._http_client._session.auth,
"region": self.region,
"project_timezone": self.project_timezone,
"reqs_per_hour_limit": self.reqs_per_hour_limit,
Expand All @@ -168,18 +145,6 @@ def request_params(
class DateSlicesMixin:
raise_on_http_errors = True

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == requests.codes.bad_request:
if "to_date cannot be later than today" in response.text:
self._timezone_mismatch = True
self.logger.warning(
"Your project timezone must be misconfigured. Please set it to the one defined in your Mixpanel project settings. "
"Stopping current stream sync."
)
setattr(self, "raise_on_http_errors", False)
return False
return super().should_retry(response)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._timezone_mismatch = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

import json
from functools import cache
from typing import Any, Iterable, Mapping, MutableMapping
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_mixpanel.errors_handlers import ExportErrorHandler
from source_mixpanel.property_transformation import transform_property_names

from ..property_transformation import transform_property_names
from .base import DateSlicesMixin, IncrementalMixpanelStream, MixpanelStream


Expand Down Expand Up @@ -88,13 +90,8 @@ def url_base(self):
def path(self, **kwargs) -> str:
return "export"

def should_retry(self, response: requests.Response) -> bool:
try:
# trying to parse response to avoid ConnectionResetError and retry if it occurs
self.iter_dicts(response.iter_lines(decode_unicode=True))
except ConnectionResetError:
return True
return super().should_retry(response)
def get_error_handler(self) -> Optional[ErrorHandler]:
return ExportErrorHandler(logger=self.logger, stream=self)

def iter_dicts(self, lines):
"""
Expand Down
Loading
Loading