Skip to content

Commit c03d1d6

Browse files
authored
Destinations CDK: Extract generation ID from catalog (#38127)
1 parent 5b64af0 commit c03d1d6

File tree

8 files changed

+100
-51
lines changed

8 files changed

+100
-51
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177-
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
177+
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
178+
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
179+
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
178180
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
179181
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
180182
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.34.4
1+
version=0.35.0

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt

+23-9
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ constructor(
5555
.substring(0, 3)
5656
val newName = "${originalName}_$hash"
5757
actualStreamConfig =
58-
StreamConfig(
59-
sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace),
60-
originalStreamConfig.syncMode,
61-
originalStreamConfig.destinationSyncMode,
62-
originalStreamConfig.primaryKey,
63-
originalStreamConfig.cursor,
64-
originalStreamConfig.columns,
58+
originalStreamConfig.copy(
59+
id =
60+
sqlGenerator.buildStreamId(
61+
originalNamespace,
62+
newName,
63+
rawNamespace,
64+
),
6565
)
6666
} else {
6767
actualStreamConfig = originalStreamConfig
@@ -112,6 +112,18 @@ constructor(
112112

113113
@VisibleForTesting
114114
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
115+
if (stream.generationId == null) {
116+
stream.generationId = 0
117+
stream.minimumGenerationId = 0
118+
stream.syncId = 0
119+
}
120+
if (
121+
stream.minimumGenerationId != 0.toLong() &&
122+
stream.minimumGenerationId != stream.generationId
123+
) {
124+
throw UnsupportedOperationException("Hybrid refreshes are not yet supported.")
125+
}
126+
115127
val airbyteColumns =
116128
when (
117129
val schema: AirbyteType =
@@ -143,11 +155,13 @@ constructor(
143155

144156
return StreamConfig(
145157
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
146-
stream.syncMode,
147158
stream.destinationSyncMode,
148159
primaryKey,
149160
cursor,
150-
columns
161+
columns,
162+
stream.generationId,
163+
stream.minimumGenerationId,
164+
stream.syncId,
151165
)
152166
}
153167

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@
44
package io.airbyte.integrations.base.destination.typing_deduping
55

66
import io.airbyte.protocol.models.v0.DestinationSyncMode
7-
import io.airbyte.protocol.models.v0.SyncMode
87
import java.util.*
98
import kotlin.collections.LinkedHashMap
109

1110
data class StreamConfig(
1211
val id: StreamId,
13-
val syncMode: SyncMode,
1412
val destinationSyncMode: DestinationSyncMode,
1513
val primaryKey: List<ColumnId>,
1614
val cursor: Optional<ColumnId>,
1715
val columns: LinkedHashMap<ColumnId, AirbyteType>,
16+
val generationId: Long,
17+
val minimumGenerationId: Long,
18+
val syncId: Long,
1819
)

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt

+9-7
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
1010
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1111
import io.airbyte.protocol.models.v0.DestinationSyncMode
1212
import io.airbyte.protocol.models.v0.SyncMode
13-
import java.util.List
1413
import org.junit.jupiter.api.Assertions
1514
import org.junit.jupiter.api.Assertions.assertAll
1615
import org.junit.jupiter.api.BeforeEach
@@ -74,7 +73,7 @@ internal class CatalogParserTest {
7473
}
7574
val catalog =
7675
ConfiguredAirbyteCatalog()
77-
.withStreams(List.of(stream("a", "foobarfoo"), stream("a", "foofoo")))
76+
.withStreams(listOf(stream("a", "foobarfoo"), stream("a", "foofoo")))
7877

7978
val parsedCatalog = parser.parseCatalog(catalog)
8079

@@ -127,13 +126,13 @@ internal class CatalogParserTest {
127126
128127
""".trimIndent()
129128
)
130-
val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema)))
129+
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))
131130

132131
val parsedCatalog = parser.parseCatalog(catalog)
133-
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
132+
val columnsList = parsedCatalog.streams[0].columns.keys.toList()
134133

135134
assertAll(
136-
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
135+
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
137136
{ Assertions.assertEquals("foofoo", columnsList[0].name) },
138137
{ Assertions.assertEquals("foofoo_1", columnsList[1].name) }
139138
)
@@ -168,10 +167,10 @@ internal class CatalogParserTest {
168167
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))
169168

170169
val parsedCatalog = parser.parseCatalog(catalog)
171-
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
170+
val columnsList = parsedCatalog.streams[0].columns.keys.toList()
172171

173172
assertAll(
174-
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
173+
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
175174
{ Assertions.assertEquals("aVeryLongC", columnsList[0].name) },
176175
{ Assertions.assertEquals("aV36rd", columnsList[1].name) }
177176
)
@@ -200,6 +199,9 @@ internal class CatalogParserTest {
200199
)
201200
.withSyncMode(SyncMode.INCREMENTAL)
202201
.withDestinationSyncMode(DestinationSyncMode.APPEND)
202+
.withGenerationId(0)
203+
.withMinimumGenerationId(0)
204+
.withSyncId(0)
203205
}
204206
}
205207
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -913,11 +913,13 @@ class DefaultTyperDeduperTest {
913913
"overwrite_ns",
914914
"overwrite_stream"
915915
),
916-
mock(),
917916
DestinationSyncMode.OVERWRITE,
918917
mock(),
919918
mock(),
920-
mock()
919+
mock(),
920+
0,
921+
0,
922+
0,
921923
)
922924
private val APPEND_STREAM_CONFIG =
923925
StreamConfig(
@@ -929,11 +931,13 @@ class DefaultTyperDeduperTest {
929931
"append_ns",
930932
"append_stream"
931933
),
932-
mock(),
933934
DestinationSyncMode.APPEND,
934935
mock(),
935936
mock(),
936-
mock()
937+
mock(),
938+
0,
939+
0,
940+
0,
937941
)
938942
private val DEDUPE_STREAM_CONFIG =
939943
StreamConfig(
@@ -945,11 +949,13 @@ class DefaultTyperDeduperTest {
945949
"dedup_ns",
946950
"dedup_stream"
947951
),
948-
mock(),
949952
DestinationSyncMode.APPEND_DEDUP,
950953
mock(),
951954
mock(),
952-
mock()
955+
mock(),
956+
0,
957+
0,
958+
0,
953959
)
954960
}
955961
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt

+9-5
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest {
7777
migrator: BaseDestinationV1V2Migrator<*>,
7878
expected: Boolean
7979
) {
80-
val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock())
80+
val config = StreamConfig(STREAM_ID, destinationSyncMode, mock(), mock(), mock(), 0, 0, 0)
8181
val actual = migrator.shouldMigrate(config)
8282
Assertions.assertEquals(expected, actual)
8383
}
@@ -88,11 +88,13 @@ class DestinationV1V2MigratorTest {
8888
val config =
8989
StreamConfig(
9090
STREAM_ID,
91-
mock(),
9291
DestinationSyncMode.APPEND_DEDUP,
9392
mock(),
9493
mock(),
95-
mock()
94+
mock(),
95+
0,
96+
0,
97+
0,
9698
)
9799
val migrator = makeMockMigrator(true, true, false, false, false)
98100
val exception =
@@ -112,11 +114,13 @@ class DestinationV1V2MigratorTest {
112114
val stream =
113115
StreamConfig(
114116
STREAM_ID,
115-
mock(),
116117
DestinationSyncMode.APPEND_DEDUP,
117118
mock(),
118119
mock(),
119-
mock()
120+
mock(),
121+
0,
122+
0,
123+
0,
120124
)
121125
val handler = Mockito.mock(DestinationHandler::class.java)
122126
val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table")

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt

+40-20
Original file line numberDiff line numberDiff line change
@@ -222,39 +222,47 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
222222
incrementalDedupStream =
223223
StreamConfig(
224224
streamId,
225-
SyncMode.INCREMENTAL,
226225
DestinationSyncMode.APPEND_DEDUP,
227226
primaryKey,
228227
Optional.of(cursor),
229-
COLUMNS
228+
COLUMNS,
229+
0,
230+
0,
231+
0,
230232
)
231233
incrementalAppendStream =
232234
StreamConfig(
233235
streamId,
234-
SyncMode.INCREMENTAL,
235236
DestinationSyncMode.APPEND,
236237
primaryKey,
237238
Optional.of(cursor),
238-
COLUMNS
239+
COLUMNS,
240+
0,
241+
0,
242+
0,
239243
)
240244

241245
cdcIncrementalDedupStream =
242246
StreamConfig(
243247
streamId,
244-
SyncMode.INCREMENTAL,
245248
DestinationSyncMode.APPEND_DEDUP,
246249
primaryKey,
247250
Optional.of(cursor),
248-
cdcColumns
251+
cdcColumns,
252+
0,
253+
0,
254+
0,
249255
)
250256
cdcIncrementalAppendStream =
251257
StreamConfig(
252258
streamId,
253-
SyncMode.INCREMENTAL,
254259
DestinationSyncMode.APPEND,
255260
primaryKey,
256261
Optional.of(cursor),
257-
cdcColumns
262+
cdcColumns,
263+
0,
264+
0,
265+
0,
258266
)
259267

260268
LOGGER.info("Running with namespace {}", namespace)
@@ -353,11 +361,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
353361
val stream =
354362
StreamConfig(
355363
streamId,
356-
SyncMode.INCREMENTAL,
357364
DestinationSyncMode.APPEND_DEDUP,
358365
incrementalDedupStream.primaryKey,
359366
incrementalDedupStream.cursor,
360-
incrementalDedupStream.columns
367+
incrementalDedupStream.columns,
368+
0,
369+
0,
370+
0,
361371
)
362372

363373
createRawTable(streamId)
@@ -962,11 +972,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
962972
val streamConfig =
963973
StreamConfig(
964974
streamId,
965-
SyncMode.INCREMENTAL,
966975
DestinationSyncMode.APPEND_DEDUP,
967976
primaryKey,
968977
Optional.empty(),
969-
COLUMNS
978+
COLUMNS,
979+
0,
980+
0,
981+
0,
970982
)
971983
createRawTable(streamId)
972984
createFinalTable(streamConfig, "")
@@ -1369,7 +1381,6 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
13691381
val stream =
13701382
StreamConfig(
13711383
streamId,
1372-
SyncMode.INCREMENTAL,
13731384
DestinationSyncMode.APPEND_DEDUP,
13741385
primaryKey,
13751386
Optional.of(cursor),
@@ -1387,7 +1398,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
13871398
generator.buildColumnId("includes$\$doubledollar") to
13881399
AirbyteProtocolType.STRING,
13891400
generator.buildColumnId("endswithbackslash\\") to AirbyteProtocolType.STRING
1390-
)
1401+
),
1402+
0,
1403+
0,
1404+
0,
13911405
)
13921406

13931407
val createTable = generator.createTable(stream, "", false)
@@ -1442,11 +1456,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
14421456
val stream =
14431457
StreamConfig(
14441458
modifiedStreamId,
1445-
SyncMode.INCREMENTAL,
14461459
DestinationSyncMode.APPEND_DEDUP,
14471460
java.util.List.of(columnId),
14481461
Optional.of(columnId),
1449-
linkedMapOf(columnId to AirbyteProtocolType.STRING)
1462+
linkedMapOf(columnId to AirbyteProtocolType.STRING),
1463+
0,
1464+
0,
1465+
0,
14501466
)
14511467

14521468
val createTable = generator.createTable(stream, "", false)
@@ -1475,14 +1491,16 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
14751491
val stream =
14761492
StreamConfig(
14771493
streamId,
1478-
SyncMode.INCREMENTAL,
14791494
DestinationSyncMode.APPEND,
14801495
emptyList(),
14811496
Optional.empty(),
14821497
linkedMapOf(
14831498
generator.buildColumnId("current_date") to AirbyteProtocolType.STRING,
14841499
generator.buildColumnId("join") to AirbyteProtocolType.STRING
1485-
)
1500+
),
1501+
0,
1502+
0,
1503+
0,
14861504
)
14871505

14881506
val createTable = generator.createTable(stream, "", false)
@@ -1523,11 +1541,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
15231541
val stream =
15241542
StreamConfig(
15251543
streamId,
1526-
SyncMode.INCREMENTAL,
15271544
DestinationSyncMode.APPEND,
15281545
emptyList<ColumnId>(),
15291546
Optional.empty(),
1530-
LinkedHashMap()
1547+
LinkedHashMap(),
1548+
0,
1549+
0,
1550+
0,
15311551
)
15321552

15331553
val createTable = generator.createTable(stream, "", false)

0 commit comments

Comments
 (0)