Skip to content

refactor(source-mixpanel): migrate to CDK v3 #41969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
06f2e62
Flush buffer for each RATE_LIMITED message print
lazebnyi Jul 10, 2024
0a70afb
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 10, 2024
6d1eef7
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 11, 2024
f71dd1e
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 11, 2024
930ec65
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 12, 2024
7c2bf36
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 15, 2024
01884eb
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 15, 2024
38014eb
Bump CDK dependency version to 3.5.0
lazebnyi Jul 15, 2024
4d04cd6
Update PR number
lazebnyi Jul 15, 2024
30e0ac3
Move handlers to use it as a package
lazebnyi Jul 16, 2024
5b258d6
Remove handlers for utils
lazebnyi Jul 16, 2024
72ce2fc
Fix format
lazebnyi Jul 16, 2024
1da7bc1
Update to handle case when stream_slice is None
lazebnyi Jul 16, 2024
bd0b992
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 16, 2024
b892c64
Rollback typing
lazebnyi Jul 16, 2024
c99dd62
Rollback typing for components and utils
lazebnyi Jul 16, 2024
76e33a5
Fix typo
lazebnyi Jul 16, 2024
d49d743
Remove unused args
lazebnyi Jul 16, 2024
d4947aa
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 16, 2024
6b35f8c
Merge branch 'master' into lazebnyi/source-mixpanel-bump-CDK-3.5.0
lazebnyi Jul 16, 2024
fa91fb2
Bump CDK to 3.5.3
lazebnyi Jul 16, 2024
78cf2e2
Bump CDK version to 3.6.0
lazebnyi Jul 16, 2024
c0d80ee
Merge branch 'master' of github.com:airbytehq/airbyte
lazebnyi Jul 16, 2024
15819f7
Merge branch 'master' into lazebnyi/source-mixpanel-bump-CDK-3.5.0
lazebnyi Jul 16, 2024
05f3ae0
Skip rate limit balancer
lazebnyi Jul 17, 2024
dd61e17
Fix formatting
lazebnyi Jul 17, 2024
eb19d6d
Merge branch 'master' into lazebnyi/source-mixpanel-bump-CDK-3.5.0
lazebnyi Jul 18, 2024
ca41a18
Update backoff strategy for export stream
lazebnyi Jul 18, 2024
6ddf28a
Bump CDK to 3.8.1
lazebnyi Jul 18, 2024
f09a615
Bump CDK to 3.8.2
lazebnyi Jul 18, 2024
d5b26af
Merge master to branch
lazebnyi Jul 29, 2024
f0237d3
Revert test changes and add doc strings
lazebnyi Jul 29, 2024
88f114c
Fix formatting
lazebnyi Jul 30, 2024
567e85a
Merge master to branch
lazebnyi Aug 14, 2024
7a8b999
Rollback placeholder
lazebnyi Aug 14, 2024
0373508
Bump CDK to v4
lazebnyi Aug 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 3.2.4
dockerImageTag: 3.2.5
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel
Expand Down
515 changes: 443 additions & 72 deletions airbyte-integrations/connectors/source-mixpanel/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "3.2.4"
version = "3.2.5"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = ["Airbyte <[email protected]>"]
Expand All @@ -17,7 +17,7 @@ include = "source_mixpanel"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
airbyte-cdk = "^3"

[tool.poetry.scripts]
source-mixpanel = "source_mixpanel.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ def _request_params(
"""
next_page_token = None # reset it, pagination data is in extra_params
if extra_params:
page = extra_params.pop("page", {})
extra_params.update(page)
return super()._request_params(stream_state, stream_slice, next_page_token, extra_params)
page = extra_params.pop("page", {}) # type: ignore[attr-defined]
extra_params.update(page) # type: ignore[attr-defined]
return super()._request_params(stream_state, stream_slice, next_page_token, extra_params) # type: ignore[no-any-return]

def send_request(self, **kwargs) -> Optional[requests.Response]:
def send_request(self, **kwargs: Any) -> Optional[requests.Response]:

if self.reqs_per_hour_limit:
if self.is_first_request:
Expand All @@ -75,7 +75,7 @@ def send_request(self, **kwargs) -> Optional[requests.Response]:
)
time.sleep(3600 / self.reqs_per_hour_limit)

return super().send_request(**kwargs)
return super().send_request(**kwargs) # type: ignore[no-any-return]


class AnnotationsHttpRequester(MixpanelHttpRequester):
Expand Down Expand Up @@ -111,10 +111,10 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
params = super().get_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
if "start_time" in stream_slice:
params["where"] = f'properties["$last_seen"] >= "{stream_slice["start_time"]}"'
if "start_time" in stream_slice: # type: ignore[operator]
params["where"] = f'properties["$last_seen"] >= "{stream_slice["start_time"]}"' # type: ignore[index]
elif "start_date" in self.config:
params["where"] = f'properties["$last_seen"] >= "{self.config["start_date"]}"'
params["where"] = f'properties["$last_seen"] >= "{self.config["start_date"]}"' # type: ignore[index]
return params


Expand All @@ -126,7 +126,7 @@ def get_request_body_json(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
# https://developer.mixpanel.com/reference/engage-query
cohort_id = stream_slice["id"]
cohort_id = stream_slice["id"] # type: ignore[index]
return {"filter_by_cohort": f'{{"id":{cohort_id}}}'}


Expand Down Expand Up @@ -184,7 +184,7 @@ def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]
'status': 'ok'
}
"""
new_records = []
new_records: List[Mapping[str, Any]] = []
for record in super().extract_records(response):
for date_entry in record:
if date_entry != "$overall":
Expand Down Expand Up @@ -215,7 +215,7 @@ def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]
'status': 'ok'
}
"""
new_records = []
new_records: List[Mapping[str, Any]] = []
for record in super().extract_records(response):
for date_entry in record:
list.append(new_records, {"date": date_entry, **record[date_entry]})
Expand Down Expand Up @@ -275,14 +275,15 @@ class EngagePaginationStrategy(PageIncrement):
page - incremental page number
"""

_total = 0
_total: Optional[int] = 0

def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
"""
Determines page and subpage numbers for the `items` stream

Attributes:
response: Contains `boards` and corresponding lists of `items` for each `board`
last_page_size: the number of records read from the response
last_records: Parsed `items` from the response
"""
decoded_response = response.json()
Expand All @@ -307,8 +308,8 @@ class EngageJsonFileSchemaLoader(JsonFileSchemaLoader):

schema: Mapping[str, Any]

def __post_init__(self, parameters: Mapping[str, Any]):
if not self.file_path:
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if not self.file_path: # type: ignore[has-type]
self.file_path = _default_file_path()
self.file_path = InterpolatedString.create(self.file_path, parameters=parameters)
self.schema = {}
Expand Down Expand Up @@ -356,4 +357,4 @@ def get_json_schema(self) -> Mapping[str, Any]:
if property_name not in schema["properties"]:
schema["properties"][property_name] = types.get(property_type, {"type": ["null", "string"]})
self.schema = schema
return schema
return schema # type: ignore[no-any-return]
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

import pendulum
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk import BackoffStrategy
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from pendulum import Date
from requests.auth import AuthBase
from source_mixpanel.utils import fix_date_time
from source_mixpanel.utils import DateSlicesMixinErrorHandler, MixpanelStreamBackoffStrategy, MixpanelStreamErrorHandler, fix_date_time


class MixpanelStream(HttpStream, ABC):
Expand All @@ -34,17 +36,17 @@ def state_checkpoint_interval(self) -> int:
return 15

@property
def url_base(self):
def url_base(self) -> str:
prefix = "eu." if self.region == "EU" else ""
return f"https://{prefix}mixpanel.com/api/2.0/"

@property
def reqs_per_hour_limit(self):
def reqs_per_hour_limit(self) -> int:
# https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
return self._reqs_per_hour_limit

@reqs_per_hour_limit.setter
def reqs_per_hour_limit(self, value):
def reqs_per_hour_limit(self, value: int) -> None:
self._reqs_per_hour_limit = value

def __init__(
Expand All @@ -57,9 +59,9 @@ def __init__(
date_window_size: int = 30, # in days
attribution_window: int = 0, # in days
select_properties_by_default: bool = True,
project_id: int = None,
project_id: Optional[int] = None,
reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
**kwargs,
**kwargs: Any,
):
self.start_date = start_date
self.end_date = end_date
Expand All @@ -78,11 +80,14 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return None

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
self,
stream_state: Mapping[str, Any],
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {"Accept": "application/json"}

def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def process_response(self, response: requests.Response, **kwargs: Any) -> Iterable[Mapping[str, Any]]:
json_response = response.json()
if self.data_field is not None:
data = json_response.get(self.data_field, [])
Expand All @@ -99,8 +104,8 @@ def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
**kwargs,
) -> Iterable[Mapping]:
**kwargs: Any,
) -> Iterable[Mapping[str, Any]]:
# parse the whole response
yield from self.process_response(response, stream_state=stream_state, **kwargs)

Expand All @@ -115,37 +120,18 @@ def max_retries(self) -> Union[int, None]:
# we want to limit the max sleeping time by 2^3 * 60 = 8 minutes
return 3

def backoff_time(self, response: requests.Response) -> float:
"""
Some API endpoints do not return "Retry-After" header.
"""
def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
return MixpanelStreamBackoffStrategy(stream=self)

retry_after = response.headers.get("Retry-After")
if retry_after:
self.logger.debug(f"API responded with `Retry-After` header: {retry_after}")
return float(retry_after)

self.retries += 1
return 2**self.retries * 60

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 402:
self.logger.warning(f"Unable to perform a request. Payment Required: {response.json()['error']}")
return False
if response.status_code == 400 and "Unable to authenticate request" in response.text:
message = (
f"Your credentials might have expired. Please update your config with valid credentials."
f" See more details: {response.text}"
)
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
return super().should_retry(response)
def get_error_handler(self) -> Optional[ErrorHandler]:
return MixpanelStreamErrorHandler(logger=self.logger, max_retries=self.max_retries, error_mapping=DEFAULT_ERROR_MAPPING)

def get_stream_params(self) -> Mapping[str, Any]:
"""
Fetch required parameters in a given stream. Used to create sub-streams
"""
params = {
"authenticator": self._session.auth,
"authenticator": self._http_client._session.auth,
"region": self.region,
"project_timezone": self.project_timezone,
"reqs_per_hour_limit": self.reqs_per_hour_limit,
Expand All @@ -157,85 +143,79 @@ def get_stream_params(self) -> Mapping[str, Any]:
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
if self.project_id:
return {"project_id": str(self.project_id)}
return {}


class DateSlicesMixin:
raise_on_http_errors = True

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == requests.codes.bad_request:
if "to_date cannot be later than today" in response.text:
self._timezone_mismatch = True
self.logger.warning(
"Your project timezone must be misconfigured. Please set it to the one defined in your Mixpanel project settings. "
"Stopping current stream sync."
)
setattr(self, "raise_on_http_errors", False)
return False
return super().should_retry(response)

def __init__(self, *args, **kwargs):
def get_error_handler(self) -> Optional[ErrorHandler]:
return DateSlicesMixinErrorHandler(
logger=self.logger, max_retries=self.max_retries, error_mapping=DEFAULT_ERROR_MAPPING, stream=self # type: ignore[attr-defined]
)

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self._timezone_mismatch = False

def parse_response(self, *args, **kwargs):
def parse_response(self, *args: Any, **kwargs: Any) -> Iterable[Mapping[str, Any]]:
if self._timezone_mismatch:
return []
yield from super().parse_response(*args, **kwargs)
yield from super().parse_response(*args, **kwargs) # type: ignore[misc]

def stream_slices(
self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
# use the latest date between self.start_date and stream_state
start_date = self.start_date
start_date = self.start_date # type: ignore[attr-defined]
cursor_value = None

if stream_state and self.cursor_field and self.cursor_field in stream_state:
if stream_state and self.cursor_field and self.cursor_field in stream_state: # type: ignore[attr-defined]
# Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD')
# It also means that sync returns duplicated entries for the date from the state (date range is inclusive)
cursor_value = stream_state[self.cursor_field]
stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date()
cursor_value = stream_state[self.cursor_field] # type: ignore[attr-defined]
stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date() # type: ignore[attr-defined]
start_date = max(start_date, stream_state_date)

# move start_date back <attribution_window> days to sync data since that time as well
start_date = start_date - timedelta(days=self.attribution_window)
start_date = start_date - timedelta(days=self.attribution_window) # type: ignore[attr-defined]

# end_date cannot be later than today
end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())
end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date()) # type: ignore[attr-defined]

while start_date <= end_date:
if self._timezone_mismatch:
return
current_end_date = start_date + timedelta(days=self.date_window_size - 1) # -1 is needed because dates are inclusive
current_end_date = start_date + timedelta(days=self.date_window_size - 1) # type: ignore[attr-defined] # -1 is needed because dates are inclusive
stream_slice = {
"start_date": str(start_date),
"end_date": str(min(current_end_date, end_date)),
}
if cursor_value:
stream_slice[self.cursor_field] = cursor_value
stream_slice[self.cursor_field] = cursor_value # type: ignore[attr-defined]
yield stream_slice
# add 1 additional day because date range is inclusive
start_date = current_end_date + timedelta(days=1)

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
self,
stream_state: Mapping[str, Any],
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
params = super().request_params(stream_state, stream_slice, next_page_token) # type: ignore[misc]
return {
**params,
"from_date": stream_slice["start_date"],
"to_date": stream_slice["end_date"],
"from_date": stream_slice["start_date"], # type: ignore[index]
"to_date": stream_slice["end_date"], # type: ignore[index]
}


class IncrementalMixpanelStream(MixpanelStream, ABC):
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
updated_state = latest_record.get(self.cursor_field)
if updated_state:
state_value = current_stream_state.get(self.cursor_field)
Expand Down
Loading
Loading