Skip to content

Commit c74bce4

Browse files
author
Marius Posta
authored
bulk-cdk-core-extract: fix catalog schema bug (#43369)
1 parent 7cab6a2 commit c74bce4

File tree

10 files changed

+32
-44
lines changed

10 files changed

+32
-44
lines changed

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/operation/DiscoverOperation.kt

+8 -5
Original file line number | Diff line number | Diff line change
@@ -59,12 +59,15 @@ class DiscoverOperation(
5959
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType())
6060
},
6161
)
62-
val pkColumnIDs: List<List<String>> =
63-
discoveredStream.primaryKeyColumnIDs.filter { pk: List<String> ->
64-
// Only keep PKs whose values can be round-tripped.
65-
pk.all { airbyteStreamDecorator.isPossiblePrimaryKeyElement(allColumnsByID[it]!!) }
62+
val isValidPK: Boolean =
63+
discoveredStream.primaryKeyColumnIDs.all { idComponents: List<String> ->
64+
val id: String = idComponents.joinToString(separator = ".")
65+
val field: Field? = allColumnsByID[id]
66+
field != null && airbyteStreamDecorator.isPossiblePrimaryKeyElement(field)
6667
}
67-
airbyteStream.withSourceDefinedPrimaryKey(pkColumnIDs)
68+
airbyteStream.withSourceDefinedPrimaryKey(
69+
if (isValidPK) discoveredStream.primaryKeyColumnIDs else listOf(),
70+
)
6871
if (config.global) {
6972
// There is a global feed of incremental records, like CDC.
7073
airbyteStreamDecorator.decorateGlobal(airbyteStream)

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Feed.kt

-1
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,6 @@ data class Stream(
3434
val name: String,
3535
val namespace: String?,
3636
val fields: List<Field>,
37-
val primaryKeyCandidates: List<List<Field>>,
3837
val configuredSyncMode: SyncMode,
3938
val configuredPrimaryKey: List<Field>?,
4039
val configuredCursor: FieldOrMetaField?,

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt

+14 -8
Original file line number | Diff line number | Diff line change
@@ -156,27 +156,34 @@ class StateManagerFactory(
156156
return null
157157
}
158158

159-
fun pkOrNull(pkColumnIDs: List<String>): List<Field>? {
159+
fun pkOrNull(pkColumnIDComponents: List<List<String>>): List<Field>? {
160+
if (pkColumnIDComponents.isEmpty()) {
161+
return null
162+
}
163+
val pkColumnIDs: List<String> =
164+
pkColumnIDComponents.map { it.joinToString(separator = ".") }
160165
val pk: List<Field> = pkColumnIDs.mapNotNull(::dataColumnOrNull)
161-
if (pk.isEmpty() || pk.size < pkColumnIDs.size) {
166+
if (pk.size < pkColumnIDComponents.size) {
162167
handler.accept(InvalidPrimaryKey(name, namespace, pkColumnIDs))
163168
return null
164169
}
165170
return pk
166171
}
167172

168-
fun cursorOrNull(cursorColumnID: String): FieldOrMetaField? {
173+
fun cursorOrNull(cursorColumnIDComponents: List<String>): FieldOrMetaField? {
174+
if (cursorColumnIDComponents.isEmpty()) {
175+
return null
176+
}
177+
val cursorColumnID: String = cursorColumnIDComponents.joinToString(separator = ".")
169178
if (cursorColumnID == CommonMetaField.CDC_LSN.id) {
170179
return CommonMetaField.CDC_LSN
171180
}
172181
return dataColumnOrNull(cursorColumnID)
173182
}
174-
val primaryKeyCandidates: List<List<Field>> =
175-
stream.sourceDefinedPrimaryKey.mapNotNull(::pkOrNull)
176183
val configuredPrimaryKey: List<Field>? =
177-
configuredStream.primaryKey?.asSequence()?.mapNotNull(::pkOrNull)?.firstOrNull()
184+
configuredStream.primaryKey?.asSequence()?.let { pkOrNull(it.toList()) }
178185
val configuredCursor: FieldOrMetaField? =
179-
configuredStream.cursorField?.asSequence()?.mapNotNull(::cursorOrNull)?.firstOrNull()
186+
configuredStream.cursorField?.asSequence()?.let { cursorOrNull(it.toList()) }
180187
val configuredSyncMode: SyncMode =
181188
when (configuredStream.syncMode) {
182189
SyncMode.INCREMENTAL ->
@@ -192,7 +199,6 @@ class StateManagerFactory(
192199
name,
193200
namespace,
194201
streamFields,
195-
primaryKeyCandidates,
196202
configuredSyncMode,
197203
configuredPrimaryKey,
198204
configuredCursor,

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/RootReaderIntegrationTest.kt

-1
Original file line number | Diff line number | Diff line change
@@ -214,7 +214,6 @@ data class TestCase(
214214
name = name,
215215
namespace = "test",
216216
fields = listOf(),
217-
primaryKeyCandidates = listOf(),
218217
configuredSyncMode = SyncMode.FULL_REFRESH,
219218
configuredPrimaryKey = null,
220219
configuredCursor = null,

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt

+2 -8
Original file line number | Diff line number | Diff line change
@@ -204,18 +204,12 @@ class StateManagerGlobalStatesTest {
204204
val kv: Stream = global.streams.first()
205205
Assertions.assertEquals("KV", kv.name)
206206
Assertions.assertEquals(listOf("V", "K"), kv.fields.map { it.id })
207-
Assertions.assertEquals(
208-
listOf(listOf("K")),
209-
kv.primaryKeyCandidates.map { col -> col.map { it.id } },
210-
)
207+
Assertions.assertEquals(listOf("K"), kv.configuredPrimaryKey?.map { it.id })
211208
Assertions.assertEquals(SyncMode.INCREMENTAL, kv.configuredSyncMode)
212209
val events: Stream = streams.filter { it.namePair != kv.namePair }.first()
213210
Assertions.assertEquals("EVENTS", events.name)
214211
Assertions.assertEquals(listOf("MSG", "ID", "TS"), events.fields.map { it.id })
215-
Assertions.assertEquals(
216-
listOf(listOf("ID")),
217-
events.primaryKeyCandidates.map { col -> col.map { it.id } },
218-
)
212+
Assertions.assertEquals(listOf("ID"), events.configuredPrimaryKey?.map { it.id })
219213
Assertions.assertEquals(SyncMode.FULL_REFRESH, events.configuredSyncMode)
220214
return Streams(global, kv, events)
221215
}

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerStreamStatesTest.kt

-4
Original file line number | Diff line number | Diff line change
@@ -185,10 +185,6 @@ class StateManagerStreamStatesTest {
185185
val eventsStream: Stream = stateManager.feeds.mapNotNull { it as? Stream }.first()
186186
Assertions.assertEquals("EVENTS", eventsStream.name)
187187
Assertions.assertEquals(listOf("MSG", "ID", "TS"), eventsStream.fields.map { it.id })
188-
Assertions.assertEquals(
189-
listOf(listOf("ID")),
190-
eventsStream.primaryKeyCandidates.map { col -> col.map { it.id } },
191-
)
192188
Assertions.assertEquals(expectedSyncMode, eventsStream.configuredSyncMode)
193189
Assertions.assertEquals(
194190
expectedPrimaryKey,

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/streams/StreamPartitionsCreatorUtilsTest.kt

-1
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,6 @@ class StreamPartitionsCreatorUtilsTest {
6060
name = "kv",
6161
namespace = "public",
6262
fields = listOf(k, v),
63-
primaryKeyCandidates = listOf(listOf(k)),
6463
configuredSyncMode = SyncMode.FULL_REFRESH,
6564
configuredPrimaryKey = listOf(k),
6665
configuredCursor = null,

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerier.kt

+4 -7
Original file line number | Diff line number | Diff line change
@@ -299,13 +299,10 @@ class JdbcMetadataQuerier(
299299
} catch (e: Exception) {
300300
throw RuntimeException("Primary key discovery query failed: ${e.message}", e)
301301
}
302-
return results
303-
.groupBy { it.name }
304-
.values
305-
.map { rowsByPK: List<PrimaryKeyRow> ->
306-
rowsByPK.sortedBy { it.ordinal }.map { it.columnName }
307-
}
308-
.also { memoizedPrimaryKeys[table] = it }
302+
val rows: List<PrimaryKeyRow> = results.groupBy { it.name }.values.firstOrNull() ?: listOf()
303+
val pk: List<List<String>> = rows.sortedBy { it.ordinal }.map { listOf(it.columnName) }
304+
memoizedPrimaryKeys[table] = pk
305+
return pk
309306
}
310307

311308
private data class PrimaryKeyRow(

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/stream/CheckpointStreamState.kt

+3 -6
Original file line number | Diff line number | Diff line change
@@ -82,11 +82,8 @@ private fun StreamStateJsonValue.checkpoint(ctx: StreamReadContext): CheckpointS
8282
if (primaryKey.isEmpty()) {
8383
return@run mapOf()
8484
}
85-
val keys: List<Field>? =
86-
ctx.stream.primaryKeyCandidates.find { pk: List<Field> ->
87-
pk.map { it.id }.toSet() == primaryKey.keys
88-
}
89-
if (keys == null) {
85+
val pk: List<Field> = ctx.stream.configuredPrimaryKey ?: listOf()
86+
if (primaryKey.keys != pk.map { it.id }.toSet()) {
9087
ctx.handler.accept(
9188
InvalidPrimaryKey(
9289
ctx.stream.name,
@@ -96,7 +93,7 @@ private fun StreamStateJsonValue.checkpoint(ctx: StreamReadContext): CheckpointS
9693
)
9794
return null
9895
}
99-
keys.associateWith { primaryKey[it.id]!! }
96+
pk.associateWith { primaryKey[it.id]!! }
10097
}
10198
val cursorPair: Pair<Field, JsonNode>? = run {
10299
if (cursors.isEmpty()) {

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/stream/StreamPartitionsCreator.kt

+1 -3
Original file line number | Diff line number | Diff line change
@@ -162,9 +162,7 @@ fun CheckpointStreamState?.streamPartitionsCreatorInput(
162162
ctx: StreamReadContext,
163163
): StreamPartitionsCreator.Input {
164164
if (this == null) {
165-
val pkChosenFromCatalog: List<Field> =
166-
ctx.stream.configuredPrimaryKey
167-
?: ctx.stream.primaryKeyCandidates.firstOrNull() ?: listOf()
165+
val pkChosenFromCatalog: List<Field> = ctx.stream.configuredPrimaryKey ?: listOf()
168166
if (ctx.stream.configuredSyncMode == SyncMode.FULL_REFRESH || ctx.configuration.global) {
169167
return StreamPartitionsCreator.SnapshotColdStart(pkChosenFromCatalog)
170168
}

0 commit comments

Comments
 (0)