16
16
from airbyte_cdk .sources .streams import Stream
17
17
from airbyte_cdk .sources .streams .core import IncrementalMixin , package_name_from_class
18
18
from airbyte_cdk .sources .streams .http import HttpStream
19
- from airbyte_cdk .sources .streams .http .auth import Oauth2Authenticator
20
- from airbyte_cdk .sources .streams .http .exceptions import DefaultBackoffException
19
+ from airbyte_cdk .sources .streams .http .requests_native_auth import Oauth2Authenticator
21
20
from airbyte_cdk .sources .utils .schema_helpers import ResourceSchemaLoader
22
21
23
22
# https://marketingapi.snapchat.com/docs/#core-metrics
@@ -250,7 +249,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
250
249
self .max_state = self .initial_state
251
250
252
251
parent_stream = self .parent (
253
- authenticator = self .authenticator ,
252
+ authenticator = self ._session . auth ,
254
253
start_date = self .start_date ,
255
254
end_date = self .end_date ,
256
255
action_report_time = self .action_report_time ,
@@ -379,7 +378,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
379
378
"""Each stream slice represents each entity id from parent stream"""
380
379
381
380
parent_stream = self .parent (
382
- authenticator = self .authenticator ,
381
+ authenticator = self ._session . auth ,
383
382
start_date = self .start_date ,
384
383
end_date = self .end_date ,
385
384
action_report_time = self .action_report_time ,
@@ -757,44 +756,13 @@ class CampaignsStatsLifetime(Lifetime, Stats):
757
756
parent = Campaigns
758
757
759
758
760
- class SnapchatOauth2Authenticator (Oauth2Authenticator ):
761
- @backoff .on_exception (
762
- backoff .expo ,
763
- DefaultBackoffException ,
764
- on_backoff = lambda details : logger .info (
765
- f"Caught retryable error after { details ['tries' ]} tries. Waiting { details ['wait' ]} seconds then retrying..."
766
- ),
767
- max_time = 300 ,
768
- )
769
- def refresh_access_token (self ) -> Tuple [str , int ]:
770
- """
771
- returns a tuple of (access_token, token_lifespan_in_seconds)
772
- """
773
- try :
774
- response = requests .request (
775
- method = "POST" ,
776
- url = self .token_refresh_endpoint ,
777
- data = self .get_refresh_request_body (),
778
- headers = self .get_refresh_access_token_headers (),
779
- )
780
- response .raise_for_status ()
781
- response_json = response .json ()
782
- return response_json ["access_token" ], response_json ["expires_in" ]
783
- except requests .exceptions .RequestException as e :
784
- if e .response .status_code == 429 or e .response .status_code >= 500 :
785
- raise DefaultBackoffException (request = e .response .request , response = e .response )
786
- raise
787
- except Exception as e :
788
- raise Exception (f"Error while refreshing access token: { e } " ) from e
789
-
790
-
791
759
# Source
792
760
class SourceSnapchatMarketing (AbstractSource ):
793
761
"""Source Snapchat Marketing helps to retrieve the different Ad data from Snapchat business account"""
794
762
795
763
def check_connection (self , logger , config ) -> Tuple [bool , any ]:
796
764
try :
797
- auth = SnapchatOauth2Authenticator (
765
+ auth = Oauth2Authenticator (
798
766
token_refresh_endpoint = "https://accounts.snapchat.com/login/oauth2/access_token" ,
799
767
client_id = config ["client_id" ],
800
768
client_secret = config ["client_secret" ],
@@ -820,7 +788,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
820
788
# 2. when timezone is not specified, default account's timezone will be used automatically
821
789
default_end_date = pendulum .now ().subtract (days = DELAYED_DAYS ).to_date_string ()
822
790
kwargs = {
823
- "authenticator" : SnapchatOauth2Authenticator (
791
+ "authenticator" : Oauth2Authenticator (
824
792
token_refresh_endpoint = "https://accounts.snapchat.com/login/oauth2/access_token" ,
825
793
client_id = config ["client_id" ],
826
794
client_secret = config ["client_secret" ],
0 commit comments