Skip to content

Commit 4b85dba

Browse files
authored
CDK: fixes for destination-postgres (#36619)
1 parent 90a830d commit 4b85dba

File tree

12 files changed

+61
-62
lines changed

12 files changed

+61
-62
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+25-23
Original file line numberDiff line numberDiff line change
@@ -144,29 +144,31 @@ Maven and Gradle will automatically reference the correct (pinned) version of th
144144

145145
| Version | Date | Pull Request | Subject |
146146
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
147-
| 0.28.19 | 2024-03-28 | [\#36610](https://github.com/airbytehq/airbyte/pull/36610) | remove airbyte-api generation, pull depdendency jars instead |
148-
| 0.28.19 | 2024-03-28 | [\#36611](https://github.com/airbytehq/airbyte/pull/36611) | disable spotbugs for CDK tes and testFixtures tasks |
149-
| 0.28.18 | 2024-03-28 | [\#36606](https://github.com/airbytehq/airbyte/pull/36574) | disable spotbugs for CDK tes and testFixtures tasks |
150-
| 0.28.18 | 2024-03-28 | [\#36574](https://github.com/airbytehq/airbyte/pull/36574) | Fix ContainerFactory |
151-
| 0.28.18 | 2024-03-27 | [\#36570](https://github.com/airbytehq/airbyte/pull/36570) | Convert missing s3-destinations tests to Kotlin |
152-
| 0.28.18 | 2024-03-27 | [\#36446](https://github.com/airbytehq/airbyte/pull/36446) | Convert dependencies submodule to Kotlin |
153-
| 0.28.18 | 2024-03-27 | [\#36445](https://github.com/airbytehq/airbyte/pull/36445) | Convert functional out Checked interfaces to kotlin
154-
| 0.28.18 | 2024-03-27 | [\#36444](https://github.com/airbytehq/airbyte/pull/36444) | Use apache-commons classes in our Checked functional interfaces |
155-
| 0.28.18 | 2024-03-27 | [\#36467](https://github.com/airbytehq/airbyte/pull/36467) | Convert #36465 to Kotlin
156-
| 0.28.18 | 2024-03-27 | [\#36473](https://github.com/airbytehq/airbyte/pull/36473) | Convert convert #36396 to Kotlin |
157-
| 0.28.18 | 2024-03-27 | [\#36439](https://github.com/airbytehq/airbyte/pull/36439) | Convert db-destinations submodule to Kotlin |
158-
| 0.28.18 | 2024-03-27 | [\#36438](https://github.com/airbytehq/airbyte/pull/36438) | Convert db-sources submodule to Kotlin |
159-
| 0.28.18 | 2024-03-26 | [\#36437](https://github.com/airbytehq/airbyte/pull/36437) | Convert gsc submodule to Kotlin |
160-
| 0.28.18 | 2024-03-26 | [\#36421](https://github.com/airbytehq/airbyte/pull/36421) | Convert typing-deduping submodule to Kotlin |
161-
| 0.28.18 | 2024-03-26 | [\#36420](https://github.com/airbytehq/airbyte/pull/36420) | Convert s3-destinations submodule to Kotlin |
162-
| 0.28.18 | 2024-03-26 | [\#36419](https://github.com/airbytehq/airbyte/pull/36419) | Convert azure submodule to Kotlin |
163-
| 0.28.18 | 2024-03-26 | [\#36413](https://github.com/airbytehq/airbyte/pull/36413) | Convert postgres submodule to Kotlin |
164-
| 0.28.18 | 2024-03-26 | [\#36412](https://github.com/airbytehq/airbyte/pull/36412) | Convert mongodb submodule to Kotlin |
165-
| 0.28.18 | 2024-03-26 | [\#36411](https://github.com/airbytehq/airbyte/pull/36411) | Convert datastore-bigquery submodule to Kotlin |
166-
| 0.28.18 | 2024-03-26 | [\#36205](https://github.com/airbytehq/airbyte/pull/36205) | Convert core/main to Kotlin |
167-
| 0.28.18 | 2024-03-26 | [\#36204](https://github.com/airbytehq/airbyte/pull/36204) | Convert core/test to Kotlin |
168-
| 0.28.18 | 2024-03-26 | [\#36190](https://github.com/airbytehq/airbyte/pull/36190) | Convert core/testFixtures to Kotlin |
169-
| 0.28.0 | 2024-03-26 | [\#36514](https://github.com/airbytehq/airbyte/pull/36514) | Bump CDK version to 0.28.0 |
147+
| 0.28.19 | 2024-03-29 | [\#36619](https://github.com/airbytehq/airbyte/pull/36619) | Changes to make destination-postgres compileable |
148+
| 0.28.19 | 2024-03-29 | [\#36588](https://github.com/airbytehq/airbyte/pull/36588) | Changes to make destination-redshift compileable |
149+
| 0.28.19 | 2024-03-29 | [\#36610](https://github.com/airbytehq/airbyte/pull/36610) | remove airbyte-api generation, pull depdendency jars instead |
150+
| 0.28.19 | 2024-03-29 | [\#36611](https://github.com/airbytehq/airbyte/pull/36611) | disable spotbugs for CDK tes and testFixtures tasks |
151+
| 0.28.18 | 2024-03-28 | [\#36606](https://github.com/airbytehq/airbyte/pull/36574) | disable spotbugs for CDK tes and testFixtures tasks |
152+
| 0.28.18 | 2024-03-28 | [\#36574](https://github.com/airbytehq/airbyte/pull/36574) | Fix ContainerFactory |
153+
| 0.28.18 | 2024-03-27 | [\#36570](https://github.com/airbytehq/airbyte/pull/36570) | Convert missing s3-destinations tests to Kotlin |
154+
| 0.28.18 | 2024-03-27 | [\#36446](https://github.com/airbytehq/airbyte/pull/36446) | Convert dependencies submodule to Kotlin |
155+
| 0.28.18 | 2024-03-27 | [\#36445](https://github.com/airbytehq/airbyte/pull/36445) | Convert functional out Checked interfaces to kotlin |
156+
| 0.28.18 | 2024-03-27 | [\#36444](https://github.com/airbytehq/airbyte/pull/36444) | Use apache-commons classes in our Checked functional interfaces |
157+
| 0.28.18 | 2024-03-27 | [\#36467](https://github.com/airbytehq/airbyte/pull/36467) | Convert #36465 to Kotlin |
158+
| 0.28.18 | 2024-03-27 | [\#36473](https://github.com/airbytehq/airbyte/pull/36473) | Convert convert #36396 to Kotlin |
159+
| 0.28.18 | 2024-03-27 | [\#36439](https://github.com/airbytehq/airbyte/pull/36439) | Convert db-destinations submodule to Kotlin |
160+
| 0.28.18 | 2024-03-27 | [\#36438](https://github.com/airbytehq/airbyte/pull/36438) | Convert db-sources submodule to Kotlin |
161+
| 0.28.18 | 2024-03-26 | [\#36437](https://github.com/airbytehq/airbyte/pull/36437) | Convert gsc submodule to Kotlin |
162+
| 0.28.18 | 2024-03-26 | [\#36421](https://github.com/airbytehq/airbyte/pull/36421) | Convert typing-deduping submodule to Kotlin |
163+
| 0.28.18 | 2024-03-26 | [\#36420](https://github.com/airbytehq/airbyte/pull/36420) | Convert s3-destinations submodule to Kotlin |
164+
| 0.28.18 | 2024-03-26 | [\#36419](https://github.com/airbytehq/airbyte/pull/36419) | Convert azure submodule to Kotlin |
165+
| 0.28.18 | 2024-03-26 | [\#36413](https://github.com/airbytehq/airbyte/pull/36413) | Convert postgres submodule to Kotlin |
166+
| 0.28.18 | 2024-03-26 | [\#36412](https://github.com/airbytehq/airbyte/pull/36412) | Convert mongodb submodule to Kotlin |
167+
| 0.28.18 | 2024-03-26 | [\#36411](https://github.com/airbytehq/airbyte/pull/36411) | Convert datastore-bigquery submodule to Kotlin |
168+
| 0.28.18 | 2024-03-26 | [\#36205](https://github.com/airbytehq/airbyte/pull/36205) | Convert core/main to Kotlin |
169+
| 0.28.18 | 2024-03-26 | [\#36204](https://github.com/airbytehq/airbyte/pull/36204) | Convert core/test to Kotlin |
170+
| 0.28.18 | 2024-03-26 | [\#36190](https://github.com/airbytehq/airbyte/pull/36190) | Convert core/testFixtures to Kotlin |
171+
| 0.28.0 | 2024-03-26 | [\#36514](https://github.com/airbytehq/airbyte/pull/36514) | Bump CDK version to 0.28.0 |
170172
| 0.27.7 | 2024-03-26 | [\#36466](https://github.com/airbytehq/airbyte/pull/36466) | Destinations: fix support for case-sensitive fields in destination state. |
171173
| 0.27.6 | 2024-03-26 | [\#36432](https://github.com/airbytehq/airbyte/pull/36432) | Sources support for AirbyteRecordMessageMeta during reading source data types. |
172174
| 0.27.5 | 2024-03-25 | [\#36461](https://github.com/airbytehq/airbyte/pull/36461) | Destinations: Handle case-sensitive columns in destination state handling. |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcUtils.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ object JdbcUtils {
6262
JDBCType.VARCHAR,
6363
JDBCType.LONGVARCHAR
6464
)
65-
@JvmField val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations()
65+
@JvmStatic val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations()
6666

6767
val defaultJSONFormat: JSONFormat = JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT)
6868

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
133133
return matcher.replaceAll("$1?$3")
134134
}
135135

136+
@JvmStatic
136137
fun addThrowableForDeinterpolation(klass: Class<out Throwable>) {
137138
THROWABLES_TO_DEINTERPOLATE.add(klass)
138139
}
@@ -154,8 +155,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
154155
}
155156
}
156157

157-
@VisibleForTesting
158-
fun addCommonStringsToDeinterpolate() {
158+
internal fun addCommonStringsToDeinterpolate() {
159159
// Add some common strings to deinterpolate, regardless of what the connector is doing
160160
addStringForDeinterpolation("airbyte")
161161
addStringForDeinterpolation("config")
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.28.18
1+
version=0.28.19

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt

+12-11
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import org.testcontainers.containers.JdbcDatabaseContainer
4343
*/
4444
abstract class TestDatabase<
4545
C : JdbcDatabaseContainer<*>, T : TestDatabase<C, T, B>, B : TestDatabase.ConfigBuilder<T, B>>
46-
protected constructor(@JvmField val container: C) : AutoCloseable {
46+
protected constructor(val container: C) : AutoCloseable {
4747
private val suffix: String = Strings.addRandomSuffix("", "_", 10)
4848
private val cleanupSQL: ArrayList<String> = ArrayList()
4949
private val connectionProperties: MutableMap<String, String> = HashMap()
@@ -52,16 +52,14 @@ protected constructor(@JvmField val container: C) : AutoCloseable {
5252

5353
@Volatile private lateinit var dslContext: DSLContext
5454

55-
protected val databaseId: Int
56-
protected val containerId: Int
55+
protected val databaseId: Int = nextDatabaseId.getAndIncrement()
56+
protected val containerId: Int =
57+
containerUidToId!!.computeIfAbsent(container.containerId) { k: String? ->
58+
nextContainerId!!.getAndIncrement()
59+
}!!
5760
private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
5861

5962
init {
60-
this.databaseId = nextDatabaseId!!.getAndIncrement()
61-
this.containerId =
62-
containerUidToId!!.computeIfAbsent(container!!.containerId) { k: String? ->
63-
nextContainerId!!.getAndIncrement()
64-
}!!
6563
LOGGER!!.info(formatLogLine("creating database " + databaseName))
6664
}
6765

@@ -300,18 +298,21 @@ protected constructor(@JvmField val container: C) : AutoCloseable {
300298
}
301299

302300
fun withSsl(sslMode: MutableMap<Any?, Any?>?): B {
303-
return with(JdbcUtils.SSL_KEY, true)!!.with(JdbcUtils.SSL_MODE_KEY, sslMode)
301+
return with(JdbcUtils.SSL_KEY, true).with(JdbcUtils.SSL_MODE_KEY, sslMode)
304302
}
305303

306304
companion object {
307-
val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration? = Duration.ofSeconds(5)
305+
private val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration = Duration.ofSeconds(5)
306+
fun getDefaultCdcReplicationInitialWait(): Duration {
307+
return DEFAULT_CDC_REPLICATION_INITIAL_WAIT
308+
}
308309
}
309310
}
310311

311312
companion object {
312313
private val LOGGER: Logger? = LoggerFactory.getLogger(TestDatabase::class.java)
313314

314-
private val nextDatabaseId: AtomicInteger? = AtomicInteger(0)
315+
private val nextDatabaseId: AtomicInteger = AtomicInteger(0)
315316

316317
private val nextContainerId: AtomicInteger? = AtomicInteger(0)
317318
private val containerUidToId: MutableMap<String?, Int?>? = ConcurrentHashMap()

airbyte-cdk/java/airbyte-cdk/datastore-postgres/src/main/kotlin/io/airbyte/cdk/integrations/util/PostgresSslConnectionUtils.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,8 @@ import java.io.*
88
import java.nio.charset.StandardCharsets
99
import java.util.concurrent.TimeUnit
1010
import org.apache.commons.lang3.RandomStringUtils
11-
import org.slf4j.Logger
12-
import org.slf4j.LoggerFactory
1311

1412
object PostgresSslConnectionUtils {
15-
private val LOGGER: Logger = LoggerFactory.getLogger(PostgresSslConnectionUtils::class.java)
1613
private const val CA_CERTIFICATE = "ca.crt"
1714
private const val CLIENT_CERTIFICATE = "client.crt"
1815
private const val CLIENT_KEY = "client.key"
@@ -34,6 +31,7 @@ object PostgresSslConnectionUtils {
3431
const val ENCRYPT_FILE_NAME: String = "encrypt"
3532
const val FACTORY_VALUE: String = "org.postgresql.ssl.DefaultJavaSSLFactory"
3633

34+
@JvmStatic
3735
fun obtainConnectionOptions(encryption: JsonNode): Map<String, String> {
3836
val additionalParameters: MutableMap<String, String> = HashMap()
3937
if (!encryption.isNull) {

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
130130
return modifyDataSourceBuilder(builder).build()
131131
}
132132

133-
protected fun modifyDataSourceBuilder(
133+
protected open fun modifyDataSourceBuilder(
134134
builder: DataSourceFactory.DataSourceBuilder
135135
): DataSourceFactory.DataSourceBuilder {
136136
return builder
@@ -232,7 +232,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
232232
}
233233

234234
val defaultNamespace = config[configSchemaKey].asText()
235-
addDefaultNamespaceToStreams(catalog!!, defaultNamespace)
235+
addDefaultNamespaceToStreams(catalog, defaultNamespace)
236236
return getV2MessageConsumer(
237237
config,
238238
catalog,
@@ -248,7 +248,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
248248
outputRecordCollector: Consumer<AirbyteMessage>,
249249
database: JdbcDatabase,
250250
defaultNamespace: String
251-
): SerializedAirbyteMessageConsumer? {
251+
): SerializedAirbyteMessageConsumer {
252252
val sqlGenerator = sqlGenerator
253253
val rawNamespaceOverride = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
254254
val parsedCatalog =

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ abstract class JdbcSqlOperations : SqlOperations {
8282
* For example, Postgres does not support index definitions within a CREATE TABLE statement, so
8383
* we need to run CREATE INDEX statements after creating the table.
8484
*/
85-
protected fun postCreateTableQueries(schemaName: String?, tableName: String?): List<String> {
85+
protected open fun postCreateTableQueries(
86+
schemaName: String?,
87+
tableName: String?
88+
): List<String> {
8689
return listOf()
8790
}
8891

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt

+3-13
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import kotlin.Boolean
2222
import kotlin.IllegalArgumentException
2323
import kotlin.Int
2424
import kotlin.String
25-
import kotlin.UnsupportedOperationException
2625
import kotlin.plus
2726
import org.jooq.*
2827
import org.jooq.conf.ParamType
@@ -71,7 +70,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
7170
}
7271

7372
@VisibleForTesting
74-
fun toDialectType(airbyteProtocolType: AirbyteProtocolType): DataType<*> {
73+
open fun toDialectType(airbyteProtocolType: AirbyteProtocolType): DataType<*> {
7574
return when (airbyteProtocolType) {
7675
AirbyteProtocolType.STRING -> SQLDataType.VARCHAR(65535)
7776
AirbyteProtocolType.NUMBER -> SQLDataType.DECIMAL(38, 9)
@@ -491,15 +490,6 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
491490
)
492491
}
493492

494-
private fun mergeTransaction(
495-
streamConfig: StreamConfig,
496-
finalSuffix: String,
497-
minRawTimestamp: Optional<Instant>,
498-
useExpensiveSaferCasting: Boolean
499-
): String {
500-
throw UnsupportedOperationException("Not implemented yet")
501-
}
502-
503493
protected fun createSchemaSql(namespace: String?): String {
504494
val dsl = dslContext
505495
val createSchemaSql = dsl.createSchemaIfNotExists(DSL.quotedName(namespace))
@@ -523,7 +513,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
523513
* statement. This is useful if the destination's CREATE TABLE statement does not accept an
524514
* index definition.
525515
*/
526-
protected fun createIndexSql(stream: StreamConfig?, suffix: String?): List<String> {
516+
protected open fun createIndexSql(stream: StreamConfig?, suffix: String?): List<String> {
527517
return emptyList()
528518
}
529519

@@ -617,7 +607,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
617607
}
618608
}
619609

620-
protected fun castedField(
610+
protected open fun castedField(
621611
field: Field<*>?,
622612
type: AirbyteProtocolType,
623613
useExpensiveSaferCasting: Boolean

0 commit comments

Comments
 (0)