@@ -56,6 +56,7 @@ class IncrementalAmplitudeStream(AmplitudeStream, ABC):
56
56
base_params = {}
57
57
cursor_field = "date"
58
58
date_template = "%Y%m%d"
59
+ compare_date_template = None
59
60
60
61
def __init__ (self , start_date : str , ** kwargs ):
61
62
super ().__init__ (** kwargs )
@@ -96,10 +97,12 @@ def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.no
96
97
return end_date
97
98
98
99
def get_updated_state (self , current_stream_state : MutableMapping [str , Any ], latest_record : Mapping [str , Any ]) -> Mapping [str , Any ]:
99
- latest_benchmark = latest_record [self .cursor_field ]
100
- if current_stream_state .get (self .cursor_field ):
101
- return {self .cursor_field : max (latest_benchmark , current_stream_state [self .cursor_field ])}
102
- return {self .cursor_field : latest_benchmark }
100
+ # save state value in source native format
101
+ if self .compare_date_template :
102
+ latest_state = pendulum .parse (latest_record [self .cursor_field ]).strftime (self .compare_date_template )
103
+ else :
104
+ latest_state = latest_record [self .cursor_field ]
105
+ return {self .cursor_field : max (latest_state , current_stream_state .get (self .cursor_field , "" ))}
103
106
104
107
def next_page_token (self , response : requests .Response ) -> Optional [Mapping [str , Any ]]:
105
108
parsed = urlparse .urlparse (response .url )
@@ -148,11 +151,6 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
148
151
if record [self .cursor_field ] >= state_value :
149
152
yield self ._date_time_to_rfc3339 (record ) # transform all `date-time` to RFC3339
150
153
151
- def get_updated_state (self , current_stream_state : MutableMapping [str , Any ], latest_record : Mapping [str , Any ]) -> Mapping [str , Any ]:
152
- # save state value in source native format
153
- latest_state = pendulum .parse (latest_record [self .cursor_field ]).strftime (self .compare_date_template )
154
- return {self .cursor_field : max (latest_state , current_stream_state .get (self .cursor_field , "" ))}
155
-
156
154
def _parse_zip_file (self , zip_file : IO [bytes ]) -> Iterable [Mapping ]:
157
155
with gzip .open (zip_file ) as file :
158
156
for record in file :
0 commit comments