1
+ /*
2
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3
+ */
4
+
1
5
package io.airbyte.integrations.base.destination.operation
2
6
3
7
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
@@ -27,14 +31,27 @@ import org.junit.jupiter.params.provider.Arguments
27
31
import org.junit.jupiter.params.provider.MethodSource
28
32
import org.junit.jupiter.params.provider.ValueSource
29
33
34
+ /* *
35
+ * Verify that [AbstractStreamOperation] behaves correctly, given various initial states. We
36
+ * intentionally mock the [DestinationInitialStatus]. This allows us to verify that the stream ops
37
+ * only looks at specific fields - the mocked initial statuses will throw exceptions for unstubbed
38
+ * methods.
39
+ *
40
+ * For example, we don't need to write separate test cases for "final table does not exist and
41
+ * destination state has softReset=true/false" - instead we have a single test case for "final table
42
+ * does not exist", and it doesn't stub the `needsSoftReset` method. If we introduce a bug in stream
43
+ * ops and it starts checking needsSoftReset even though the final table doesn't exist, then these
44
+ * tests will start failing.
45
+ */
30
46
class AbstractStreamOperationTest {
31
47
class TestStreamOperation (
32
48
storageOperations : StorageOperations ,
33
49
destinationInitialStatus : DestinationInitialStatus <MinimumDestinationState .Impl >
34
- ) : AbstractStreamOperation<MinimumDestinationState.Impl>(
35
- storageOperations,
36
- destinationInitialStatus,
37
- ) {
50
+ ) :
51
+ AbstractStreamOperation <MinimumDestinationState .Impl >(
52
+ storageOperations,
53
+ destinationInitialStatus,
54
+ ) {
38
55
override fun writeRecords (
39
56
streamConfig : StreamConfig ,
40
57
stream : Stream <PartialAirbyteMessage >
@@ -49,18 +66,20 @@ class AbstractStreamOperationTest {
49
66
50
67
@Nested
51
68
inner class Overwrite {
52
- private val streamConfig = StreamConfig (
53
- streamId,
54
- DestinationSyncMode .OVERWRITE ,
55
- listOf (),
56
- Optional .empty(),
57
- columns,
58
- // TODO currently these values are unused. Eventually we should restructure this class
59
- // to test based on generation ID instead of sync mode.
60
- 0 ,
61
- 0 ,
62
- 0
63
- )
69
+ private val streamConfig =
70
+ StreamConfig (
71
+ streamId,
72
+ DestinationSyncMode .OVERWRITE ,
73
+ listOf (),
74
+ Optional .empty(),
75
+ columns,
76
+ // TODO currently these values are unused. Eventually we should restructure this
77
+ // class
78
+ // to test based on generation ID instead of sync mode.
79
+ 0 ,
80
+ 0 ,
81
+ 0
82
+ )
64
83
65
84
@Test
66
85
fun emptyDestination () {
@@ -204,7 +223,8 @@ class AbstractStreamOperationTest {
204
223
every { initialState.initialRawTableStatus } returns mockk<InitialRawTableStatus >()
205
224
// This is an overwrite sync, so we can ignore the old raw records.
206
225
// We should skip T+D if the current sync emitted 0 records.
207
- every { initialState.initialRawTableStatus.hasUnprocessedRecords } returns hasUnprocessedRecords
226
+ every { initialState.initialRawTableStatus.hasUnprocessedRecords } returns
227
+ hasUnprocessedRecords
208
228
every { initialState.isFinalTablePresent } returns true
209
229
every { initialState.isFinalTableEmpty } returns false
210
230
@@ -232,11 +252,14 @@ class AbstractStreamOperationTest {
232
252
@Nested
233
253
inner class NonOverwrite {
234
254
@ParameterizedTest
235
- @MethodSource(" io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" )
255
+ @MethodSource(
256
+ " io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
257
+ )
236
258
fun emptyDestination (streamConfig : StreamConfig ) {
237
259
val initialState = mockk<DestinationInitialStatus <MinimumDestinationState .Impl >>()
238
260
every { initialState.streamConfig } returns streamConfig
239
- every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns Optional .empty()
261
+ every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
262
+ Optional .empty()
240
263
every { initialState.isFinalTablePresent } returns false
241
264
242
265
val streamOperations = TestStreamOperation (storageOperations, initialState)
@@ -264,11 +287,14 @@ class AbstractStreamOperationTest {
264
287
}
265
288
266
289
@ParameterizedTest
267
- @MethodSource(" io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" )
290
+ @MethodSource(
291
+ " io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
292
+ )
268
293
fun existingTableSchemaMismatch (streamConfig : StreamConfig ) {
269
294
val initialState = mockk<DestinationInitialStatus <MinimumDestinationState .Impl >>()
270
295
every { initialState.streamConfig } returns streamConfig
271
- every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns Optional .empty()
296
+ every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
297
+ Optional .empty()
272
298
every { initialState.isFinalTablePresent } returns true
273
299
every { initialState.isSchemaMismatch } returns true
274
300
@@ -297,11 +323,14 @@ class AbstractStreamOperationTest {
297
323
}
298
324
299
325
@ParameterizedTest
300
- @MethodSource(" io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" )
326
+ @MethodSource(
327
+ " io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
328
+ )
301
329
fun existingTableSchemaMatch (streamConfig : StreamConfig ) {
302
330
val initialState = mockk<DestinationInitialStatus <MinimumDestinationState .Impl >>()
303
331
every { initialState.streamConfig } returns streamConfig
304
- every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns Optional .empty()
332
+ every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
333
+ Optional .empty()
305
334
every { initialState.isFinalTablePresent } returns true
306
335
every { initialState.isSchemaMismatch } returns false
307
336
every { initialState.destinationState } returns MinimumDestinationState .Impl (false )
@@ -331,18 +360,63 @@ class AbstractStreamOperationTest {
331
360
}
332
361
333
362
@ParameterizedTest
334
- @MethodSource(" io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigsAndBoolean" )
335
- fun existingNonEmptyTableNoNewRecords (streamConfig : StreamConfig , hasUnprocessedRecords : Boolean ) {
363
+ @MethodSource(
364
+ " io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
365
+ )
366
+ fun existingTableAndStateRequiresSoftReset (streamConfig : StreamConfig ) {
367
+ val initialState = mockk<DestinationInitialStatus <MinimumDestinationState .Impl >>()
368
+ every { initialState.streamConfig } returns streamConfig
369
+ every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
370
+ Optional .empty()
371
+ every { initialState.isFinalTablePresent } returns true
372
+ every { initialState.isSchemaMismatch } returns false
373
+ every { initialState.destinationState } returns MinimumDestinationState .Impl (true )
374
+
375
+ val streamOperations = TestStreamOperation (storageOperations, initialState)
376
+
377
+ verifySequence {
378
+ storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode)
379
+ storageOperations.createFinalSchema(streamId)
380
+ storageOperations.softResetFinalTable(streamConfig)
381
+ }
382
+ confirmVerified(storageOperations)
383
+
384
+ clearMocks(storageOperations)
385
+ streamOperations.finalizeTable(streamConfig, StreamSyncSummary (Optional .of(42 )))
386
+
387
+ verifySequence {
388
+ storageOperations.cleanupStage(streamId)
389
+ storageOperations.typeAndDedupe(
390
+ streamConfig,
391
+ Optional .empty(),
392
+ " " ,
393
+ )
394
+ }
395
+ confirmVerified(storageOperations)
396
+ checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
397
+ }
398
+
399
+ @ParameterizedTest
400
+ @MethodSource(
401
+ " io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigsAndBoolean"
402
+ )
403
+ fun existingNonEmptyTableNoNewRecords (
404
+ streamConfig : StreamConfig ,
405
+ hasUnprocessedRecords : Boolean
406
+ ) {
336
407
val initialState = mockk<DestinationInitialStatus <MinimumDestinationState .Impl >>()
337
408
every { initialState.streamConfig } returns streamConfig
338
409
// This is an overwrite sync, so we can ignore the old raw records.
339
410
// We should skip T+D if the current sync emitted 0 records.
340
- every { initialState.initialRawTableStatus.hasUnprocessedRecords } returns hasUnprocessedRecords
411
+ every { initialState.initialRawTableStatus.hasUnprocessedRecords } returns
412
+ hasUnprocessedRecords
341
413
if (hasUnprocessedRecords) {
342
414
// We only care about this value if we're executing T+D.
343
- // If there are no unprocessed records from a previous sync, and no new records from this sync,
415
+ // If there are no unprocessed records from a previous sync, and no new records from
416
+ // this sync,
344
417
// we don't need to set it.
345
- every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns maxProcessedTimestamp
418
+ every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
419
+ maxProcessedTimestamp
346
420
}
347
421
every { initialState.isFinalTablePresent } returns true
348
422
every { initialState.isSchemaMismatch } returns false
@@ -361,7 +435,8 @@ class AbstractStreamOperationTest {
361
435
362
436
verifySequence {
363
437
storageOperations.cleanupStage(streamId)
364
- // If this sync emitted no records, we only need to run T+D if a previous sync emitted
438
+ // If this sync emitted no records, we only need to run T+D if a previous sync
439
+ // emitted
365
440
// some records but failed to run T+D.
366
441
if (hasUnprocessedRecords) {
367
442
storageOperations.typeAndDedupe(streamConfig, maxProcessedTimestamp, " " )
@@ -373,73 +448,81 @@ class AbstractStreamOperationTest {
373
448
}
374
449
375
450
companion object {
376
- val streamId = StreamId (
377
- " final_namespace" ,
378
- " final_name" ,
379
- " raw_namespace" ,
380
- " raw_name" ,
381
- " original_namespace" ,
382
- " original_name" ,
383
- )
451
+ val streamId =
452
+ StreamId (
453
+ " final_namespace" ,
454
+ " final_name" ,
455
+ " raw_namespace" ,
456
+ " raw_name" ,
457
+ " original_namespace" ,
458
+ " original_name" ,
459
+ )
384
460
private val pk1 = ColumnId (" pk1" , " pk1_original_name" , " pk1_canonical_name" )
385
461
private val pk2 = ColumnId (" pk2" , " pk2_original_name" , " pk2_canonical_name" )
386
462
private val cursor = ColumnId (" cursor" , " cursor_original_name" , " cursor_canonical_name" )
387
- val columns: LinkedHashMap <ColumnId , AirbyteType > = linkedMapOf(
388
- pk1 to AirbyteProtocolType .INTEGER ,
389
- pk2 to AirbyteProtocolType .STRING ,
390
- cursor to AirbyteProtocolType .TIMESTAMP_WITH_TIMEZONE ,
391
- ColumnId (
392
- " username" ,
393
- " username_original_name" ,
394
- " username_canonical_name" ,
395
- ) to AirbyteProtocolType .STRING ,
396
- )
463
+ val columns: LinkedHashMap <ColumnId , AirbyteType > =
464
+ linkedMapOf(
465
+ pk1 to AirbyteProtocolType .INTEGER ,
466
+ pk2 to AirbyteProtocolType .STRING ,
467
+ cursor to AirbyteProtocolType .TIMESTAMP_WITH_TIMEZONE ,
468
+ ColumnId (
469
+ " username" ,
470
+ " username_original_name" ,
471
+ " username_canonical_name" ,
472
+ ) to AirbyteProtocolType .STRING ,
473
+ )
397
474
398
475
const val EXPECTED_OVERWRITE_SUFFIX = " _airbyte_tmp"
399
476
val maxProcessedTimestamp = Optional .of(Instant .parse(" 2024-01-23T12:34:56Z" ))
400
477
401
- private val appendStreamConfig = StreamConfig (
402
- streamId,
403
- DestinationSyncMode .APPEND ,
404
- listOf (),
405
- Optional .empty(),
406
- columns,
407
- // TODO currently these values are unused. Eventually we should restructure this class
408
- // to test based on generation ID instead of sync mode.
409
- 0 ,
410
- 0 ,
411
- 0
412
- )
413
- private val dedupStreamConfig = StreamConfig (
414
- streamId,
415
- DestinationSyncMode .APPEND_DEDUP ,
416
- listOf (pk1, pk2),
417
- Optional .of(cursor),
418
- columns,
419
- // TODO currently these values are unused. Eventually we should restructure this class
420
- // to test based on generation ID instead of sync mode.
421
- 0 ,
422
- 0 ,
423
- 0
424
- )
478
+ private val appendStreamConfig =
479
+ StreamConfig (
480
+ streamId,
481
+ DestinationSyncMode .APPEND ,
482
+ listOf (),
483
+ Optional .empty(),
484
+ columns,
485
+ // TODO currently these values are unused. Eventually we should restructure this
486
+ // class
487
+ // to test based on generation ID instead of sync mode.
488
+ 0 ,
489
+ 0 ,
490
+ 0
491
+ )
492
+ private val dedupStreamConfig =
493
+ StreamConfig (
494
+ streamId,
495
+ DestinationSyncMode .APPEND_DEDUP ,
496
+ listOf (pk1, pk2),
497
+ Optional .of(cursor),
498
+ columns,
499
+ // TODO currently these values are unused. Eventually we should restructure this
500
+ // class
501
+ // to test based on generation ID instead of sync mode.
502
+ 0 ,
503
+ 0 ,
504
+ 0
505
+ )
425
506
426
507
// junit 5 doesn't support class-level parameterization...
427
508
// so we have to hack this in a somewhat dumb way.
428
509
// append and dedup should behave identically from StreamOperations' POV,
429
510
// so just shove them together.
430
511
@JvmStatic
431
- fun nonOverwriteStreamConfigs (): Stream <Arguments > = Stream .of(
432
- Arguments .of(appendStreamConfig),
433
- Arguments .of(dedupStreamConfig),
434
- )
512
+ fun nonOverwriteStreamConfigs (): Stream <Arguments > =
513
+ Stream .of(
514
+ Arguments .of(appendStreamConfig),
515
+ Arguments .of(dedupStreamConfig),
516
+ )
435
517
436
518
// Some tests are further parameterized, which this method supports.
437
519
@JvmStatic
438
- fun nonOverwriteStreamConfigsAndBoolean (): Stream <Arguments > = Stream .of(
439
- Arguments .of(appendStreamConfig, true ),
440
- Arguments .of(appendStreamConfig, false ),
441
- Arguments .of(dedupStreamConfig, true ),
442
- Arguments .of(dedupStreamConfig, false ),
443
- )
520
+ fun nonOverwriteStreamConfigsAndBoolean (): Stream <Arguments > =
521
+ Stream .of(
522
+ Arguments .of(appendStreamConfig, true ),
523
+ Arguments .of(appendStreamConfig, false ),
524
+ Arguments .of(dedupStreamConfig, true ),
525
+ Arguments .of(dedupStreamConfig, false ),
526
+ )
444
527
}
445
528
}
0 commit comments