Skip to content

Commit a16cc58

Browse files
author
Marius Posta
authored
bulk-cdk: DiscoverOperation supports isResumable (#43406)
1 parent 55c94f9 commit a16cc58

File tree

14 files changed

+26
-10
lines changed

14 files changed

+26
-10
lines changed

airbyte-cdk/bulk/core/base/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dependencies {
55
api 'com.fasterxml.jackson.core:jackson-databind'
66
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
77
api 'com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39'
8-
api('io.airbyte.airbyte-protocol:protocol-models:0.9.0') {
8+
api('io.airbyte.airbyte-protocol:protocol-models:0.12.2') {
99
exclude group: 'com.google.guava', module: 'guava'
1010
exclude group: 'com.google.api-client'
1111
exclude group: 'org.apache.logging.log4j'

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/operation/DiscoverOperation.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ class DiscoverOperation(
3939
}
4040
continue
4141
}
42-
val primaryKeys: List<List<String>> =
43-
metadataQuerier.primaryKeys(name, namespace)
44-
val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKeys)
42+
val primaryKey: List<List<String>> = metadataQuerier.primaryKey(name, namespace)
43+
val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKey)
4544
airbyteStreams.add(toAirbyteStream(discoveredStream))
4645
}
4746
}
@@ -68,6 +67,7 @@ class DiscoverOperation(
6867
airbyteStream.withSourceDefinedPrimaryKey(
6968
if (isValidPK) discoveredStream.primaryKeyColumnIDs else listOf(),
7069
)
70+
airbyteStream.isResumable = airbyteStream.sourceDefinedPrimaryKey.isNotEmpty()
7171
if (config.global) {
7272
// There is a global feed of incremental records, like CDC.
7373
airbyteStreamDecorator.decorateGlobal(airbyteStream)

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/AirbyteStreamDecorator.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ interface AirbyteStreamDecorator {
3434
* message.
3535
*
3636
* This method does not determine (1), of course, because the source keys are defined in the
37-
* source database itself and are retrieved via [MetadataQuerier.primaryKeys]. Instead, this
37+
* source database itself and are retrieved via [MetadataQuerier.primaryKey]. Instead, this
3838
* method determines (2) based on the type information of the field, typically the [FieldType]
3939
* objects. For instance if the [Field.type] does not map to a [LosslessFieldType] then the
4040
* field can't reliably round-trip checkpoint values during a resumable initial sync.

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/MetadataQuerier.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ interface MetadataQuerier : AutoCloseable {
1919
streamNamespace: String?,
2020
): List<Field>
2121

22-
/** Queries the information_schema for all primary keys for the given table. */
23-
fun primaryKeys(
22+
/** Queries the information_schema for any primary key on the given table. */
23+
fun primaryKey(
2424
streamName: String,
2525
streamNamespace: String?,
2626
): List<List<String>>

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerierTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class JdbcMetadataQuerierTest {
7272
val tableName = (mdq as JdbcMetadataQuerier).findTableName("KV", "PUBLIC")
7373
Assertions.assertNotNull(tableName)
7474
Assertions.assertEquals(expectedColumnMetadata, mdq.columnMetadata(tableName!!))
75-
Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKeys("KV", "PUBLIC"))
75+
Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKey("KV", "PUBLIC"))
7676
}
7777
}
7878
}

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/source/FakeSourceDiscoverTest.kt

+4
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ class FakeSourceDiscoverTest {
3434
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
3535
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
3636
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
37+
.withIsResumable(true)
3738
val kv =
3839
AirbyteStream()
3940
.withName("KV")
4041
.withNamespace("PUBLIC")
4142
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
4243
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
4344
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
45+
.withIsResumable(true)
4446
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
4547
discoverOperation.execute()
4648
Assertions.assertEquals(listOf(expected), outputConsumer.catalogs())
@@ -59,13 +61,15 @@ class FakeSourceDiscoverTest {
5961
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
6062
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
6163
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
64+
.withIsResumable(true)
6265
val kv =
6366
AirbyteStream()
6467
.withName("KV")
6568
.withNamespace("PUBLIC")
6669
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
6770
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
6871
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
72+
.withIsResumable(true)
6973
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
7074
discoverOperation.execute()
7175
Assertions.assertEquals(listOf(expected), outputConsumer.catalogs())

airbyte-cdk/bulk/core/extract/src/test/resources/read/cdc-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"supported_sync_modes": ["full_refresh", "incremental"],
2323
"default_cursor_field": ["ID", "TS"],
2424
"source_defined_primary_key": [["ID"]],
25+
"is_resumable": true,
2526
"namespace": "PUBLIC"
2627
},
2728
"sync_mode": "full_refresh",
@@ -47,6 +48,7 @@
4748
"supported_sync_modes": ["full_refresh", "incremental"],
4849
"default_cursor_field": ["K"],
4950
"source_defined_primary_key": [["K"]],
51+
"is_resumable": true,
5052
"namespace": "PUBLIC"
5153
},
5254
"sync_mode": "incremental",

airbyte-cdk/bulk/core/extract/src/test/resources/read/cursor-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"supported_sync_modes": ["full_refresh", "incremental"],
2323
"default_cursor_field": [],
2424
"source_defined_primary_key": [["ID"]],
25+
"is_resumable": true,
2526
"namespace": "PUBLIC"
2627
},
2728
"sync_mode": "incremental",
@@ -47,6 +48,7 @@
4748
"supported_sync_modes": ["full_refresh", "incremental"],
4849
"default_cursor_field": [],
4950
"source_defined_primary_key": [["K"]],
51+
"is_resumable": true,
5052
"namespace": "PUBLIC"
5153
},
5254
"sync_mode": "full_refresh",

airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cdc-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"supported_sync_modes": ["full_refresh", "incremental"],
2424
"default_cursor_field": ["ID", "TS"],
2525
"source_defined_primary_key": [["ID"]],
26+
"is_resumable": true,
2627
"namespace": "PUBLIC"
2728
},
2829
"sync_mode": "full_refresh",
@@ -48,6 +49,7 @@
4849
"supported_sync_modes": ["full_refresh", "incremental"],
4950
"default_cursor_field": ["K"],
5051
"source_defined_primary_key": [["K"]],
52+
"is_resumable": true,
5153
"namespace": "PUBLIC"
5254
},
5355
"sync_mode": "incremental",

airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cursor-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"supported_sync_modes": ["full_refresh", "incremental"],
2424
"default_cursor_field": [],
2525
"source_defined_primary_key": [["ID"]],
26+
"is_resumable": true,
2627
"namespace": "PUBLIC"
2728
},
2829
"sync_mode": "incremental",
@@ -48,6 +49,7 @@
4849
"supported_sync_modes": ["full_refresh", "incremental"],
4950
"default_cursor_field": [],
5051
"source_defined_primary_key": [["K"]],
52+
"is_resumable": true,
5153
"namespace": "PUBLIC"
5254
},
5355
"sync_mode": "full_refresh",

airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cdc-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
"source_defined_cursor": true,
4141
"default_cursor_field": ["_ab_cdc_lsn"],
4242
"source_defined_primary_key": [["ID"]],
43+
"is_resumable": true,
4344
"namespace": "PUBLIC"
4445
},
4546
{
@@ -77,6 +78,7 @@
7778
"source_defined_cursor": true,
7879
"default_cursor_field": ["_ab_cdc_lsn"],
7980
"source_defined_primary_key": [["K"]],
81+
"is_resumable": true,
8082
"namespace": "PUBLIC"
8183
}
8284
]

airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cursor-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"supported_sync_modes": ["full_refresh", "incremental"],
2727
"default_cursor_field": [],
2828
"source_defined_primary_key": [["ID"]],
29+
"is_resumable": true,
2930
"namespace": "PUBLIC"
3031
},
3132
{
@@ -49,6 +50,7 @@
4950
"supported_sync_modes": ["full_refresh", "incremental"],
5051
"default_cursor_field": [],
5152
"source_defined_primary_key": [["K"]],
53+
"is_resumable": true,
5254
"namespace": "PUBLIC"
5355
}
5456
]

airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class ResourceDrivenMetadataQuerierFactory(
6666
?: throw SQLException("query failed", "tbl")
6767
}
6868

69-
override fun primaryKeys(
69+
override fun primaryKey(
7070
streamName: String,
7171
streamNamespace: String?,
7272
): List<List<String>> {

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerier.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ class JdbcMetadataQuerier(
272272

273273
val memoizedPrimaryKeys = mutableMapOf<TableName, List<List<String>>>()
274274

275-
override fun primaryKeys(
275+
override fun primaryKey(
276276
streamName: String,
277277
streamNamespace: String?,
278278
): List<List<String>> {

0 commit comments

Comments
 (0)