@@ -1117,9 +1117,7 @@ abstract class BaseTypingDedupingTest {
1117
1117
val expectedRawRecords0 = readRecords(" dat/sync2_expectedrecords_raw.jsonl" )
1118
1118
val expectedFinalRecords0 =
1119
1119
readRecords(" dat/sync2_expectedrecords_fullrefresh_append_final.jsonl" )
1120
- (expectedFinalRecords0 + expectedRawRecords0).forEach {
1121
- (it as ObjectNode ).put(JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID , 41 )
1122
- }
1120
+ fixGenerationId(expectedRawRecords0, expectedFinalRecords0, 41 )
1123
1121
verifySyncResult(expectedRawRecords0, expectedFinalRecords0, disableFinalTableComparison())
1124
1122
1125
1123
val catalog =
@@ -1204,6 +1202,138 @@ abstract class BaseTypingDedupingTest {
1204
1202
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
1205
1203
}
1206
1204
1205
+ /* *
1206
+ * Emulates this sequence of events:
1207
+ * 1. User runs a normal incremental sync
1208
+ * 2. User initiates a truncate refresh, but it fails.
1209
+ * 3. User cancels the truncate refresh, and initiates a normal incremental sync.
1210
+ *
1211
+ * In particular, we must retain all records from both the first sync, _and_ the truncate sync's
1212
+ * temporary raw table.
1213
+ */
1214
+ @Test
1215
+ @Throws(Exception ::class )
1216
+ open fun resumeAfterCancelledTruncate () {
1217
+ val catalog1 =
1218
+ io.airbyte.protocol.models.v0
1219
+ .ConfiguredAirbyteCatalog ()
1220
+ .withStreams(
1221
+ java.util.List .of(
1222
+ ConfiguredAirbyteStream ()
1223
+ .withSyncId(42 )
1224
+ .withGenerationId(43 )
1225
+ .withMinimumGenerationId(0 )
1226
+ .withSyncMode(SyncMode .INCREMENTAL )
1227
+ .withCursorField(listOf (" updated_at" ))
1228
+ .withDestinationSyncMode(DestinationSyncMode .APPEND )
1229
+ .withStream(
1230
+ AirbyteStream ()
1231
+ .withNamespace(streamNamespace)
1232
+ .withName(streamName)
1233
+ .withJsonSchema(SCHEMA )
1234
+ )
1235
+ )
1236
+ )
1237
+
1238
+ // Normal sync
1239
+ runSync(catalog1, readMessages(" dat/sync1_messages.jsonl" ))
1240
+
1241
+ val expectedRawRecords1 = readRecords(" dat/sync1_expectedrecords_raw.jsonl" )
1242
+ val expectedFinalRecords1 = readRecords(" dat/sync1_expectedrecords_nondedup_final.jsonl" )
1243
+ verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
1244
+
1245
+ val catalog2 =
1246
+ io.airbyte.protocol.models.v0
1247
+ .ConfiguredAirbyteCatalog ()
1248
+ .withStreams(
1249
+ java.util.List .of(
1250
+ ConfiguredAirbyteStream ()
1251
+ .withSyncId(42 )
1252
+ // Generation ID is incremented
1253
+ .withGenerationId(44 )
1254
+ .withMinimumGenerationId(44 )
1255
+ .withSyncMode(SyncMode .INCREMENTAL )
1256
+ .withCursorField(listOf (" updated_at" ))
1257
+ .withDestinationSyncMode(DestinationSyncMode .APPEND )
1258
+ .withStream(
1259
+ AirbyteStream ()
1260
+ .withNamespace(streamNamespace)
1261
+ .withName(streamName)
1262
+ .withJsonSchema(SCHEMA )
1263
+ )
1264
+ )
1265
+ )
1266
+ // Interrupted truncate sync
1267
+ assertThrows<Exception > {
1268
+ runSync(
1269
+ catalog2,
1270
+ readMessages(" dat/sync2_messages.jsonl" ),
1271
+ streamStatus = AirbyteStreamStatus .INCOMPLETE ,
1272
+ )
1273
+ }
1274
+
1275
+ // We should still have the exact same records as after the initial sync
1276
+ verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
1277
+
1278
+ val catalog3 =
1279
+ io.airbyte.protocol.models.v0
1280
+ .ConfiguredAirbyteCatalog ()
1281
+ .withStreams(
1282
+ java.util.List .of(
1283
+ ConfiguredAirbyteStream ()
1284
+ .withSyncId(42 )
1285
+ // Same generation as the truncate sync, but now with
1286
+ // min gen = 0
1287
+ .withGenerationId(44 )
1288
+ .withMinimumGenerationId(0 )
1289
+ .withSyncMode(SyncMode .INCREMENTAL )
1290
+ .withCursorField(listOf (" updated_at" ))
1291
+ .withDestinationSyncMode(DestinationSyncMode .APPEND )
1292
+ .withStream(
1293
+ AirbyteStream ()
1294
+ .withNamespace(streamNamespace)
1295
+ .withName(streamName)
1296
+ .withJsonSchema(SCHEMA )
1297
+ )
1298
+ )
1299
+ )
1300
+
1301
+ // Third sync
1302
+ runSync(catalog3, readMessages(" dat/sync2_messages.jsonl" ))
1303
+
1304
+ // We wrote the sync2 records twice, so expect duplicates.
1305
+ // But we didn't write the sync1 records twice, so filter those out in a dumb way.
1306
+ // Also override the generation ID to the correct value on the sync2 records,
1307
+ // but leave the sync1 records with their original generation.
1308
+ val expectedRawRecords2 =
1309
+ readRecords(" dat/sync2_expectedrecords_raw.jsonl" ).let { baseRecords ->
1310
+ val sync2Records =
1311
+ baseRecords.subList(expectedRawRecords1.size, baseRecords.size).onEach {
1312
+ (it as ObjectNode ).put(
1313
+ JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID ,
1314
+ 44 ,
1315
+ )
1316
+ }
1317
+ expectedRawRecords1 + sync2Records + sync2Records
1318
+ }
1319
+ val expectedFinalRecords2 =
1320
+ readRecords(" dat/sync2_expectedrecords_fullrefresh_append_final.jsonl" ).let {
1321
+ baseRecords ->
1322
+ val sync2Records =
1323
+ baseRecords.subList(expectedFinalRecords1.size, baseRecords.size).onEach {
1324
+ (it as ObjectNode ).put(
1325
+ finalMetadataColumnNames.getOrDefault(
1326
+ JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID ,
1327
+ JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID
1328
+ ),
1329
+ 44 ,
1330
+ )
1331
+ }
1332
+ expectedFinalRecords1 + sync2Records + sync2Records
1333
+ }
1334
+ verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
1335
+ }
1336
+
1207
1337
open val manyStreamCount = 20
1208
1338
1209
1339
@Test
0 commit comments