Skip to content

Commit 515c9b4

Browse files
bring in #36465 (#36467)
1 parent b31bc30 commit 515c9b4

File tree

6 files changed

+27
-167
lines changed

6 files changed

+27
-167
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.28.13
1+
version=0.28.14

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/DataAdapter.kt

-61
This file was deleted.

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

+3-24
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,13 @@ import java.sql.SQLException
1717
import java.sql.Timestamp
1818
import java.time.Instant
1919
import java.util.*
20-
import java.util.function.Consumer
2120
import org.apache.commons.csv.CSVFormat
2221
import org.apache.commons.csv.CSVPrinter
2322

2423
abstract class JdbcSqlOperations : SqlOperations {
25-
// this adapter modifies record message before inserting them to the destination
26-
protected val dataAdapter: Optional<DataAdapter>
2724
protected val schemaSet: MutableSet<String?> = HashSet()
2825

29-
protected constructor() {
30-
this.dataAdapter = Optional.empty()
31-
}
32-
33-
protected constructor(dataAdapter: DataAdapter) {
34-
this.dataAdapter = Optional.of(dataAdapter)
35-
}
26+
protected constructor() {}
3627

3728
@Throws(Exception::class)
3829
override fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?) {
@@ -146,11 +137,8 @@ abstract class JdbcSqlOperations : SqlOperations {
146137
CSVPrinter(writer, CSVFormat.DEFAULT).use { csvPrinter ->
147138
for (record in records) {
148139
val uuid = UUID.randomUUID().toString()
149-
// TODO we only need to do this is formatData is overridden. If not, we can just
150-
// do jsonData =
151-
// record.getSerialized()
152-
val jsonData =
153-
Jsons.serialize(formatData(Jsons.deserializeExact(record.serialized)))
140+
141+
val jsonData = record.serialized
154142
val airbyteMeta = Jsons.serialize(record.record!!.meta)
155143
val extractedAt =
156144
Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt))
@@ -233,15 +221,6 @@ abstract class JdbcSqlOperations : SqlOperations {
233221
schemaName: String?,
234222
tableName: String?
235223
) {
236-
dataAdapter.ifPresent { adapter: DataAdapter ->
237-
records!!.forEach(
238-
Consumer { airbyteRecordMessage: PartialAirbyteMessage? ->
239-
val data = Jsons.deserializeExact(airbyteRecordMessage!!.serialized)
240-
adapter.adapt(data)
241-
airbyteRecordMessage.serialized = Jsons.serialize(data)
242-
}
243-
)
244-
}
245224
if (isDestinationV2) {
246225
insertRecordsInternalV2(database, records, schemaName, tableName)
247226
} else {

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt

+12-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.time.Instant
2222
import java.time.OffsetDateTime
2323
import java.time.temporal.ChronoUnit
2424
import java.util.*
25+
import java.util.HashMap
2526
import java.util.concurrent.CompletableFuture
2627
import java.util.concurrent.CompletionStage
2728
import java.util.function.Predicate
@@ -254,12 +255,18 @@ abstract class JdbcDestinationHandler<DestinationState>(
254255
// This is to handle any destinations that upcase the column names.
255256
// For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
256257
val record = recordJson as ObjectNode
257-
record.fieldNames().forEachRemaining { fieldName: String ->
258-
record.set<JsonNode>(
259-
fieldName.lowercase(Locale.getDefault()),
260-
record[fieldName]
261-
)
258+
val newFields: HashMap<String, JsonNode> = HashMap()
259+
260+
val it = record.fieldNames()
261+
while (it.hasNext()) {
262+
val fieldName = it.next()
263+
// We can't directly call record.set here, because that will raise a
264+
// ConcurrentModificationException on the fieldnames iterator.
265+
// Instead, build up a map of new fields and set them all at once.
266+
newFields.put(fieldName.lowercase(Locale.getDefault()), record[fieldName])
262267
}
268+
269+
record.setAll<JsonNode>(newFields)
263270
}
264271
.collect(
265272
toMap(

airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/DataAdapterTest.kt

-73
This file was deleted.

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcTypingDedupingTest.kt

+11-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase
1111
import io.airbyte.cdk.db.jdbc.JdbcDatabase
1212
import io.airbyte.cdk.db.jdbc.JdbcUtils
1313
import io.airbyte.cdk.integrations.base.JavaBaseConstants
14+
import io.airbyte.commons.text.Names
1415
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest
1516
import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
1617
import javax.sql.DataSource
@@ -23,7 +24,7 @@ import org.jooq.impl.DSL
2324
* anything. At some point we might (?) want to do a refactor to combine them.
2425
*/
2526
abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
26-
private var database: JdbcDatabase? = null
27+
protected var database: JdbcDatabase? = null
2728
private var dataSource: DataSource? = null
2829

2930
protected abstract val baseConfig: ObjectNode
@@ -83,7 +84,11 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
8384
if (streamNamespace == null) {
8485
streamNamespace = getDefaultSchema(config!!)
8586
}
86-
val tableName = concatenateRawTableName(streamNamespace, streamName!!)
87+
val tableName =
88+
concatenateRawTableName(
89+
streamNamespace,
90+
Names.toAlphanumericAndUnderscore(streamName!!)
91+
)
8792
val schema = rawSchema
8893
return database!!.queryJsons(DSL.selectFrom(DSL.name(schema, tableName)).sql)
8994
}
@@ -97,7 +102,10 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
97102
if (streamNamespace == null) {
98103
streamNamespace = getDefaultSchema(config!!)
99104
}
100-
return database!!.queryJsons(DSL.selectFrom(DSL.name(streamNamespace, streamName)).sql)
105+
return database!!.queryJsons(
106+
DSL.selectFrom(DSL.name(streamNamespace, Names.toAlphanumericAndUnderscore(streamName)))
107+
.sql
108+
)
101109
}
102110

103111
@Throws(Exception::class)

0 commit comments

Comments
 (0)