2
2
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3
3
#
4
4
5
+ from datetime import timedelta
5
6
from unittest .mock import MagicMock
6
7
8
+ import pendulum
7
9
import pytest
8
10
from airbyte_cdk import AirbyteLogger
9
11
from airbyte_cdk .models import SyncMode
@@ -157,24 +159,27 @@ def test_engage_stream_incremental(requests_mock, engage_response):
157
159
158
160
stream = Engage (authenticator = MagicMock ())
159
161
160
- records = stream .read_records (sync_mode = SyncMode .incremental , cursor_field = ["created" ], stream_state = {"created" : "2008-12-12T11:20:47" })
162
+ stream_state = {"created" : "2008-12-12T11:20:47" }
163
+ records = stream .read_records (sync_mode = SyncMode .incremental , cursor_field = ["created" ], stream_state = stream_state )
161
164
162
- records_length = sum (1 for _ in records )
163
- assert records_length == 1
165
+ records = [item for item in records ]
166
+ assert len (records ) == 1
167
+ assert stream .get_updated_state (current_stream_state = stream_state , latest_record = records [- 1 ]) == {"created" : "2008-12-12T11:20:47" }
164
168
165
169
166
170
def test_cohort_members_stream_incremental (requests_mock , engage_response , cohorts_response ):
167
171
requests_mock .register_uri ("POST" , MIXPANEL_BASE_URL + "engage?page_size=1000" , engage_response )
168
172
requests_mock .register_uri ("GET" , MIXPANEL_BASE_URL + "cohorts/list" , cohorts_response )
169
173
170
174
stream = CohortMembers (authenticator = MagicMock ())
171
-
175
+ stream_state = { "created" : "2008-12-12T11:20:47" }
172
176
records = stream .read_records (
173
- sync_mode = SyncMode .incremental , cursor_field = ["created" ], stream_state = { "created" : "2008-12-12T11:20:47" } , stream_slice = {"id" : 1000 }
177
+ sync_mode = SyncMode .incremental , cursor_field = ["created" ], stream_state = stream_state , stream_slice = {"id" : 1000 }
174
178
)
175
179
176
- records_length = sum (1 for _ in records )
177
- assert records_length == 1
180
+ records = [item for item in records ]
181
+ assert len (records ) == 1
182
+ assert stream .get_updated_state (current_stream_state = stream_state , latest_record = records [- 1 ]) == {"created" : "2008-12-12T11:20:47" }
178
183
179
184
180
185
@pytest .fixture
@@ -199,13 +204,15 @@ def funnels_list_url(config):
199
204
200
205
201
206
@pytest .fixture
202
- def funnels_response ():
207
+ def funnels_response (start_date ):
208
+ first_date = start_date + timedelta (days = 1 )
209
+ second_date = start_date + timedelta (days = 10 )
203
210
return setup_response (
204
211
200 ,
205
212
{
206
- "meta" : {"dates" : ["2016-09-12" "2016-09-19" "2016-09-26" ]},
213
+ "meta" : {"dates" : [str ( first_date ), str ( second_date ) ]},
207
214
"data" : {
208
- "2016-09-12" : {
215
+ str ( first_date ) : {
209
216
"steps" : [],
210
217
"analysis" : {
211
218
"completion" : 20524 ,
@@ -214,7 +221,7 @@ def funnels_response():
214
221
"worst" : 1 ,
215
222
},
216
223
},
217
- "2016-09-19" : {
224
+ str ( second_date ) : {
218
225
"steps" : [],
219
226
"analysis" : {
220
227
"completion" : 20500 ,
@@ -242,6 +249,25 @@ def test_funnels_stream(requests_mock, config, funnels_response, funnels_list_re
242
249
records_arr .append (record )
243
250
244
251
assert len (records_arr ) == 4
252
+ last_record = records_arr [- 1 ]
253
+ # Test without current state date
254
+ new_state = stream .get_updated_state (current_stream_state = {}, latest_record = records_arr [- 1 ])
255
+ assert new_state == {str (last_record ["funnel_id" ]): {"date" : last_record ["date" ]}}
256
+
257
+ # Test with current state, that lesser than last record date
258
+ last_record_date = pendulum .parse (last_record ["date" ]).date ()
259
+ new_state = stream .get_updated_state (
260
+ current_stream_state = {str (last_record ["funnel_id" ]): {"date" : str (last_record_date - timedelta (days = 1 ))}},
261
+ latest_record = records_arr [- 1 ],
262
+ )
263
+ assert new_state == {str (last_record ["funnel_id" ]): {"date" : last_record ["date" ]}}
264
+
265
+ # Test with current state, that is greater, than last record date
266
+ new_state = stream .get_updated_state (
267
+ current_stream_state = {str (last_record ["funnel_id" ]): {"date" : str (last_record_date + timedelta (days = 1 ))}},
268
+ latest_record = records_arr [- 1 ],
269
+ )
270
+ assert new_state == {str (last_record ["funnel_id" ]): {"date" : str (last_record_date + timedelta (days = 1 ))}}
245
271
246
272
247
273
@pytest .fixture
@@ -269,6 +295,25 @@ def test_engage_schema(requests_mock, engage_schema_response):
269
295
assert records_length == 3
270
296
271
297
298
+ def test_update_engage_schema (requests_mock ):
299
+ stream = EngageSchema (authenticator = MagicMock ())
300
+ requests_mock .register_uri (
301
+ "GET" ,
302
+ get_url_to_mock (stream ),
303
+ setup_response (
304
+ 200 ,
305
+ {
306
+ "results" : {
307
+ "$someNewSchemaField" : {"count" : 124 , "type" : "string" },
308
+ }
309
+ },
310
+ ),
311
+ )
312
+ engage_stream = Engage (authenticator = MagicMock ())
313
+ engage_schema = engage_stream .get_json_schema ()
314
+ assert "someNewSchemaField" in engage_schema ["properties" ]
315
+
316
+
272
317
@pytest .fixture
273
318
def annotations_response ():
274
319
return setup_response (
0 commit comments