Skip to content

Commit 371f7fa

Browse files
author
Marius Posta
committed
bulk-cdk-core-extract: replace AirbyteStreamDecorator with AirbyteStreamFactory
1 parent 53e69bb commit 371f7fa

File tree

11 files changed

+163
-155
lines changed

11 files changed

+163
-155
lines changed

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/AirbyteStreamDecorator.kt

-52
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.cdk.discover
3+
4+
import io.airbyte.protocol.models.Field as AirbyteField
5+
import io.airbyte.protocol.models.v0.AirbyteStream
6+
import io.airbyte.protocol.models.v0.CatalogHelpers
7+
8+
/** Stateless object for building an [AirbyteStream] during DISCOVER. */
9+
interface AirbyteStreamFactory {
10+
/** Connector-specific [AirbyteStream] creation logic for GLOBAL-state streams. */
11+
fun createGlobal(discoveredStream: DiscoveredStream): AirbyteStream
12+
13+
/** Connector-specific [AirbyteStream] creation logic for STREAM-state streams. */
14+
fun createNonGlobal(discoveredStream: DiscoveredStream): AirbyteStream
15+
16+
companion object {
17+
18+
fun createAirbyteStream(discoveredStream: DiscoveredStream): AirbyteStream =
19+
CatalogHelpers.createAirbyteStream(
20+
discoveredStream.name,
21+
discoveredStream.namespace,
22+
discoveredStream.columns.map {
23+
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType())
24+
},
25+
)
26+
}
27+
}

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt

+8-44
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ package io.airbyte.cdk.discover
44
import io.airbyte.cdk.Operation
55
import io.airbyte.cdk.command.SourceConfiguration
66
import io.airbyte.cdk.output.OutputConsumer
7-
import io.airbyte.protocol.models.Field as AirbyteField
87
import io.airbyte.protocol.models.v0.AirbyteCatalog
98
import io.airbyte.protocol.models.v0.AirbyteStream
10-
import io.airbyte.protocol.models.v0.CatalogHelpers
119
import io.github.oshai.kotlinlogging.KotlinLogging
1210
import io.micronaut.context.annotation.Requires
1311
import jakarta.inject.Singleton
@@ -18,7 +16,7 @@ import jakarta.inject.Singleton
1816
class DiscoverOperation(
1917
val config: SourceConfiguration,
2018
val metadataQuerierFactory: MetadataQuerier.Factory<SourceConfiguration>,
21-
val airbyteStreamDecorator: AirbyteStreamDecorator,
19+
val airbyteStreamFactory: AirbyteStreamFactory,
2220
val outputConsumer: OutputConsumer,
2321
) : Operation {
2422
private val log = KotlinLogging.logger {}
@@ -39,50 +37,16 @@ class DiscoverOperation(
3937
}
4038
val primaryKey: List<List<String>> = metadataQuerier.primaryKey(name, namespace)
4139
val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKey)
42-
airbyteStreams.add(toAirbyteStream(discoveredStream))
40+
val airbyteStream: AirbyteStream =
41+
if (config.global) {
42+
airbyteStreamFactory.createGlobal(discoveredStream)
43+
} else {
44+
airbyteStreamFactory.createNonGlobal(discoveredStream)
45+
}
46+
airbyteStreams.add(airbyteStream)
4347
}
4448
}
4549
}
4650
outputConsumer.accept(AirbyteCatalog().withStreams(airbyteStreams))
4751
}
48-
49-
fun toAirbyteStream(discoveredStream: DiscoveredStream): AirbyteStream {
50-
val allColumnsByID: Map<String, Field> = discoveredStream.columns.associateBy { it.id }
51-
val airbyteStream: AirbyteStream =
52-
CatalogHelpers.createAirbyteStream(
53-
discoveredStream.name,
54-
discoveredStream.namespace,
55-
discoveredStream.columns.map {
56-
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType())
57-
},
58-
)
59-
val isValidPK: Boolean =
60-
discoveredStream.primaryKeyColumnIDs.all { idComponents: List<String> ->
61-
val id: String = idComponents.joinToString(separator = ".")
62-
val field: Field? = allColumnsByID[id]
63-
field != null && airbyteStreamDecorator.isPossiblePrimaryKeyElement(field)
64-
}
65-
airbyteStream.withSourceDefinedPrimaryKey(
66-
if (isValidPK) discoveredStream.primaryKeyColumnIDs else listOf(),
67-
)
68-
airbyteStream.isResumable = airbyteStream.sourceDefinedPrimaryKey.isNotEmpty()
69-
if (config.global) {
70-
// There is a global feed of incremental records, like CDC.
71-
airbyteStreamDecorator.decorateGlobal(airbyteStream)
72-
} else if (discoveredStream.columns.any { airbyteStreamDecorator.isPossibleCursor(it) }) {
73-
// There is one field whose values can be round-tripped and aggregated by MAX.
74-
airbyteStreamDecorator.decorateNonGlobal(airbyteStream)
75-
} else {
76-
// There is no such field.
77-
airbyteStreamDecorator.decorateNonGlobalNoCursor(airbyteStream)
78-
}
79-
return airbyteStream
80-
}
81-
82-
data class DiscoveredStream(
83-
val name: String,
84-
val namespace: String?,
85-
val columns: List<Field>,
86-
val primaryKeyColumnIDs: List<List<String>>,
87-
)
8852
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.discover
6+
7+
data class DiscoveredStream(
8+
val name: String,
9+
val namespace: String?,
10+
val columns: List<Field>,
11+
val primaryKeyColumnIDs: List<List<String>>,
12+
)

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/fakesource/FakeSourceDiscoverTest.kt

+4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class FakeSourceDiscoverTest {
3333
.withNamespace("PUBLIC")
3434
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
3535
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
36+
.withSourceDefinedCursor(false)
3637
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
3738
.withIsResumable(true)
3839
val kv =
@@ -41,6 +42,7 @@ class FakeSourceDiscoverTest {
4142
.withNamespace("PUBLIC")
4243
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
4344
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
45+
.withSourceDefinedCursor(false)
4446
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
4547
.withIsResumable(true)
4648
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
@@ -60,6 +62,7 @@ class FakeSourceDiscoverTest {
6062
.withNamespace("PUBLIC")
6163
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
6264
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
65+
.withSourceDefinedCursor(false)
6366
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
6467
.withIsResumable(true)
6568
val kv =
@@ -68,6 +71,7 @@ class FakeSourceDiscoverTest {
6871
.withNamespace("PUBLIC")
6972
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
7073
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
74+
.withSourceDefinedCursor(false)
7175
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
7276
.withIsResumable(true)
7377
val expected = AirbyteCatalog().withStreams(listOf(events, kv))

airbyte-cdk/bulk/core/extract/src/test/resources/fakesource/cdc-catalog.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
}
2222
},
2323
"supported_sync_modes": ["full_refresh", "incremental"],
24-
"default_cursor_field": ["ID", "TS"],
24+
"source_defined_cursor": false,
25+
"default_cursor_field": [],
2526
"source_defined_primary_key": [["ID"]],
2627
"is_resumable": true,
2728
"namespace": "PUBLIC"
@@ -47,7 +48,8 @@
4748
}
4849
},
4950
"supported_sync_modes": ["full_refresh", "incremental"],
50-
"default_cursor_field": ["K"],
51+
"source_defined_cursor": false,
52+
"default_cursor_field": [],
5153
"source_defined_primary_key": [["K"]],
5254
"is_resumable": true,
5355
"namespace": "PUBLIC"

airbyte-cdk/bulk/core/extract/src/test/resources/fakesource/expected-cursor-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
}
2525
},
2626
"supported_sync_modes": ["full_refresh", "incremental"],
27+
"source_defined_cursor": false,
2728
"default_cursor_field": [],
2829
"source_defined_primary_key": [["ID"]],
2930
"is_resumable": true,
@@ -48,6 +49,7 @@
4849
}
4950
},
5051
"supported_sync_modes": ["full_refresh", "incremental"],
52+
"source_defined_cursor": false,
5153
"default_cursor_field": [],
5254
"source_defined_primary_key": [["K"]],
5355
"is_resumable": true,

airbyte-cdk/bulk/core/extract/src/test/resources/read/cdc-catalog.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
}
2121
},
2222
"supported_sync_modes": ["full_refresh", "incremental"],
23-
"default_cursor_field": ["ID", "TS"],
23+
"source_defined_cursor": false,
24+
"default_cursor_field": [],
2425
"source_defined_primary_key": [["ID"]],
2526
"is_resumable": true,
2627
"namespace": "PUBLIC"
@@ -46,7 +47,8 @@
4647
}
4748
},
4849
"supported_sync_modes": ["full_refresh", "incremental"],
49-
"default_cursor_field": ["K"],
50+
"source_defined_cursor": false,
51+
"default_cursor_field": [],
5052
"source_defined_primary_key": [["K"]],
5153
"is_resumable": true,
5254
"namespace": "PUBLIC"

airbyte-cdk/bulk/core/extract/src/test/resources/read/cursor-catalog.json

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
}
2121
},
2222
"supported_sync_modes": ["full_refresh", "incremental"],
23+
"source_defined_cursor": false,
2324
"default_cursor_field": [],
2425
"source_defined_primary_key": [["ID"]],
2526
"is_resumable": true,
@@ -46,6 +47,7 @@
4647
}
4748
},
4849
"supported_sync_modes": ["full_refresh", "incremental"],
50+
"source_defined_cursor": false,
4951
"default_cursor_field": [],
5052
"source_defined_primary_key": [["K"]],
5153
"is_resumable": true,

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcAirbyteStreamDecorator.kt

-55
This file was deleted.

0 commit comments

Comments
 (0)