Skip to content

fix kotlin warnings in db-sources CDK submodule #37482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ internal constructor(
val config = parseConfig(parsed!!.getConfigPath())
validateConfig(integration.spec()!!.connectionSpecification, config, "READ")
val catalog =
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!
val stateOptional =
parsed.getStatePath().map { path: Path? -> parseConfig(path) }
try {
Expand All @@ -201,7 +201,7 @@ internal constructor(
(integration as Destination).isV2Destination
)
val catalog =
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!

try {
destination!!
Expand Down Expand Up @@ -517,7 +517,7 @@ internal constructor(
return Jsons.deserialize(IOs.readFile(path))
}

private fun <T> parseConfig(path: Path?, klass: Class<T>): T {
private fun <T> parseConfig(path: Path?, klass: Class<T>): T? {
val jsonNode = parseConfig(path)
return Jsons.`object`(jsonNode, klass)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2354,7 +2354,7 @@ abstract class DestinationAcceptanceTest {
}

private fun <V0, V1> convertProtocolObject(v1: V1, klass: Class<V0>): V0 {
return Jsons.`object`(Jsons.jsonNode(v1), klass)
return Jsons.`object`(Jsons.jsonNode(v1), klass)!!
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class LocalAirbyteDestination(private val dest: Destination) : AirbyteDestinatio
Jsons.`object`(
Jsons.jsonNode(destinationConfig.catalog),
ConfiguredAirbyteCatalog::class.java
)
)!!
) { Destination::defaultOutputRecordCollector }
consumer!!.start()
}

@Throws(Exception::class)
override fun accept(message: io.airbyte.protocol.models.AirbyteMessage) {
consumer!!.accept(Jsons.`object`(Jsons.jsonNode(message), AirbyteMessage::class.java))
consumer!!.accept(Jsons.`object`(Jsons.jsonNode(message), AirbyteMessage::class.java)!!)
}

override fun notifyEndOfInput() {
Expand Down
8 changes: 0 additions & 8 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@ plugins {
id "com.github.eirnym.js2p" version "1.0"
}

java {
// TODO: rewrite code to avoid javac wornings in the first place
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes,-unchecked,-removal,-this-escape"
}
}

compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class AirbyteDebeziumHandler<T>(
private val targetPosition: CdcTargetPosition<T>,
private val trackSchemaHistory: Boolean,
private val firstRecordWaitTime: Duration,
private val subsequentRecordWaitTime: Duration,
private val queueSize: Int,
private val addDbNameToOffsetState: Boolean
) {
Expand Down Expand Up @@ -106,8 +105,7 @@ class AirbyteDebeziumHandler<T>(
targetPosition,
{ publisher.hasClosed() },
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
firstRecordWaitTime,
subsequentRecordWaitTime
firstRecordWaitTime
)

val syncCheckpointDuration =
Expand All @@ -134,13 +132,13 @@ class AirbyteDebeziumHandler<T>(
// not used
// at all thus we will pass in null.
val iterator: SourceStateIterator<ChangeEventWithMetadata> =
SourceStateIterator<ChangeEventWithMetadata>(
SourceStateIterator(
eventIterator,
null,
messageProducer!!,
messageProducer,
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
)
return AutoCloseableIterators.fromIterator<AirbyteMessage>(iterator)
return AutoCloseableIterators.fromIterator(iterator)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AirbyteFileOffsetBackingStore(
}

fun persist(cdcState: JsonNode?) {
@Suppress("unchecked_cast")
val mapAsString: Map<String, String> =
if (cdcState != null)
Jsons.`object`(cdcState, MutableMap::class.java) as Map<String, String>
Expand Down Expand Up @@ -130,7 +131,7 @@ class AirbyteFileOffsetBackingStore(

if (obj !is HashMap<*, *>)
throw ConnectException("Expected HashMap but found " + obj.javaClass)
val raw = obj as Map<ByteArray?, ByteArray?>
@Suppress("unchecked_cast") val raw = obj as Map<ByteArray?, ByteArray?>
val data: MutableMap<ByteBuffer?, ByteBuffer?> = HashMap()
for ((key1, value1) in raw) {
val key = if ((key1 != null)) ByteBuffer.wrap(key1) else null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ class AirbyteSchemaHistoryStorage(
}

private fun persist(schemaHistory: SchemaHistory<Optional<JsonNode>>?) {
if (schemaHistory!!.schema!!.isEmpty) {
if (schemaHistory!!.schema.isEmpty) {
return
}
val fileAsString = Jsons.`object`(schemaHistory.schema!!.get(), String::class.java)
val fileAsString = Jsons.`object`(schemaHistory.schema.get(), String::class.java)

if (fileAsString == null || fileAsString.isEmpty()) {
if (fileAsString.isNullOrEmpty()) {
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ class DebeziumMessageProducer<T>(
}
}

if (checkpointOffsetToSend.size == 1 && !message!!.isSnapshotEvent) {
if (checkpointOffsetToSend.size == 1 && !message.isSnapshotEvent) {
if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) {
shouldEmitStateMessage = true
}
}

return eventConverter.toAirbyteMessage(message!!)
return eventConverter.toAirbyteMessage(message)
}

override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class DebeziumRecordIterator<T>(
private val publisherStatusSupplier: Supplier<Boolean>,
private val debeziumShutdownProcedure: DebeziumShutdownProcedure<ChangeEvent<String?, String?>>,
private val firstRecordWaitTime: Duration,
subsequentRecordWaitTime: Duration?
) : AbstractIterator<ChangeEventWithMetadata>(), AutoCloseableIterator<ChangeEventWithMetadata> {
private val heartbeatEventSourceField: MutableMap<Class<out ChangeEvent<*, *>?>, Field?> =
HashMap(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DebeziumShutdownProcedure<T>(
Executors.newSingleThreadExecutor { r: Runnable? ->
val thread = Thread(r, "queue-data-transfer-thread")
thread.uncaughtExceptionHandler =
Thread.UncaughtExceptionHandler { t: Thread?, e: Throwable? -> exception = e }
Thread.UncaughtExceptionHandler { _: Thread, e: Throwable -> exception = e }
thread
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.debezium.internals

import io.debezium.config.Configuration
import io.debezium.embedded.KafkaConnectUtil
import java.lang.Boolean
import java.util.*
import kotlin.String
import org.apache.kafka.connect.json.JsonConverter
Expand Down Expand Up @@ -91,6 +90,6 @@ interface DebeziumStateUtil {

/** Configuration for offset state key/value converters. */
val INTERNAL_CONVERTER_CONFIG: Map<String, String?> =
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, Boolean.FALSE.toString())
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class RelationalDbDebeziumEventConverter(
) : DebeziumEventConverter {
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
val debeziumEvent = event.eventValueAsJson()
val before: JsonNode = debeziumEvent!!.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val before: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ abstract class AbstractJdbcSource<Datatype>(
}

@Throws(SQLException::class)
public override fun createDatabase(sourceConfig: JsonNode): JdbcDatabase {
return createDatabase(sourceConfig, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER)
public override fun createDatabase(config: JsonNode): JdbcDatabase {
return createDatabase(config, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER)
}

@Throws(SQLException::class)
Expand All @@ -634,7 +634,7 @@ abstract class AbstractJdbcSource<Datatype>(
// Create the data source
val dataSource =
create(
if (jdbcConfig!!.has(JdbcUtils.USERNAME_KEY))
if (jdbcConfig.has(JdbcUtils.USERNAME_KEY))
jdbcConfig[JdbcUtils.USERNAME_KEY].asText()
else null,
if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY))
Expand All @@ -643,7 +643,7 @@ abstract class AbstractJdbcSource<Datatype>(
driverClassName,
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(),
connectionProperties,
getConnectionTimeout(connectionProperties!!)
getConnectionTimeout(connectionProperties)
)
// Record the data source so that it can be closed.
dataSources.add(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ class JdbcPrivilegeDto(
}
}

override fun equals(o: Any?): Boolean {
if (this === o) {
override fun equals(other: Any?): Boolean {
if (this === other) {
return true
}
if (o == null || javaClass != o.javaClass) {
if (other == null || javaClass != other.javaClass) {
return false
}
val that = o as JdbcPrivilegeDto
val that = other as JdbcPrivilegeDto
return (Objects.equal(grantee, that.grantee) &&
Objects.equal(tableName, that.tableName) &&
Objects.equal(schemaName, that.schemaName) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ protected constructor(driverClassName: String) :
continue
}
val cursorType =
table.fields!!
table.fields
.stream()
.filter { info: CommonField<DataType> -> info.name == cursorField.get() }
.map { obj: CommonField<DataType> -> obj.type }
Expand Down Expand Up @@ -300,7 +300,7 @@ protected constructor(driverClassName: String) :
val systemNameSpaces = excludedInternalNameSpaces
val systemViews = excludedViews
val discoveredTables = discoverInternal(database)
return (if (systemNameSpaces == null || systemNameSpaces.isEmpty()) discoveredTables
return (if (systemNameSpaces.isEmpty()) discoveredTables
else
discoveredTables
.stream()
Expand Down Expand Up @@ -425,7 +425,7 @@ protected constructor(driverClassName: String) :
val cursorInfo = stateManager!!.getCursorInfo(pair)

val airbyteMessageIterator: AutoCloseableIterator<AirbyteMessage>
if (cursorInfo!!.map { it.cursor }.isPresent) {
if (cursorInfo.map { it.cursor }.isPresent) {
airbyteMessageIterator =
getIncrementalStream(
database,
Expand All @@ -452,7 +452,7 @@ protected constructor(driverClassName: String) :
)
}

val cursorType = getCursorType(airbyteStream, cursorField)
getCursorType(airbyteStream, cursorField)

val messageProducer =
CursorStateMessageProducer(stateManager, cursorInfo.map { it.cursor })
Expand Down Expand Up @@ -662,13 +662,6 @@ protected constructor(driverClassName: String) :
protected abstract fun getAirbyteType(columnType: DataType): JsonSchemaType

protected abstract val excludedInternalNameSpaces: Set<String>
/**
* Get list of system namespaces(schemas) in order to exclude them from the `discover`
* result list.
*
* @return set of system namespaces(schemas) to be excluded
*/
get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we might instead want preserve the comment as a comment on the val itself.

On the other hand, this field is fairly self-explanatory. Up to you.


protected open val excludedViews: Set<String>
/**
Expand Down Expand Up @@ -722,12 +715,6 @@ protected constructor(driverClassName: String) :
): Map<String, MutableList<String>>

protected abstract val quoteString: String?
/**
* Returns quote symbol of the database
*
* @return quote symbol
*/
get

/**
* Read all data from a table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ class CursorInfo(
return this
}

override fun equals(o: Any?): Boolean {
if (this === o) {
override fun equals(other: Any?): Boolean {
if (this === other) {
return true
}
if (o == null || javaClass != o.javaClass) {
if (other == null || javaClass != other.javaClass) {
return false
}
val that = o as CursorInfo
val that = other as CursorInfo
return originalCursorField == that.originalCursorField &&
originalCursor == that.originalCursor &&
originalCursorRecordCount == that.originalCursorRecordCount &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object DbSourceDiscoverUtil {
tableInfo.fields
)
.withSupportedSyncModes(
if (tableInfo.cursorFields != null && tableInfo.cursorFields.isEmpty())
if (tableInfo.cursorFields.isEmpty())
Lists.newArrayList(SyncMode.FULL_REFRESH)
else Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ object RelationalDbQueryUtils {
fun prettyPrintConfiguredAirbyteStreamList(streamList: List<ConfiguredAirbyteStream>): String {
return streamList
.stream()
.map { s: ConfiguredAirbyteStream ->
"%s.%s".formatted(s.stream.namespace, s.stream.name)
}
.map { s: ConfiguredAirbyteStream -> "${s.stream.namespace}.${s.stream.name}" }
.collect(Collectors.joining(", "))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class StateDecoratingIterator(
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(StateDecoratingIterator::class.java)
private val LOGGER: Logger =
LoggerFactory.getLogger(@Suppress("deprecation") StateDecoratingIterator::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class GlobalStateManager(
if (airbyteStateMessage!!.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) {
return Jsons.`object`(airbyteStateMessage.global.sharedState, CdcState::class.java)
} else {
val legacyState = Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
val legacyState: DbState? =
Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
return legacyState?.cdcState
}
}
Expand All @@ -114,7 +115,8 @@ class GlobalStateManager(
}
.collect(Collectors.toSet())
} else {
val legacyState = Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
val legacyState: DbState? =
Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
return if (legacyState != null)
extractNamespacePairsFromDbStreamState(legacyState.streams)
else emptySet<AirbyteStreamNameNamespacePair>()
Expand Down Expand Up @@ -157,7 +159,7 @@ class GlobalStateManager(
return@Supplier Jsons.`object`<DbState>(
airbyteStateMessage.data,
DbState::class.java
)
)!!
.streams
.stream()
.map<AirbyteStreamState?> { s: DbStreamState ->
Expand Down
Loading
Loading