Skip to content

Commit 23a01c7

Browse files
committed
even more test
1 parent a29acba commit 23a01c7

File tree

1 file changed

+129
-0
lines changed
  • airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping

1 file changed

+129
-0
lines changed

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt

+129
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,135 @@ abstract class BaseTypingDedupingTest {
12041204
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
12051205
}
12061206

1207+
/**
1208+
* Emulates this sequence of events:
1209+
* 1. User runs a normal incremental sync
1210+
* 2. User initiates a truncate refresh, but it fails.
1211+
* 3. User cancels the truncate refresh, and initiates a normal incremental sync.
1212+
*
1213+
* In particular, we must retain all records from both the first sync, _and_ the truncate sync's
1214+
* temporary raw table.
1215+
*/
1216+
@Test
1217+
@Throws(Exception::class)
1218+
open fun resumeAfterCancelledTruncate() {
1219+
val catalog1 =
1220+
io.airbyte.protocol.models.v0
1221+
.ConfiguredAirbyteCatalog()
1222+
.withStreams(
1223+
java.util.List.of(
1224+
ConfiguredAirbyteStream()
1225+
.withSyncId(42)
1226+
.withGenerationId(43)
1227+
.withMinimumGenerationId(0)
1228+
.withSyncMode(SyncMode.INCREMENTAL)
1229+
.withCursorField(listOf("updated_at"))
1230+
.withDestinationSyncMode(DestinationSyncMode.APPEND)
1231+
.withStream(
1232+
AirbyteStream()
1233+
.withNamespace(streamNamespace)
1234+
.withName(streamName)
1235+
.withJsonSchema(SCHEMA)
1236+
)
1237+
)
1238+
)
1239+
1240+
// Normal sync
1241+
runSync(catalog1, readMessages("dat/sync1_messages.jsonl"))
1242+
1243+
val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
1244+
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
1245+
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
1246+
1247+
val catalog2 =
1248+
io.airbyte.protocol.models.v0
1249+
.ConfiguredAirbyteCatalog()
1250+
.withStreams(
1251+
java.util.List.of(
1252+
ConfiguredAirbyteStream()
1253+
.withSyncId(42)
1254+
// Generation ID is incremented
1255+
.withGenerationId(44)
1256+
.withMinimumGenerationId(44)
1257+
.withSyncMode(SyncMode.INCREMENTAL)
1258+
.withCursorField(listOf("updated_at"))
1259+
.withDestinationSyncMode(DestinationSyncMode.APPEND)
1260+
.withStream(
1261+
AirbyteStream()
1262+
.withNamespace(streamNamespace)
1263+
.withName(streamName)
1264+
.withJsonSchema(SCHEMA)
1265+
)
1266+
)
1267+
)
1268+
// Interrupted truncate sync
1269+
assertThrows<Exception> {
1270+
runSync(
1271+
catalog2,
1272+
readMessages("dat/sync2_messages.jsonl"),
1273+
streamStatus = AirbyteStreamStatus.INCOMPLETE,
1274+
)
1275+
}
1276+
1277+
// We should still have the exact same records as after the initial sync
1278+
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
1279+
1280+
val catalog3 =
1281+
io.airbyte.protocol.models.v0
1282+
.ConfiguredAirbyteCatalog()
1283+
.withStreams(
1284+
java.util.List.of(
1285+
ConfiguredAirbyteStream()
1286+
.withSyncId(42)
1287+
// Same generation as the truncate sync, but now with
1288+
// min gen = 0
1289+
.withGenerationId(44)
1290+
.withMinimumGenerationId(0)
1291+
.withSyncMode(SyncMode.INCREMENTAL)
1292+
.withCursorField(listOf("updated_at"))
1293+
.withDestinationSyncMode(DestinationSyncMode.APPEND)
1294+
.withStream(
1295+
AirbyteStream()
1296+
.withNamespace(streamNamespace)
1297+
.withName(streamName)
1298+
.withJsonSchema(SCHEMA)
1299+
)
1300+
)
1301+
)
1302+
1303+
// Third sync
1304+
runSync(catalog3, readMessages("dat/sync2_messages.jsonl"))
1305+
1306+
// We wrote the sync2 records twice, so expect duplicates.
1307+
// But we didn't write the sync1 records twice, so filter those out in a dumb way.
1308+
// Also override the generation ID to the correct value on the sync2 records,
1309+
// but leave the sync1 records with their original generation.
1310+
val expectedRawRecords2 =
1311+
readRecords("dat/sync2_expectedrecords_raw.jsonl").let { baseRecords ->
1312+
val sync2Records =
1313+
baseRecords.subList(expectedRawRecords1.size, baseRecords.size).onEach {
1314+
(it as ObjectNode).put(
1315+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
1316+
44,
1317+
)
1318+
}
1319+
expectedRawRecords1 + sync2Records + sync2Records
1320+
}
1321+
val expectedFinalRecords2 =
1322+
readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").let {
1323+
baseRecords ->
1324+
val sync2Records =
1325+
baseRecords.subList(expectedFinalRecords1.size, baseRecords.size).onEach {
1326+
(it as ObjectNode).put(
1327+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
1328+
44,
1329+
)
1330+
}
1331+
expectedFinalRecords1 + sync2Records + sync2Records
1332+
}
1333+
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
1334+
}
1335+
12071336
open val manyStreamCount = 20
12081337

12091338
@Test

0 commit comments

Comments
 (0)