Skip to content

Adapt source-mssql to latest Kotlin converted CDK #36772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.29.1 | 2024-04-03 | [\#36772](https://github.com/airbytehq/airbyte/pull/36772) | Changes to make source-mssql compileable |
| 0.29.0 | 2024-04-02 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Build artifact publication changes and fixes. |
| 0.28.21 | 2024-04-02 | [\#36673](https://github.com/airbytehq/airbyte/pull/36673) | Change the destination message parsing to use standard java/kotlin classes. Adds logging to catch empty lines. |
| 0.28.20 | 2024-04-01 | [\#36584](https://github.com/airbytehq/airbyte/pull/36584) | Changes to make source-postgres compileable |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ object DataTypeUtils {
return dateFormat.format(date)
}

@JvmStatic
fun toISOTimeString(dateTime: LocalDateTime): String {
return DateTimeFormatter.ISO_TIME.format(dateTime.toLocalTime())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putBinary(
protected open fun putBinary(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
* close the returned stream to release the database connection. Otherwise, there will be a
* connection leak.
*
* @param statementCreator create a [PreparedStatement] from a [Connection].
* @paramstatementCreator create a [PreparedStatement] from a [Connection].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: revert this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatter did this 🤷‍♂️
I don't quite know why

* @param recordTransform transform each record of that result set into the desired type. do NOT
* just pass the [ResultSet] through. it is a stateful object will not be accessible if returned
* from recordTransform.
Expand Down Expand Up @@ -195,10 +195,10 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
}

@Throws(SQLException::class)
fun queryMetadata(sql: String, vararg params: String): ResultSetMetaData {
fun queryMetadata(sql: String, vararg params: String): ResultSetMetaData? {
unsafeQuery(
{ c: Connection -> getPreparedStatement(sql, params, c) },
{ obj: ResultSet -> obj.metaData }
{ obj: ResultSet -> obj.metaData },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this extra comma?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is something our formatter does

)
.use { q ->
return q.findFirst().orElse(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ open class JdbcSourceOperations :
}

@Throws(SQLException::class)
protected fun setTimestampWithTimezone(
protected open fun setTimestampWithTimezone(
preparedStatement: PreparedStatement,
parameterIndex: Int,
value: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
val POSTGRES_CONNECT_TIMEOUT_DEFAULT_DURATION: Duration = Duration.ofSeconds(10)

const val CONNECT_TIMEOUT_KEY: String = "connectTimeout"
val CONNECT_TIMEOUT_DEFAULT: Duration = Duration.ofSeconds(60)
@JvmField val CONNECT_TIMEOUT_DEFAULT: Duration = Duration.ofSeconds(60)

/**
* Retrieves connectionTimeout value from connection properties in millis, default minimum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.*

object SshHelpers {
@get:Throws(IOException::class)
@JvmStatic
val specAndInjectSsh: ConnectorSpecification?
get() = getSpecAndInjectSsh(Optional.empty())

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.0
version=0.29.1
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ protected constructor(val container: C) : AutoCloseable {

@Volatile private lateinit var dslContext: DSLContext

protected val databaseId: Int = nextDatabaseId.getAndIncrement()
@JvmField protected val databaseId: Int = nextDatabaseId.getAndIncrement()
@JvmField
protected val containerId: Int =
containerUidToId!!.computeIfAbsent(container.containerId) { k: String? ->
containerUidToId!!.computeIfAbsent(container.containerId) { _: String? ->
nextContainerId!!.getAndIncrement()
}!!
private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
Expand All @@ -68,6 +69,7 @@ protected constructor(val container: C) : AutoCloseable {
return retVal
}

@Suppress("UNCHECKED_CAST")
protected fun self(): T {
return this as T
}
Expand All @@ -77,19 +79,19 @@ protected constructor(val container: C) : AutoCloseable {
if (this.isInitialized) {
throw RuntimeException("TestDatabase instance is already initialized")
}
connectionProperties!![key] = value
connectionProperties[key] = value
return self()
}

/** Enqueues a SQL statement to be executed when this object is closed. */
fun onClose(fmtSql: String, vararg fmtArgs: Any?): T {
cleanupSQL!!.add(String.format(fmtSql!!, *fmtArgs))
cleanupSQL.add(String.format(fmtSql, *fmtArgs))
return self()
}

/** Executes a SQL statement after calling String.format on the arguments. */
fun with(fmtSql: String, vararg fmtArgs: Any?): T {
execSQL(Stream.of(String.format(fmtSql!!, *fmtArgs)))
execSQL(Stream.of(String.format(fmtSql, *fmtArgs)))
return self()
}

Expand All @@ -98,8 +100,8 @@ protected constructor(val container: C) : AutoCloseable {
* object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes
* the [DataSource] and [DSLContext] owned by this object.
*/
fun initialized(): T? {
inContainerBootstrapCmd()!!.forEach { cmds: Stream<String> -> this.execInContainer(cmds) }
open fun initialized(): T? {
inContainerBootstrapCmd().forEach { cmds: Stream<String> -> this.execInContainer(cmds) }
this.dataSource =
DataSourceFactory.create(
userName,
Expand All @@ -117,7 +119,7 @@ protected constructor(val container: C) : AutoCloseable {
}

val isInitialized: Boolean
get() = dslContext != null
get() = ::dslContext.isInitialized

protected abstract fun inContainerBootstrapCmd(): Stream<Stream<String>>

Expand All @@ -137,7 +139,7 @@ protected constructor(val container: C) : AutoCloseable {
val userName: String
get() = withNamespace("user")

val password: String
open val password: String?
get() = "password"

fun getDataSource(): DataSource? {
Expand All @@ -154,11 +156,11 @@ protected constructor(val container: C) : AutoCloseable {
return dslContext
}

val jdbcUrl: String?
open val jdbcUrl: String?
get() =
String.format(
databaseDriver!!.urlFormatString,
container!!.host,
container.host,
container.firstMappedPort,
databaseName
)
Expand All @@ -169,7 +171,7 @@ protected constructor(val container: C) : AutoCloseable {
protected fun execSQL(sql: Stream<String>) {
try {
database!!.query<Any?> { ctx: DSLContext? ->
sql!!.forEach { statement: String? ->
sql.forEach { statement: String? ->
LOGGER!!.info("executing SQL statement {}", statement)
ctx!!.execute(statement)
}
Expand All @@ -194,7 +196,7 @@ protected constructor(val container: C) : AutoCloseable {
)
)
)
val exec = container!!.execInContainer(*cmd.toTypedArray<String?>())
val exec = container.execInContainer(*cmd.toTypedArray<String?>())
if (exec!!.exitCode == 0) {
LOGGER.info(
formatLogLine(
Expand Down Expand Up @@ -248,7 +250,7 @@ protected constructor(val container: C) : AutoCloseable {
}

override fun close() {
execSQL(cleanupSQL!!.stream())
execSQL(cleanupSQL.stream())
execInContainer(inContainerUndoBootstrapCmd())
LOGGER!!.info("closing database databaseId=$databaseId")
}
Expand All @@ -259,46 +261,47 @@ protected constructor(val container: C) : AutoCloseable {
protected val builder: ImmutableMap.Builder<Any, Any> = ImmutableMap.builder()

fun build(): JsonNode {
return Jsons.jsonNode(builder!!.build())
return Jsons.jsonNode(builder.build())
}

@Suppress("UNCHECKED_CAST")
fun self(): B {
return this as B
}

fun with(key: Any, value: Any): B {
builder!!.put(key, value)
builder.put(key, value)
return self()
}

fun withDatabase(): B {
return this.with(JdbcUtils.DATABASE_KEY, testDatabase!!.databaseName)
return this.with(JdbcUtils.DATABASE_KEY, testDatabase.databaseName)
}

fun withCredentials(): B {
return this.with(JdbcUtils.USERNAME_KEY, testDatabase!!.userName)
.with(JdbcUtils.PASSWORD_KEY, testDatabase.password)
return this.with(JdbcUtils.USERNAME_KEY, testDatabase.userName)
.with(JdbcUtils.PASSWORD_KEY, testDatabase.password!!)
}

fun withResolvedHostAndPort(): B {
return this.with(
JdbcUtils.HOST_KEY,
HostPortResolver.resolveHost(testDatabase!!.container)
HostPortResolver.resolveHost(testDatabase.container)
)
.with(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(testDatabase.container))
}

fun withHostAndPort(): B {
return this.with(JdbcUtils.HOST_KEY, testDatabase!!.container!!.host)
.with(JdbcUtils.PORT_KEY, testDatabase.container!!.firstMappedPort)
return this.with(JdbcUtils.HOST_KEY, testDatabase.container.host)
.with(JdbcUtils.PORT_KEY, testDatabase.container.firstMappedPort)
}

fun withoutSsl(): B {
open fun withoutSsl(): B {
return with(JdbcUtils.SSL_KEY, false)
}

fun withSsl(sslMode: MutableMap<Any, Any>): B {
return with(JdbcUtils.SSL_KEY, true)!!.with(JdbcUtils.SSL_MODE_KEY, sslMode)
open fun withSsl(sslMode: MutableMap<Any?, Any?>): B {
return with(JdbcUtils.SSL_KEY, true).with(JdbcUtils.SSL_MODE_KEY, sslMode)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class AirbyteFileOffsetBackingStore(
return offsetManager
}

@JvmStatic
fun initializeDummyStateForSnapshotPurpose(): AirbyteFileOffsetBackingStore {
val cdcWorkingDir: Path
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,7 @@ class AirbyteSchemaHistoryStorage(
private val reader: DocumentReader = DocumentReader.defaultReader()
private val writer: DocumentWriter = DocumentWriter.defaultWriter()

class SchemaHistory<T>(schema: T, isCompressed: Boolean) {
val schema: T
val isCompressed: Boolean

init {
this.schema = schema
this.isCompressed = isCompressed
}
}
data class SchemaHistory<T>(val schema: T, val isCompressed: Boolean)

fun read(): SchemaHistory<String> {
val fileSizeMB = path.toFile().length().toDouble() / (ONE_MB)
Expand Down Expand Up @@ -240,6 +232,7 @@ class AirbyteSchemaHistoryStorage(
return string.toByteArray(StandardCharsets.UTF_8).size.toDouble() / (ONE_MB)
}

@JvmStatic
fun initializeDBHistory(
schemaHistory: SchemaHistory<Optional<JsonNode>>?,
compressSchemaHistoryForState: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object RecordWaitTimeUtil {
}
}

@JvmStatic
fun getFirstRecordWaitTime(config: JsonNode): Duration {
val isTest = config.has("is_test") && config["is_test"].asBoolean()
var firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME
Expand All @@ -67,6 +68,7 @@ object RecordWaitTimeUtil {
return firstRecordWaitTime
}

@JvmStatic
fun getSubsequentRecordWaitTime(config: JsonNode): Duration {
var subsequentRecordWaitTime = DEFAULT_SUBSEQUENT_RECORD_WAIT_TIME
val isTest = config.has("is_test") && config["is_test"].asBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ abstract class AbstractJdbcSource<Datatype>(

/** Some databases need special column names in the query. */
@Throws(SQLException::class)
protected fun getWrappedColumnNames(
protected open fun getWrappedColumnNames(
database: JdbcDatabase?,
connection: Connection?,
columnNames: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory
object RelationalDbQueryUtils {
private val LOGGER: Logger = LoggerFactory.getLogger(RelationalDbQueryUtils::class.java)

@JvmStatic
fun getIdentifierWithQuoting(identifier: String, quoteString: String): String {
// double-quoted values within a database name or column name should be wrapped with extra
// quoteString
Expand Down Expand Up @@ -58,10 +59,12 @@ object RelationalDbQueryUtils {
}

/** @return the input identifier with quotes. */
@JvmStatic
fun enquoteIdentifier(identifier: String?, quoteString: String?): String {
return quoteString + identifier + quoteString
}

@JvmStatic
fun <Database : SqlDatabase?> queryTable(
database: Database,
sqlQuery: String?,
Expand All @@ -87,6 +90,7 @@ object RelationalDbQueryUtils {
)
}

@JvmStatic
fun logStreamSyncStatus(streams: List<ConfiguredAirbyteStream>, syncType: String?) {
if (streams.isEmpty()) {
LOGGER.info("No Streams will be synced via {}.", syncType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ object RelationalDbReadUtil {
.collect(Collectors.toList())
}

@JvmStatic
fun identifyStreamsForCursorBased(
catalog: ConfiguredAirbyteCatalog,
streamsForInitialLoad: List<ConfiguredAirbyteStream>
Expand All @@ -54,6 +55,7 @@ object RelationalDbReadUtil {
.collect(Collectors.toList())
}

@JvmStatic
fun convertNameNamespacePairFromV0(
v1NameNamespacePair: io.airbyte.protocol.models.AirbyteStreamNameNamespacePair
): AirbyteStreamNameNamespacePair {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ object StateGeneratorUtils {
* @param airbyteStateMessage A [AirbyteStateType.LEGACY] state message.
* @return A [AirbyteStateType.GLOBAL] state message.
*/
@JvmStatic
fun convertLegacyStateToGlobalState(
airbyteStateMessage: AirbyteStateMessage
): AirbyteStateMessage {
Expand Down
Loading
Loading