Skip to content

bring in #36465 #36467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.13
version=0.28.14

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,13 @@ import java.sql.SQLException
import java.sql.Timestamp
import java.time.Instant
import java.util.*
import java.util.function.Consumer
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

abstract class JdbcSqlOperations : SqlOperations {
// this adapter modifies record message before inserting them to the destination
protected val dataAdapter: Optional<DataAdapter>
protected val schemaSet: MutableSet<String?> = HashSet()

protected constructor() {
this.dataAdapter = Optional.empty()
}

protected constructor(dataAdapter: DataAdapter) {
this.dataAdapter = Optional.of(dataAdapter)
}
protected constructor() {}

@Throws(Exception::class)
override fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?) {
Expand Down Expand Up @@ -146,11 +137,8 @@ abstract class JdbcSqlOperations : SqlOperations {
CSVPrinter(writer, CSVFormat.DEFAULT).use { csvPrinter ->
for (record in records) {
val uuid = UUID.randomUUID().toString()
// TODO we only need to do this is formatData is overridden. If not, we can just
// do jsonData =
// record.getSerialized()
val jsonData =
Jsons.serialize(formatData(Jsons.deserializeExact(record.serialized)))

val jsonData = record.serialized
val airbyteMeta = Jsons.serialize(record.record!!.meta)
val extractedAt =
Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt))
Expand Down Expand Up @@ -233,15 +221,6 @@ abstract class JdbcSqlOperations : SqlOperations {
schemaName: String?,
tableName: String?
) {
dataAdapter.ifPresent { adapter: DataAdapter ->
records!!.forEach(
Consumer { airbyteRecordMessage: PartialAirbyteMessage? ->
val data = Jsons.deserializeExact(airbyteRecordMessage!!.serialized)
adapter.adapt(data)
airbyteRecordMessage.serialized = Jsons.serialize(data)
}
)
}
if (isDestinationV2) {
insertRecordsInternalV2(database, records, schemaName, tableName)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.time.Instant
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.HashMap
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Predicate
Expand Down Expand Up @@ -254,12 +255,18 @@ abstract class JdbcDestinationHandler<DestinationState>(
// This is to handle any destinations that upcase the column names.
// For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
val record = recordJson as ObjectNode
record.fieldNames().forEachRemaining { fieldName: String ->
record.set<JsonNode>(
fieldName.lowercase(Locale.getDefault()),
record[fieldName]
)
val newFields: HashMap<String, JsonNode> = HashMap()

val it = record.fieldNames()
while (it.hasNext()) {
val fieldName = it.next()
// We can't directly call record.set here, because that will raise a
// ConcurrentModificationException on the fieldnames iterator.
// Instead, build up a map of new fields and set them all at once.
newFields.put(fieldName.lowercase(Locale.getDefault()), record[fieldName])
}

record.setAll<JsonNode>(newFields)
}
.collect(
toMap(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcUtils
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.commons.text.Names
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest
import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
import javax.sql.DataSource
Expand All @@ -23,7 +24,7 @@ import org.jooq.impl.DSL
* anything. At some point we might (?) want to do a refactor to combine them.
*/
abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
private var database: JdbcDatabase? = null
protected var database: JdbcDatabase? = null
private var dataSource: DataSource? = null

protected abstract val baseConfig: ObjectNode
Expand Down Expand Up @@ -83,7 +84,11 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
if (streamNamespace == null) {
streamNamespace = getDefaultSchema(config!!)
}
val tableName = concatenateRawTableName(streamNamespace, streamName!!)
val tableName =
concatenateRawTableName(
streamNamespace,
Names.toAlphanumericAndUnderscore(streamName!!)
)
val schema = rawSchema
return database!!.queryJsons(DSL.selectFrom(DSL.name(schema, tableName)).sql)
}
Expand All @@ -97,7 +102,10 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
if (streamNamespace == null) {
streamNamespace = getDefaultSchema(config!!)
}
return database!!.queryJsons(DSL.selectFrom(DSL.name(streamNamespace, streamName)).sql)
return database!!.queryJsons(
DSL.selectFrom(DSL.name(streamNamespace, Names.toAlphanumericAndUnderscore(streamName)))
.sql
)
}

@Throws(Exception::class)
Expand Down