Skip to content

Commit c4ad3d9

Browse files
convert destination-bigquery to kotlin CDK (#36899)
1 parent 993aece commit c4ad3d9

File tree

44 files changed

+251
-209
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+251
-209
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ Maven and Gradle will automatically reference the correct (pinned) version of th
144144

145145
| Version | Date | Pull Request | Subject |
146146
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
147+
| 0.30.11 | 2024-04-25 | [\#36899](https://github.com/airbytehq/airbyte/pull/36899) | changes for bigQuery destination. |
148+
| 0.30.10 | 2024-04-24 | [\#37541](https://github.com/airbytehq/airbyte/pull/37541) | remove excessive logging |
147149
| 0.30.9 | 2024-04-24 | [\#37477](https://github.com/airbytehq/airbyte/pull/37477) | remove unnecessary logs
148150
| 0.30.7 | 2024-04-23 | [\#37477](https://github.com/airbytehq/airbyte/pull/37477) | fix kotlin warnings in core CDK submodule
149151
| 0.30.7 | 2024-04-23 | [\#37484](https://github.com/airbytehq/airbyte/pull/37484) | fix kotlin warnings in dependencies CDK submodule |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
152152
}
153153
}
154154

155+
@JvmStatic
155156
fun addAllStringsInConfigForDeinterpolation(node: JsonNode) {
156157
if (node.isTextual) {
157158
addStringForDeinterpolation(node.asText())

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ object JavaBaseConstants {
3535
// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
3636
// use this column list.
3737
@JvmField
38-
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: Set<String> =
39-
java.util.Set.of(
38+
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List<String> =
39+
java.util.List.of(
4040
COLUMN_NAME_AB_RAW_ID,
4141
COLUMN_NAME_AB_EXTRACTED_AT,
4242
COLUMN_NAME_AB_LOADED_AT,
@@ -56,4 +56,9 @@ object JavaBaseConstants {
5656
java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META)
5757

5858
const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
59+
enum class DestinationColumns(val rawColumns: List<String>) {
60+
V2_WITH_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES),
61+
V2_WITHOUT_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
62+
LEGACY(JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS)
63+
}
5964
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
3030
import java.util.function.Consumer
3131
import java.util.stream.Collectors
3232
import kotlin.jvm.optionals.getOrNull
33+
import org.jetbrains.annotations.VisibleForTesting
3334

3435
private val logger = KotlinLogging.logger {}
3536

@@ -41,7 +42,10 @@ private val logger = KotlinLogging.logger {}
4142
* memory limit governed by [GlobalMemoryManager]. Record writing is decoupled via [FlushWorkers].
4243
* See the other linked class for more detail.
4344
*/
44-
class AsyncStreamConsumer(
45+
open class AsyncStreamConsumer
46+
@VisibleForTesting
47+
@JvmOverloads
48+
constructor(
4549
outputRecordCollector: Consumer<AirbyteMessage>,
4650
private val onStart: OnStartFunction,
4751
private val onClose: OnCloseFunction,
@@ -51,7 +55,8 @@ class AsyncStreamConsumer(
5155
private val defaultNamespace: Optional<String>,
5256
private val flushFailure: FlushFailure = FlushFailure(),
5357
workerPool: ExecutorService = Executors.newFixedThreadPool(5),
54-
private val airbyteMessageDeserializer: AirbyteMessageDeserializer,
58+
private val airbyteMessageDeserializer: AirbyteMessageDeserializer =
59+
AirbyteMessageDeserializer(),
5560
) : SerializedAirbyteMessageConsumer {
5661
private val bufferEnqueue: BufferEnqueue = bufferManager.bufferEnqueue
5762
private val flushWorkers: FlushWorkers =

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
1414
*
1515
* The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map
1616
* will be treated as equivalent to [StreamSyncSummary.DEFAULT].
17+
*
18+
* The @JvmSuppressWildcards is here so that the 2nd parameter of accept stays a java
19+
* Map<StreamDescriptor, StreamSyncSummary> rather than becoming a Map<StreamDescriptor, ? extends
20+
* StreamSyncSummary>
1721
*/
1822
fun interface OnCloseFunction :
19-
CheckedBiConsumer<Boolean, Map<StreamDescriptor, StreamSyncSummary>, Exception>
23+
CheckedBiConsumer<
24+
Boolean, @JvmSuppressWildcards Map<StreamDescriptor, StreamSyncSummary>, Exception>
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.30.10
1+
version=0.30.11

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1469,7 +1469,7 @@ abstract class DestinationAcceptanceTest {
14691469
}
14701470

14711471
/** Whether the destination should be tested against different namespaces. */
1472-
protected fun supportNamespaceTest(): Boolean {
1472+
protected open fun supportNamespaceTest(): Boolean {
14731473
return false
14741474
}
14751475

@@ -1485,7 +1485,7 @@ abstract class DestinationAcceptanceTest {
14851485
* normalized namespace when testCaseId = "S3A-1". Find the testCaseId in
14861486
* "namespace_test_cases.json".
14871487
*/
1488-
protected fun assertNamespaceNormalization(
1488+
protected open fun assertNamespaceNormalization(
14891489
testCaseId: String?,
14901490
expectedNormalizedNamespace: String?,
14911491
actualNormalizedNamespace: String?

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/LocalAirbyteDestination.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ class LocalAirbyteDestination(private val dest: Destination) : AirbyteDestinatio
6161
return isClosed
6262
}
6363

64-
override fun getExitValue(): Int {
65-
return 0
66-
}
64+
override val exitValue = 0
6765

6866
override fun attemptRead(): Optional<io.airbyte.protocol.models.AirbyteMessage> {
6967
return Optional.empty()

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ open class AdvancedTestDataComparator : TestDataComparator {
2828
return java.util.List.of(identifier)
2929
}
3030

31-
protected fun compareObjects(expectedObject: JsonNode, actualObject: JsonNode) {
31+
protected open fun compareObjects(expectedObject: JsonNode, actualObject: JsonNode) {
3232
if (!areBothEmpty(expectedObject, actualObject)) {
3333
LOGGER.info("Expected Object : {}", expectedObject)
3434
LOGGER.info("Actual Object : {}", actualObject)

airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/AirbyteDestination.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ interface AirbyteDestination : CheckedConsumer<AirbyteMessage, Exception>, AutoC
7272
* @return exit code of the destination process
7373
* @throws IllegalStateException if the destination process has not exited
7474
*/
75-
fun getExitValue(): Int
75+
abstract val exitValue: Int
7676

7777
/**
7878
* Attempts to read an AirbyteMessage from the Destination.

airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt

+11-17
Original file line numberDiff line numberDiff line change
@@ -51,25 +51,19 @@ constructor(
5151
private var messageIterator: Iterator<AirbyteMessage?>? = null
5252

5353
private var exitValueIsSet = false
54-
private var exitValue: Int = 0
55-
override fun getExitValue(): Int {
56-
Preconditions.checkState(
57-
destinationProcess != null,
58-
"Destination process is null, cannot retrieve exit value."
59-
)
60-
Preconditions.checkState(
61-
!destinationProcess!!.isAlive,
62-
"Destination process is still alive, cannot retrieve exit value."
63-
)
64-
65-
if (!exitValueIsSet) {
66-
exitValueIsSet = true
67-
exitValue = destinationProcess!!.exitValue()
54+
override val exitValue: Int
55+
get() {
56+
Preconditions.checkState(
57+
destinationProcess != null,
58+
"Destination process is null, cannot retrieve exit value."
59+
)
60+
Preconditions.checkState(
61+
!destinationProcess!!.isAlive,
62+
"Destination process is still alive, cannot retrieve exit value."
63+
)
64+
return destinationProcess!!.exitValue()
6865
}
6966

70-
return exitValue
71-
}
72-
7367
@Throws(IOException::class, TestHarnessException::class)
7468
override fun start(
7569
destinationConfig: WorkerDestinationConfig,

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class GcsDestinationConfig(
7070
companion object {
7171
private const val GCS_ENDPOINT = "https://storage.googleapis.com"
7272

73+
@JvmStatic
7374
fun getGcsDestinationConfig(config: JsonNode): GcsDestinationConfig {
7475
return GcsDestinationConfig(
7576
config["gcs_bucket_name"].asText(),

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt

+21-17
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.JsonNode
77
import io.airbyte.cdk.integrations.base.JavaBaseConstants
88
import io.airbyte.commons.json.Jsons
99
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
10+
import io.github.oshai.kotlinlogging.KotlinLogging
1011
import java.time.Instant
1112
import java.util.*
1213

14+
private val logger = KotlinLogging.logger {}
15+
1316
/**
1417
* A CsvSheetGenerator that produces data in the format expected by JdbcSqlOperations. See
1518
* JdbcSqlOperations#createTableQuery.
@@ -24,14 +27,12 @@ import java.util.*
2427
*/
2528
class StagingDatabaseCsvSheetGenerator
2629
@JvmOverloads
27-
constructor(private val useDestinationsV2Columns: Boolean = false) : CsvSheetGenerator {
28-
// TODO is this even used anywhere?
29-
private var header: List<String> =
30-
if (this.useDestinationsV2Columns) JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES
31-
else JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS
32-
30+
constructor(
31+
private val destinationColumns: JavaBaseConstants.DestinationColumns =
32+
JavaBaseConstants.DestinationColumns.LEGACY
33+
) : CsvSheetGenerator {
3334
override fun getHeaderRow(): List<String> {
34-
return header
35+
return destinationColumns.rawColumns
3536
}
3637

3738
override fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage): List<Any> {
@@ -53,16 +54,19 @@ constructor(private val useDestinationsV2Columns: Boolean = false) : CsvSheetGen
5354
emittedAt: Long,
5455
formattedAirbyteMetaString: String
5556
): List<Any> {
56-
return if (useDestinationsV2Columns) {
57-
java.util.List.of<Any>(
58-
id,
59-
Instant.ofEpochMilli(emittedAt),
60-
"",
61-
formattedString,
62-
formattedAirbyteMetaString
63-
)
64-
} else {
65-
java.util.List.of<Any>(id, formattedString, Instant.ofEpochMilli(emittedAt))
57+
return when (destinationColumns) {
58+
JavaBaseConstants.DestinationColumns.LEGACY ->
59+
listOf(id, formattedString, Instant.ofEpochMilli(emittedAt))
60+
JavaBaseConstants.DestinationColumns.V2_WITH_META ->
61+
listOf(
62+
id,
63+
Instant.ofEpochMilli(emittedAt),
64+
"",
65+
formattedString,
66+
formattedAirbyteMetaString
67+
)
68+
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META ->
69+
listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString)
6670
}
6771
}
6872
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package io.airbyte.cdk.integrations.destination.staging
55

66
import io.airbyte.cdk.db.jdbc.JdbcDatabase
7+
import io.airbyte.cdk.integrations.base.JavaBaseConstants
78
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
89
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
910
import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig
@@ -39,7 +40,7 @@ internal class AsyncFlush(
3940
// the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
4041
// freed earlier similar to a sliding window effect
4142
override val optimalBatchSizeBytes: Long,
42-
private val useDestinationsV2Columns: Boolean
43+
private val destinationColumns: JavaBaseConstants.DestinationColumns
4344
) : DestinationFlushFunction {
4445

4546
@Throws(Exception::class)
@@ -49,7 +50,7 @@ internal class AsyncFlush(
4950
writer =
5051
CsvSerializedBuffer(
5152
FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
52-
StagingDatabaseCsvSheetGenerator(useDestinationsV2Columns),
53+
StagingDatabaseCsvSheetGenerator(destinationColumns),
5354
true
5455
)
5556

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt

+25-25
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.staging
66
import com.fasterxml.jackson.databind.JsonNode
77
import com.google.common.base.Preconditions
88
import io.airbyte.cdk.db.jdbc.JdbcDatabase
9+
import io.airbyte.cdk.integrations.base.JavaBaseConstants
910
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
1011
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
1112
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
@@ -46,7 +47,7 @@ private constructor(
4647
private val typerDeduper: TyperDeduper?,
4748
private val parsedCatalog: ParsedCatalog?,
4849
private val defaultNamespace: String?,
49-
private val useDestinationsV2Columns: Boolean,
50+
private val destinationColumns: JavaBaseConstants.DestinationColumns,
5051
// Optional fields
5152
private val bufferMemoryLimit: Optional<Long>,
5253
private val optimalBatchSizeBytes: Long,
@@ -68,7 +69,8 @@ private constructor(
6869
var typerDeduper: TyperDeduper? = null
6970
var parsedCatalog: ParsedCatalog? = null
7071
var defaultNamespace: String? = null
71-
var useDestinationsV2Columns: Boolean = false
72+
var destinationColumns: JavaBaseConstants.DestinationColumns =
73+
JavaBaseConstants.DestinationColumns.LEGACY
7274

7375
// Optional fields
7476
private var bufferMemoryLimit = Optional.empty<Long>()
@@ -104,7 +106,7 @@ private constructor(
104106
typerDeduper,
105107
parsedCatalog,
106108
defaultNamespace,
107-
useDestinationsV2Columns,
109+
destinationColumns,
108110
bufferMemoryLimit,
109111
optimalBatchSizeBytes,
110112
(if (dataTransformer != null) dataTransformer else IdentityDataTransformer())!!
@@ -118,13 +120,7 @@ private constructor(
118120
val stagingOperations = this.stagingOperations!!
119121

120122
val writeConfigs: List<WriteConfig> =
121-
createWriteConfigs(
122-
namingResolver,
123-
config,
124-
catalog,
125-
parsedCatalog,
126-
useDestinationsV2Columns
127-
)
123+
createWriteConfigs(namingResolver, config, catalog, parsedCatalog, destinationColumns)
128124
val streamDescToWriteConfig: Map<StreamDescriptor, WriteConfig> =
129125
streamDescToWriteConfig(writeConfigs)
130126
val flusher =
@@ -136,7 +132,7 @@ private constructor(
136132
typerDeduperValve,
137133
typerDeduper,
138134
optimalBatchSizeBytes,
139-
useDestinationsV2Columns
135+
destinationColumns
140136
)
141137
return AsyncStreamConsumer(
142138
outputRecordCollector!!,
@@ -181,7 +177,7 @@ private constructor(
181177
typerDeduper: TyperDeduper,
182178
parsedCatalog: ParsedCatalog?,
183179
defaultNamespace: String?,
184-
useDestinationsV2Columns: Boolean
180+
destinationColumns: JavaBaseConstants.DestinationColumns
185181
): Builder {
186182
val builder = Builder()
187183
builder.outputRecordCollector = outputRecordCollector
@@ -195,7 +191,7 @@ private constructor(
195191
builder.typerDeduper = typerDeduper
196192
builder.parsedCatalog = parsedCatalog
197193
builder.defaultNamespace = defaultNamespace
198-
builder.useDestinationsV2Columns = useDestinationsV2Columns
194+
builder.destinationColumns = destinationColumns
199195
return builder
200196
}
201197

@@ -263,20 +259,20 @@ private constructor(
263259
config: JsonNode?,
264260
catalog: ConfiguredAirbyteCatalog?,
265261
parsedCatalog: ParsedCatalog?,
266-
useDestinationsV2Columns: Boolean
262+
destinationColumns: JavaBaseConstants.DestinationColumns
267263
): List<WriteConfig> {
268264
return catalog!!
269265
.streams
270266
.stream()
271-
.map(toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns))
267+
.map(toWriteConfig(namingResolver, config, parsedCatalog, destinationColumns))
272268
.toList()
273269
}
274270

275271
private fun toWriteConfig(
276272
namingResolver: NamingConventionTransformer?,
277273
config: JsonNode?,
278274
parsedCatalog: ParsedCatalog?,
279-
useDestinationsV2Columns: Boolean
275+
destinationColumns: JavaBaseConstants.DestinationColumns
280276
): Function<ConfiguredAirbyteStream, WriteConfig> {
281277
return Function<ConfiguredAirbyteStream, WriteConfig> { stream: ConfiguredAirbyteStream
282278
->
@@ -289,15 +285,19 @@ private constructor(
289285

290286
val outputSchema: String
291287
val tableName: String
292-
if (useDestinationsV2Columns) {
293-
val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
294-
outputSchema = streamId.rawNamespace!!
295-
tableName = streamId.rawName!!
296-
} else {
297-
outputSchema =
298-
getOutputSchema(abStream, config!!["schema"].asText(), namingResolver)
299-
tableName =
300-
@Suppress("deprecation") namingResolver!!.getRawTableName(streamName)
288+
when (destinationColumns) {
289+
JavaBaseConstants.DestinationColumns.V2_WITH_META,
290+
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> {
291+
val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
292+
outputSchema = streamId.rawNamespace!!
293+
tableName = streamId.rawName!!
294+
}
295+
JavaBaseConstants.DestinationColumns.LEGACY -> {
296+
outputSchema =
297+
getOutputSchema(abStream, config!!["schema"].asText(), namingResolver)
298+
tableName =
299+
@Suppress("deprecation") namingResolver!!.getRawTableName(streamName)
300+
}
301301
}
302302
val tmpTableName =
303303
@Suppress("deprecation") namingResolver!!.getTmpTableName(streamName)

0 commit comments

Comments
 (0)