Skip to content

CDK: fixes for destination-postgres #36619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,29 +144,31 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.28.19 | 2024-03-28 | [\#36610](https://github.com/airbytehq/airbyte/pull/36610) | remove airbyte-api generation, pull depdendency jars instead |
| 0.28.19 | 2024-03-28 | [\#36611](https://github.com/airbytehq/airbyte/pull/36611) | disable spotbugs for CDK tes and testFixtures tasks |
| 0.28.18 | 2024-03-28 | [\#36606](https://github.com/airbytehq/airbyte/pull/36574) | disable spotbugs for CDK tes and testFixtures tasks |
| 0.28.18 | 2024-03-28 | [\#36574](https://github.com/airbytehq/airbyte/pull/36574) | Fix ContainerFactory |
| 0.28.18 | 2024-03-27 | [\#36570](https://github.com/airbytehq/airbyte/pull/36570) | Convert missing s3-destinations tests to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36446](https://github.com/airbytehq/airbyte/pull/36446) | Convert dependencies submodule to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36445](https://github.com/airbytehq/airbyte/pull/36445) | Convert functional out Checked interfaces to kotlin
| 0.28.18 | 2024-03-27 | [\#36444](https://github.com/airbytehq/airbyte/pull/36444) | Use apache-commons classes in our Checked functional interfaces |
| 0.28.18 | 2024-03-27 | [\#36467](https://github.com/airbytehq/airbyte/pull/36467) | Convert #36465 to Kotlin
| 0.28.18 | 2024-03-27 | [\#36473](https://github.com/airbytehq/airbyte/pull/36473) | Convert convert #36396 to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36439](https://github.com/airbytehq/airbyte/pull/36439) | Convert db-destinations submodule to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36438](https://github.com/airbytehq/airbyte/pull/36438) | Convert db-sources submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36437](https://github.com/airbytehq/airbyte/pull/36437) | Convert gsc submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36421](https://github.com/airbytehq/airbyte/pull/36421) | Convert typing-deduping submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36420](https://github.com/airbytehq/airbyte/pull/36420) | Convert s3-destinations submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36419](https://github.com/airbytehq/airbyte/pull/36419) | Convert azure submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36413](https://github.com/airbytehq/airbyte/pull/36413) | Convert postgres submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36412](https://github.com/airbytehq/airbyte/pull/36412) | Convert mongodb submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36411](https://github.com/airbytehq/airbyte/pull/36411) | Convert datastore-bigquery submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36205](https://github.com/airbytehq/airbyte/pull/36205) | Convert core/main to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36204](https://github.com/airbytehq/airbyte/pull/36204) | Convert core/test to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36190](https://github.com/airbytehq/airbyte/pull/36190) | Convert core/testFixtures to Kotlin |
| 0.28.0 | 2024-03-26 | [\#36514](https://github.com/airbytehq/airbyte/pull/36514) | Bump CDK version to 0.28.0 |
| 0.28.19 | 2024-03-29 | [\#36619](https://github.com/airbytehq/airbyte/pull/36619) | Changes to make destination-postgres compileable |
| 0.28.19 | 2024-03-29 | [\#36588](https://github.com/airbytehq/airbyte/pull/36588) | Changes to make destination-redshift compileable |
| 0.28.19 | 2024-03-29 | [\#36610](https://github.com/airbytehq/airbyte/pull/36610) | remove airbyte-api generation, pull depdendency jars instead |
| 0.28.19 | 2024-03-29 | [\#36611](https://github.com/airbytehq/airbyte/pull/36611) | disable spotbugs for CDK tes and testFixtures tasks |
| 0.28.18 | 2024-03-28 | [\#36606](https://github.com/airbytehq/airbyte/pull/36574) | disable spotbugs for CDK tes and testFixtures tasks |
| 0.28.18 | 2024-03-28 | [\#36574](https://github.com/airbytehq/airbyte/pull/36574) | Fix ContainerFactory |
| 0.28.18 | 2024-03-27 | [\#36570](https://github.com/airbytehq/airbyte/pull/36570) | Convert missing s3-destinations tests to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36446](https://github.com/airbytehq/airbyte/pull/36446) | Convert dependencies submodule to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36445](https://github.com/airbytehq/airbyte/pull/36445) | Convert functional out Checked interfaces to kotlin |
| 0.28.18 | 2024-03-27 | [\#36444](https://github.com/airbytehq/airbyte/pull/36444) | Use apache-commons classes in our Checked functional interfaces |
| 0.28.18 | 2024-03-27 | [\#36467](https://github.com/airbytehq/airbyte/pull/36467) | Convert #36465 to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36473](https://github.com/airbytehq/airbyte/pull/36473) | Convert convert #36396 to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36439](https://github.com/airbytehq/airbyte/pull/36439) | Convert db-destinations submodule to Kotlin |
| 0.28.18 | 2024-03-27 | [\#36438](https://github.com/airbytehq/airbyte/pull/36438) | Convert db-sources submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36437](https://github.com/airbytehq/airbyte/pull/36437) | Convert gsc submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36421](https://github.com/airbytehq/airbyte/pull/36421) | Convert typing-deduping submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36420](https://github.com/airbytehq/airbyte/pull/36420) | Convert s3-destinations submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36419](https://github.com/airbytehq/airbyte/pull/36419) | Convert azure submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36413](https://github.com/airbytehq/airbyte/pull/36413) | Convert postgres submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36412](https://github.com/airbytehq/airbyte/pull/36412) | Convert mongodb submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36411](https://github.com/airbytehq/airbyte/pull/36411) | Convert datastore-bigquery submodule to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36205](https://github.com/airbytehq/airbyte/pull/36205) | Convert core/main to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36204](https://github.com/airbytehq/airbyte/pull/36204) | Convert core/test to Kotlin |
| 0.28.18 | 2024-03-26 | [\#36190](https://github.com/airbytehq/airbyte/pull/36190) | Convert core/testFixtures to Kotlin |
| 0.28.0 | 2024-03-26 | [\#36514](https://github.com/airbytehq/airbyte/pull/36514) | Bump CDK version to 0.28.0 |
| 0.27.7 | 2024-03-26 | [\#36466](https://github.com/airbytehq/airbyte/pull/36466) | Destinations: fix support for case-sensitive fields in destination state. |
| 0.27.6 | 2024-03-26 | [\#36432](https://github.com/airbytehq/airbyte/pull/36432) | Sources support for AirbyteRecordMessageMeta during reading source data types. |
| 0.27.5 | 2024-03-25 | [\#36461](https://github.com/airbytehq/airbyte/pull/36461) | Destinations: Handle case-sensitive columns in destination state handling. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object JdbcUtils {
JDBCType.VARCHAR,
JDBCType.LONGVARCHAR
)
@JvmField val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations()
@JvmStatic val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdpgrailsdev This was accessed as a static member in Java, any better way to do than the escape hatch ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what the escape hatch is


val defaultJSONFormat: JSONFormat = JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
return matcher.replaceAll("$1?$3")
}

@JvmStatic
fun addThrowableForDeinterpolation(klass: Class<out Throwable>) {
THROWABLES_TO_DEINTERPOLATE.add(klass)
}
Expand All @@ -154,8 +155,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
}
}

@VisibleForTesting
fun addCommonStringsToDeinterpolate() {
internal fun addCommonStringsToDeinterpolate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

// Add some common strings to deinterpolate, regardless of what the connector is doing
addStringForDeinterpolation("airbyte")
addStringForDeinterpolation("config")
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.18
version=0.28.19
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.testcontainers.containers.JdbcDatabaseContainer
*/
abstract class TestDatabase<
C : JdbcDatabaseContainer<*>, T : TestDatabase<C, T, B>, B : TestDatabase.ConfigBuilder<T, B>>
protected constructor(@JvmField val container: C) : AutoCloseable {
protected constructor(val container: C) : AutoCloseable {
private val suffix: String = Strings.addRandomSuffix("", "_", 10)
private val cleanupSQL: ArrayList<String> = ArrayList()
private val connectionProperties: MutableMap<String, String> = HashMap()
Expand All @@ -52,16 +52,14 @@ protected constructor(@JvmField val container: C) : AutoCloseable {

@Volatile private lateinit var dslContext: DSLContext

protected val databaseId: Int
protected val containerId: Int
protected val databaseId: Int = nextDatabaseId.getAndIncrement()
protected val containerId: Int =
containerUidToId!!.computeIfAbsent(container.containerId) { k: String? ->
nextContainerId!!.getAndIncrement()
}!!
private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")

init {
this.databaseId = nextDatabaseId!!.getAndIncrement()
this.containerId =
containerUidToId!!.computeIfAbsent(container!!.containerId) { k: String? ->
nextContainerId!!.getAndIncrement()
}!!
LOGGER!!.info(formatLogLine("creating database " + databaseName))
}

Expand Down Expand Up @@ -300,18 +298,21 @@ protected constructor(@JvmField val container: C) : AutoCloseable {
}

fun withSsl(sslMode: MutableMap<Any?, Any?>?): B {
return with(JdbcUtils.SSL_KEY, true)!!.with(JdbcUtils.SSL_MODE_KEY, sslMode)
return with(JdbcUtils.SSL_KEY, true).with(JdbcUtils.SSL_MODE_KEY, sslMode)
}

companion object {
val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration? = Duration.ofSeconds(5)
private val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration = Duration.ofSeconds(5)
Copy link
Contributor

@stephane-airbyte stephane-airbyte Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JvmStatic protected val DEFXXX: Duration = XXX

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in java it looks like this getDEFAULT_CDC_REPLICATION_INITIAL_WAIT() by using above convention. Not sure why. its part of nested ConfigBuilder class's companion not the TestDatabase

Copy link
Contributor

@stephane-airbyte stephane-airbyte Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about @JvmField? That should get rid of the getter, no?

fun getDefaultCdcReplicationInitialWait(): Duration {
return DEFAULT_CDC_REPLICATION_INITIAL_WAIT
}
}
}

companion object {
private val LOGGER: Logger? = LoggerFactory.getLogger(TestDatabase::class.java)

private val nextDatabaseId: AtomicInteger? = AtomicInteger(0)
private val nextDatabaseId: AtomicInteger = AtomicInteger(0)

private val nextContainerId: AtomicInteger? = AtomicInteger(0)
private val containerUidToId: MutableMap<String?, Int?>? = ConcurrentHashMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import java.io.*
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import org.apache.commons.lang3.RandomStringUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory

object PostgresSslConnectionUtils {
private val LOGGER: Logger = LoggerFactory.getLogger(PostgresSslConnectionUtils::class.java)
private const val CA_CERTIFICATE = "ca.crt"
private const val CLIENT_CERTIFICATE = "client.crt"
private const val CLIENT_KEY = "client.key"
Expand All @@ -34,6 +31,7 @@ object PostgresSslConnectionUtils {
const val ENCRYPT_FILE_NAME: String = "encrypt"
const val FACTORY_VALUE: String = "org.postgresql.ssl.DefaultJavaSSLFactory"

@JvmStatic
fun obtainConnectionOptions(encryption: JsonNode): Map<String, String> {
val additionalParameters: MutableMap<String, String> = HashMap()
if (!encryption.isNull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
return modifyDataSourceBuilder(builder).build()
}

protected fun modifyDataSourceBuilder(
protected open fun modifyDataSourceBuilder(
builder: DataSourceFactory.DataSourceBuilder
): DataSourceFactory.DataSourceBuilder {
return builder
Expand Down Expand Up @@ -232,7 +232,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
}

val defaultNamespace = config[configSchemaKey].asText()
addDefaultNamespaceToStreams(catalog!!, defaultNamespace)
addDefaultNamespaceToStreams(catalog, defaultNamespace)
return getV2MessageConsumer(
config,
catalog,
Expand All @@ -248,7 +248,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
outputRecordCollector: Consumer<AirbyteMessage>,
database: JdbcDatabase,
defaultNamespace: String
): SerializedAirbyteMessageConsumer? {
): SerializedAirbyteMessageConsumer {
val sqlGenerator = sqlGenerator
val rawNamespaceOverride = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
val parsedCatalog =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ abstract class JdbcSqlOperations : SqlOperations {
* For example, Postgres does not support index definitions within a CREATE TABLE statement, so
* we need to run CREATE INDEX statements after creating the table.
*/
protected fun postCreateTableQueries(schemaName: String?, tableName: String?): List<String> {
protected open fun postCreateTableQueries(
schemaName: String?,
tableName: String?
): List<String> {
return listOf()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import kotlin.Boolean
import kotlin.IllegalArgumentException
import kotlin.Int
import kotlin.String
import kotlin.UnsupportedOperationException
import kotlin.plus
import org.jooq.*
import org.jooq.conf.ParamType
Expand Down Expand Up @@ -71,7 +70,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
}

@VisibleForTesting
fun toDialectType(airbyteProtocolType: AirbyteProtocolType): DataType<*> {
open fun toDialectType(airbyteProtocolType: AirbyteProtocolType): DataType<*> {
return when (airbyteProtocolType) {
AirbyteProtocolType.STRING -> SQLDataType.VARCHAR(65535)
AirbyteProtocolType.NUMBER -> SQLDataType.DECIMAL(38, 9)
Expand Down Expand Up @@ -491,15 +490,6 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
)
}

private fun mergeTransaction(
streamConfig: StreamConfig,
finalSuffix: String,
minRawTimestamp: Optional<Instant>,
useExpensiveSaferCasting: Boolean
): String {
throw UnsupportedOperationException("Not implemented yet")
}

protected fun createSchemaSql(namespace: String?): String {
val dsl = dslContext
val createSchemaSql = dsl.createSchemaIfNotExists(DSL.quotedName(namespace))
Expand All @@ -523,7 +513,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
* statement. This is useful if the destination's CREATE TABLE statement does not accept an
* index definition.
*/
protected fun createIndexSql(stream: StreamConfig?, suffix: String?): List<String> {
protected open fun createIndexSql(stream: StreamConfig?, suffix: String?): List<String> {
return emptyList()
}

Expand Down Expand Up @@ -617,7 +607,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
}
}

protected fun castedField(
protected open fun castedField(
field: Field<*>?,
type: AirbyteProtocolType,
useExpensiveSaferCasting: Boolean
Expand Down
Loading