|
9 | 9 |
|
10 | 10 | import pendulum
|
11 | 11 | import requests
|
12 |
| -from airbyte_cdk import BackoffStrategy |
| 12 | +from airbyte_cdk import BackoffStrategy, StreamSlice |
13 | 13 | from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
|
14 | 14 | from airbyte_cdk.models import Type as MessageType
|
15 | 15 | from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
| 16 | +from airbyte_cdk.sources.streams.checkpoint.substream_resumable_full_refresh_cursor import SubstreamResumableFullRefreshCursor |
16 | 17 | from airbyte_cdk.sources.streams.core import CheckpointMixin, Stream
|
17 | 18 | from airbyte_cdk.sources.streams.http import HttpStream
|
18 | 19 | from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, ErrorResolution, HttpStatusErrorHandler, ResponseAction
|
@@ -57,6 +58,9 @@ def __init__(self, api_url: str = "https://api.github.com", access_token_type: s
|
57 | 58 | self.api_url = api_url
|
58 | 59 | self.state = {}
|
59 | 60 |
|
| 61 | + if not self.supports_incremental: |
| 62 | + self.cursor = SubstreamResumableFullRefreshCursor() |
| 63 | + |
60 | 64 | @property
|
61 | 65 | def url_base(self) -> str:
|
62 | 66 | return self.api_url
|
@@ -1613,7 +1617,8 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
|
1613 | 1617 | return record
|
1614 | 1618 |
|
1615 | 1619 | def get_error_handler(self) -> Optional[ErrorHandler]:
|
1616 |
| - return ContributorActivityErrorHandler(logger=self.logger, max_retries=self.max_retries, error_mapping=GITHUB_DEFAULT_ERROR_MAPPING) |
| 1620 | + |
| 1621 | + return ContributorActivityErrorHandler(logger=self.logger, max_retries=5, error_mapping=GITHUB_DEFAULT_ERROR_MAPPING) |
1617 | 1622 |
|
1618 | 1623 | def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
|
1619 | 1624 | return ContributorActivityBackoffStrategy()
|
@@ -1645,6 +1650,13 @@ def read_records(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iter
|
1645 | 1650 | message=f"Syncing `{self.__class__.__name__}` " f"stream isn't available for repository `{repository}`.",
|
1646 | 1651 | ),
|
1647 | 1652 | )
|
| 1653 | + |
| 1654 | + # In order to retain the existing stream behavior before we added RFR to this stream, we need to close out the |
| 1655 | + # partition after we give up the maximum number of retries on the 202 response. This does lead to the question |
| 1656 | + # of if we should prematurely exit in the first place, but for now we're going to aim for feature parity |
| 1657 | + partition_obj = stream_slice.get("partition") |
| 1658 | + if self.cursor and partition_obj: |
| 1659 | + self.cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition_obj)) |
1648 | 1660 | else:
|
1649 | 1661 | raise e
|
1650 | 1662 |
|
|
0 commit comments