Skip to content

Commit 9413578

Browse files
fix kotlin warnings in db-sources CDK submodule (#37482)
fixing kotlin warnings in CDK db-sources submodule
1 parent 5d5b1e3 commit 9413578

File tree

41 files changed

+142
-174
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+142
-174
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ internal constructor(
176176
val config = parseConfig(parsed!!.getConfigPath())
177177
validateConfig(integration.spec()!!.connectionSpecification, config, "READ")
178178
val catalog =
179-
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)
179+
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!
180180
val stateOptional =
181181
parsed.getStatePath().map { path: Path? -> parseConfig(path) }
182182
try {
@@ -201,7 +201,7 @@ internal constructor(
201201
(integration as Destination).isV2Destination
202202
)
203203
val catalog =
204-
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)
204+
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!
205205

206206
try {
207207
destination!!
@@ -517,7 +517,7 @@ internal constructor(
517517
return Jsons.deserialize(IOs.readFile(path))
518518
}
519519

520-
private fun <T> parseConfig(path: Path?, klass: Class<T>): T {
520+
private fun <T> parseConfig(path: Path?, klass: Class<T>): T? {
521521
val jsonNode = parseConfig(path)
522522
return Jsons.`object`(jsonNode, klass)
523523
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -2354,7 +2354,7 @@ abstract class DestinationAcceptanceTest {
23542354
}
23552355

23562356
private fun <V0, V1> convertProtocolObject(v1: V1, klass: Class<V0>): V0 {
2357-
return Jsons.`object`(Jsons.jsonNode(v1), klass)
2357+
return Jsons.`object`(Jsons.jsonNode(v1), klass)!!
23582358
}
23592359
}
23602360
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/LocalAirbyteDestination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ class LocalAirbyteDestination(private val dest: Destination) : AirbyteDestinatio
3333
Jsons.`object`(
3434
Jsons.jsonNode(destinationConfig.catalog),
3535
ConfiguredAirbyteCatalog::class.java
36-
)
36+
)!!
3737
) { Destination::defaultOutputRecordCollector }
3838
consumer!!.start()
3939
}
4040

4141
@Throws(Exception::class)
4242
override fun accept(message: io.airbyte.protocol.models.AirbyteMessage) {
43-
consumer!!.accept(Jsons.`object`(Jsons.jsonNode(message), AirbyteMessage::class.java))
43+
consumer!!.accept(Jsons.`object`(Jsons.jsonNode(message), AirbyteMessage::class.java)!!)
4444
}
4545

4646
override fun notifyEndOfInput() {

airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle

-8
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,6 @@ plugins {
44
id "com.github.eirnym.js2p" version "1.0"
55
}
66

7-
java {
8-
// TODO: rewrite code to avoid javac wornings in the first place
9-
compileJava {
10-
options.compilerArgs += "-Xlint:-try,-rawtypes,-unchecked,-removal,-this-escape"
11-
}
12-
}
13-
14-
compileKotlin.compilerOptions.allWarningsAsErrors = false
157
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
168
compileTestKotlin.compilerOptions.allWarningsAsErrors = false
179

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt

+4-6
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ class AirbyteDebeziumHandler<T>(
3333
private val targetPosition: CdcTargetPosition<T>,
3434
private val trackSchemaHistory: Boolean,
3535
private val firstRecordWaitTime: Duration,
36-
private val subsequentRecordWaitTime: Duration,
3736
private val queueSize: Int,
3837
private val addDbNameToOffsetState: Boolean
3938
) {
@@ -106,8 +105,7 @@ class AirbyteDebeziumHandler<T>(
106105
targetPosition,
107106
{ publisher.hasClosed() },
108107
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
109-
firstRecordWaitTime,
110-
subsequentRecordWaitTime
108+
firstRecordWaitTime
111109
)
112110

113111
val syncCheckpointDuration =
@@ -134,13 +132,13 @@ class AirbyteDebeziumHandler<T>(
134132
// not used
135133
// at all thus we will pass in null.
136134
val iterator: SourceStateIterator<ChangeEventWithMetadata> =
137-
SourceStateIterator<ChangeEventWithMetadata>(
135+
SourceStateIterator(
138136
eventIterator,
139137
null,
140-
messageProducer!!,
138+
messageProducer,
141139
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
142140
)
143-
return AutoCloseableIterators.fromIterator<AirbyteMessage>(iterator)
141+
return AutoCloseableIterators.fromIterator(iterator)
144142
}
145143

146144
companion object {

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class AirbyteFileOffsetBackingStore(
5454
}
5555

5656
fun persist(cdcState: JsonNode?) {
57+
@Suppress("unchecked_cast")
5758
val mapAsString: Map<String, String> =
5859
if (cdcState != null)
5960
Jsons.`object`(cdcState, MutableMap::class.java) as Map<String, String>
@@ -130,7 +131,7 @@ class AirbyteFileOffsetBackingStore(
130131

131132
if (obj !is HashMap<*, *>)
132133
throw ConnectException("Expected HashMap but found " + obj.javaClass)
133-
val raw = obj as Map<ByteArray?, ByteArray?>
134+
@Suppress("unchecked_cast") val raw = obj as Map<ByteArray?, ByteArray?>
134135
val data: MutableMap<ByteBuffer?, ByteBuffer?> = HashMap()
135136
for ((key1, value1) in raw) {
136137
val key = if ((key1 != null)) ByteBuffer.wrap(key1) else null

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteSchemaHistoryStorage.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ class AirbyteSchemaHistoryStorage(
141141
}
142142

143143
private fun persist(schemaHistory: SchemaHistory<Optional<JsonNode>>?) {
144-
if (schemaHistory!!.schema!!.isEmpty) {
144+
if (schemaHistory!!.schema.isEmpty) {
145145
return
146146
}
147-
val fileAsString = Jsons.`object`(schemaHistory.schema!!.get(), String::class.java)
147+
val fileAsString = Jsons.`object`(schemaHistory.schema.get(), String::class.java)
148148

149-
if (fileAsString == null || fileAsString.isEmpty()) {
149+
if (fileAsString.isNullOrEmpty()) {
150150
return
151151
}
152152

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ class DebeziumMessageProducer<T>(
9393
}
9494
}
9595

96-
if (checkpointOffsetToSend.size == 1 && !message!!.isSnapshotEvent) {
96+
if (checkpointOffsetToSend.size == 1 && !message.isSnapshotEvent) {
9797
if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) {
9898
shouldEmitStateMessage = true
9999
}
100100
}
101101

102-
return eventConverter.toAirbyteMessage(message!!)
102+
return eventConverter.toAirbyteMessage(message)
103103
}
104104

105105
override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt

-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ class DebeziumRecordIterator<T>(
3636
private val publisherStatusSupplier: Supplier<Boolean>,
3737
private val debeziumShutdownProcedure: DebeziumShutdownProcedure<ChangeEvent<String?, String?>>,
3838
private val firstRecordWaitTime: Duration,
39-
subsequentRecordWaitTime: Duration?
4039
) : AbstractIterator<ChangeEventWithMetadata>(), AutoCloseableIterator<ChangeEventWithMetadata> {
4140
private val heartbeatEventSourceField: MutableMap<Class<out ChangeEvent<*, *>?>, Field?> =
4241
HashMap(1)

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumShutdownProcedure.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class DebeziumShutdownProcedure<T>(
3030
Executors.newSingleThreadExecutor { r: Runnable? ->
3131
val thread = Thread(r, "queue-data-transfer-thread")
3232
thread.uncaughtExceptionHandler =
33-
Thread.UncaughtExceptionHandler { t: Thread?, e: Throwable? -> exception = e }
33+
Thread.UncaughtExceptionHandler { _: Thread, e: Throwable -> exception = e }
3434
thread
3535
}
3636
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateUtil.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.debezium.internals
55

66
import io.debezium.config.Configuration
77
import io.debezium.embedded.KafkaConnectUtil
8-
import java.lang.Boolean
98
import java.util.*
109
import kotlin.String
1110
import org.apache.kafka.connect.json.JsonConverter
@@ -91,6 +90,6 @@ interface DebeziumStateUtil {
9190

9291
/** Configuration for offset state key/value converters. */
9392
val INTERNAL_CONVERTER_CONFIG: Map<String, String?> =
94-
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, Boolean.FALSE.toString())
93+
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString())
9594
}
9695
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumEventConverter.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class RelationalDbDebeziumEventConverter(
1515
) : DebeziumEventConverter {
1616
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
1717
val debeziumEvent = event.eventValueAsJson()
18-
val before: JsonNode = debeziumEvent!!.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
18+
val before: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
1919
val after: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
2020
val source: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)
2121

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -622,8 +622,8 @@ abstract class AbstractJdbcSource<Datatype>(
622622
}
623623

624624
@Throws(SQLException::class)
625-
public override fun createDatabase(sourceConfig: JsonNode): JdbcDatabase {
626-
return createDatabase(sourceConfig, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER)
625+
public override fun createDatabase(config: JsonNode): JdbcDatabase {
626+
return createDatabase(config, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER)
627627
}
628628

629629
@Throws(SQLException::class)
@@ -634,7 +634,7 @@ abstract class AbstractJdbcSource<Datatype>(
634634
// Create the data source
635635
val dataSource =
636636
create(
637-
if (jdbcConfig!!.has(JdbcUtils.USERNAME_KEY))
637+
if (jdbcConfig.has(JdbcUtils.USERNAME_KEY))
638638
jdbcConfig[JdbcUtils.USERNAME_KEY].asText()
639639
else null,
640640
if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY))
@@ -643,7 +643,7 @@ abstract class AbstractJdbcSource<Datatype>(
643643
driverClassName,
644644
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(),
645645
connectionProperties,
646-
getConnectionTimeout(connectionProperties!!)
646+
getConnectionTimeout(connectionProperties)
647647
)
648648
// Record the data source so that it can be closed.
649649
dataSources.add(dataSource)

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/dto/JdbcPrivilegeDto.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ class JdbcPrivilegeDto(
4343
}
4444
}
4545

46-
override fun equals(o: Any?): Boolean {
47-
if (this === o) {
46+
override fun equals(other: Any?): Boolean {
47+
if (this === other) {
4848
return true
4949
}
50-
if (o == null || javaClass != o.javaClass) {
50+
if (other == null || javaClass != other.javaClass) {
5151
return false
5252
}
53-
val that = o as JdbcPrivilegeDto
53+
val that = other as JdbcPrivilegeDto
5454
return (Objects.equal(grantee, that.grantee) &&
5555
Objects.equal(tableName, that.tableName) &&
5656
Objects.equal(schemaName, that.schemaName) &&

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt

+4-17
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ protected constructor(driverClassName: String) :
217217
continue
218218
}
219219
val cursorType =
220-
table.fields!!
220+
table.fields
221221
.stream()
222222
.filter { info: CommonField<DataType> -> info.name == cursorField.get() }
223223
.map { obj: CommonField<DataType> -> obj.type }
@@ -300,7 +300,7 @@ protected constructor(driverClassName: String) :
300300
val systemNameSpaces = excludedInternalNameSpaces
301301
val systemViews = excludedViews
302302
val discoveredTables = discoverInternal(database)
303-
return (if (systemNameSpaces == null || systemNameSpaces.isEmpty()) discoveredTables
303+
return (if (systemNameSpaces.isEmpty()) discoveredTables
304304
else
305305
discoveredTables
306306
.stream()
@@ -425,7 +425,7 @@ protected constructor(driverClassName: String) :
425425
val cursorInfo = stateManager!!.getCursorInfo(pair)
426426

427427
val airbyteMessageIterator: AutoCloseableIterator<AirbyteMessage>
428-
if (cursorInfo!!.map { it.cursor }.isPresent) {
428+
if (cursorInfo.map { it.cursor }.isPresent) {
429429
airbyteMessageIterator =
430430
getIncrementalStream(
431431
database,
@@ -452,7 +452,7 @@ protected constructor(driverClassName: String) :
452452
)
453453
}
454454

455-
val cursorType = getCursorType(airbyteStream, cursorField)
455+
getCursorType(airbyteStream, cursorField)
456456

457457
val messageProducer =
458458
CursorStateMessageProducer(stateManager, cursorInfo.map { it.cursor })
@@ -662,13 +662,6 @@ protected constructor(driverClassName: String) :
662662
protected abstract fun getAirbyteType(columnType: DataType): JsonSchemaType
663663

664664
protected abstract val excludedInternalNameSpaces: Set<String>
665-
/**
666-
* Get list of system namespaces(schemas) in order to exclude them from the `discover`
667-
* result list.
668-
*
669-
* @return set of system namespaces(schemas) to be excluded
670-
*/
671-
get
672665

673666
protected open val excludedViews: Set<String>
674667
/**
@@ -722,12 +715,6 @@ protected constructor(driverClassName: String) :
722715
): Map<String, MutableList<String>>
723716

724717
protected abstract val quoteString: String?
725-
/**
726-
* Returns quote symbol of the database
727-
*
728-
* @return quote symbol
729-
*/
730-
get
731718

732719
/**
733720
* Read all data from a table.

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/CursorInfo.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ class CursorInfo(
3030
return this
3131
}
3232

33-
override fun equals(o: Any?): Boolean {
34-
if (this === o) {
33+
override fun equals(other: Any?): Boolean {
34+
if (this === other) {
3535
return true
3636
}
37-
if (o == null || javaClass != o.javaClass) {
37+
if (other == null || javaClass != other.javaClass) {
3838
return false
3939
}
40-
val that = o as CursorInfo
40+
val that = other as CursorInfo
4141
return originalCursorField == that.originalCursorField &&
4242
originalCursor == that.originalCursor &&
4343
originalCursorRecordCount == that.originalCursorRecordCount &&

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ object DbSourceDiscoverUtil {
138138
tableInfo.fields
139139
)
140140
.withSupportedSyncModes(
141-
if (tableInfo.cursorFields != null && tableInfo.cursorFields.isEmpty())
141+
if (tableInfo.cursorFields.isEmpty())
142142
Lists.newArrayList(SyncMode.FULL_REFRESH)
143143
else Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
144144
)

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,7 @@ object RelationalDbQueryUtils {
104104
fun prettyPrintConfiguredAirbyteStreamList(streamList: List<ConfiguredAirbyteStream>): String {
105105
return streamList
106106
.stream()
107-
.map { s: ConfiguredAirbyteStream ->
108-
"%s.%s".formatted(s.stream.namespace, s.stream.name)
109-
}
107+
.map { s: ConfiguredAirbyteStream -> "${s.stream.namespace}.${s.stream.name}" }
110108
.collect(Collectors.joining(", "))
111109
}
112110

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ class StateDecoratingIterator(
236236
}
237237

238238
companion object {
239-
private val LOGGER: Logger = LoggerFactory.getLogger(StateDecoratingIterator::class.java)
239+
private val LOGGER: Logger =
240+
LoggerFactory.getLogger(@Suppress("deprecation") StateDecoratingIterator::class.java)
240241
}
241242
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ class GlobalStateManager(
9494
if (airbyteStateMessage!!.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) {
9595
return Jsons.`object`(airbyteStateMessage.global.sharedState, CdcState::class.java)
9696
} else {
97-
val legacyState = Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
97+
val legacyState: DbState? =
98+
Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
9899
return legacyState?.cdcState
99100
}
100101
}
@@ -114,7 +115,8 @@ class GlobalStateManager(
114115
}
115116
.collect(Collectors.toSet())
116117
} else {
117-
val legacyState = Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
118+
val legacyState: DbState? =
119+
Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
118120
return if (legacyState != null)
119121
extractNamespacePairsFromDbStreamState(legacyState.streams)
120122
else emptySet<AirbyteStreamNameNamespacePair>()
@@ -157,7 +159,7 @@ class GlobalStateManager(
157159
return@Supplier Jsons.`object`<DbState>(
158160
airbyteStateMessage.data,
159161
DbState::class.java
160-
)
162+
)!!
161163
.streams
162164
.stream()
163165
.map<AirbyteStreamState?> { s: DbStreamState ->

0 commit comments

Comments
 (0)