4
4
5
5
import logging
6
6
import os
7
- from datetime import timedelta
7
+ from datetime import datetime , timedelta
8
8
from typing import Any , List , Mapping , MutableMapping , Optional , Tuple
9
9
10
10
import pendulum
14
14
from airbyte_cdk .models import ConfiguredAirbyteCatalog , FailureType
15
15
from airbyte_cdk .sources .concurrent_source .concurrent_source import ConcurrentSource
16
16
from airbyte_cdk .sources .concurrent_source .concurrent_source_adapter import ConcurrentSourceAdapter
17
+ from airbyte_cdk .sources .connector_state_manager import ConnectorStateManager
17
18
from airbyte_cdk .sources .message .repository import InMemoryMessageRepository
19
+ from airbyte_cdk .sources .source import TState
18
20
from airbyte_cdk .sources .streams import Stream
19
21
from airbyte_cdk .sources .streams .call_rate import AbstractAPIBudget , HttpAPIBudget , HttpRequestMatcher , MovingWindowCallRatePolicy , Rate
20
22
from airbyte_cdk .sources .streams .concurrent .adapters import StreamFacade
21
- from airbyte_cdk .sources .streams .concurrent .cursor import NoopCursor
23
+ from airbyte_cdk .sources .streams .concurrent .cursor import Comparable , ConcurrentCursor , CursorField , NoopCursor
24
+ from airbyte_cdk .sources .streams .concurrent .state_converters .datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter
22
25
from airbyte_cdk .sources .streams .http .auth import TokenAuthenticator
23
26
from airbyte_cdk .utils .traced_exception import AirbyteTracedException
24
27
from airbyte_protocol .models import SyncMode
49
52
class SourceStripe (ConcurrentSourceAdapter ):
50
53
51
54
message_repository = InMemoryMessageRepository (entrypoint_logger .level )
55
+ _SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION = {
56
+ Events : ("created[gte]" , "created[lte]" ),
57
+ }
52
58
53
- def __init__ (self , catalog : Optional [ConfiguredAirbyteCatalog ], config : Optional [Mapping [str , Any ]], ** kwargs ):
59
+ def __init__ (self , catalog : Optional [ConfiguredAirbyteCatalog ], config : Optional [Mapping [str , Any ]], state : TState , ** kwargs ):
54
60
if config :
55
61
concurrency_level = min (config .get ("num_workers" , _DEFAULT_CONCURRENCY ), _MAX_CONCURRENCY )
56
62
else :
@@ -60,6 +66,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
60
66
concurrency_level , concurrency_level // 2 , logger , self ._slice_logger , self .message_repository
61
67
)
62
68
super ().__init__ (concurrent_source )
69
+ self ._state = state
63
70
if catalog :
64
71
self ._streams_configured_as_full_refresh = {
65
72
configured_stream .stream .name
@@ -71,9 +78,8 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
71
78
self ._streams_configured_as_full_refresh = set ()
72
79
73
80
@staticmethod
74
- def validate_and_fill_with_defaults (config : MutableMapping ) -> MutableMapping :
75
- start_date , lookback_window_days , slice_range = (
76
- config .get ("start_date" ),
81
+ def validate_and_fill_with_defaults (config : MutableMapping [str , Any ]) -> MutableMapping [str , Any ]:
82
+ lookback_window_days , slice_range = (
77
83
config .get ("lookback_window_days" ),
78
84
config .get ("slice_range" ),
79
85
)
@@ -86,9 +92,9 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
86
92
internal_message = message ,
87
93
failure_type = FailureType .config_error ,
88
94
)
89
- if start_date :
90
- # verifies the start_date is parseable
91
- SourceStripe ._start_date_to_timestamp (start_date )
95
+
96
+ # verifies the start_date in the config is valid
97
+ SourceStripe ._start_date_to_timestamp (config )
92
98
if slice_range is None :
93
99
config ["slice_range" ] = 365
94
100
elif not isinstance (slice_range , int ) or slice_range < 1 :
@@ -100,7 +106,7 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
100
106
)
101
107
return config
102
108
103
- def check_connection (self , logger : AirbyteLogger , config : Mapping [str , Any ]) -> Tuple [bool , Any ]:
109
+ def check_connection (self , logger : AirbyteLogger , config : MutableMapping [str , Any ]) -> Tuple [bool , Any ]:
104
110
self .validate_and_fill_with_defaults (config )
105
111
stripe .api_key = config ["client_secret" ]
106
112
try :
@@ -167,14 +173,11 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:
167
173
168
174
return HttpAPIBudget (policies = policies )
169
175
170
- def streams (self , config : Mapping [str , Any ]) -> List [Stream ]:
176
+ def streams (self , config : MutableMapping [str , Any ]) -> List [Stream ]:
171
177
config = self .validate_and_fill_with_defaults (config )
172
178
authenticator = TokenAuthenticator (config ["client_secret" ])
173
179
174
- if "start_date" in config :
175
- start_timestamp = self ._start_date_to_timestamp (config ["start_date" ])
176
- else :
177
- start_timestamp = pendulum .datetime (2017 , 1 , 25 ).int_timestamp
180
+ start_timestamp = self ._start_date_to_timestamp (config )
178
181
args = {
179
182
"authenticator" : authenticator ,
180
183
"account_id" : config ["account_id" ],
@@ -511,21 +514,47 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
511
514
),
512
515
]
513
516
514
- return [
515
- StreamFacade .create_from_stream (stream , self , entrypoint_logger , self ._create_empty_state (), NoopCursor ())
516
- if stream .name in self ._streams_configured_as_full_refresh
517
- else stream
518
- for stream in streams
519
- ]
517
+ state_manager = ConnectorStateManager (stream_instance_map = {s .name : s for s in streams }, state = self ._state )
518
+ return [self ._to_concurrent (stream , self ._start_date_to_timestamp (config ), state_manager ) for stream in streams ]
519
+
520
+ def _to_concurrent (self , stream : Stream , fallback_start , state_manager : ConnectorStateManager ) -> Stream :
521
+ if os .environ .get ("SKIP_CONCURRENCY" ):
522
+ return stream
523
+ if stream .name in self ._streams_configured_as_full_refresh :
524
+ return StreamFacade .create_from_stream (stream , self , entrypoint_logger , self ._create_empty_state (), NoopCursor ())
525
+
526
+ state = state_manager .get_stream_state (stream .name , stream .namespace )
527
+ slice_boundary_fields = self ._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION .get (type (stream ))
528
+ if slice_boundary_fields :
529
+ cursor_field = CursorField (stream .cursor_field ) if isinstance (stream .cursor_field , str ) else CursorField (stream .cursor_field [0 ])
530
+ converter = EpochValueConcurrentStreamStateConverter ()
531
+ cursor = ConcurrentCursor (
532
+ stream .name ,
533
+ stream .namespace ,
534
+ state_manager .get_stream_state (stream .name , stream .namespace ),
535
+ self .message_repository ,
536
+ state_manager ,
537
+ converter ,
538
+ cursor_field ,
539
+ slice_boundary_fields ,
540
+ fallback_start ,
541
+ )
542
+ return StreamFacade .create_from_stream (stream , self , entrypoint_logger , state , cursor )
543
+
544
+ return stream
520
545
521
546
def _create_empty_state (self ) -> MutableMapping [str , Any ]:
522
547
# The state is known to be empty because concurrent CDK is currently only used for full refresh
523
548
return {}
524
549
525
550
@staticmethod
526
- def _start_date_to_timestamp (start_date : str ) -> int :
551
+ def _start_date_to_timestamp (config : Mapping [str , Any ]) -> int :
552
+ if "start_date" not in config :
553
+ return pendulum .datetime (2017 , 1 , 25 ).int_timestamp # type: ignore # pendulum not typed
554
+
555
+ start_date = config ["start_date" ]
527
556
try :
528
- return pendulum .parse (start_date ).int_timestamp
557
+ return pendulum .parse (start_date ).int_timestamp # type: ignore # pendulum not typed
529
558
except pendulum .parsing .exceptions .ParserError as e :
530
559
message = f"Invalid start date { start_date } . Please use YYYY-MM-DDTHH:MM:SSZ format."
531
560
raise AirbyteTracedException (
0 commit comments