Skip to content

Commit f6db47e

Browse files
fix compiler warnings
1 parent abe39af commit f6db47e

File tree

58 files changed

+367
-380
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+367
-380
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DSLContextFactory.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ object DSLContextFactory {
6767
driverClassName: String,
6868
jdbcConnectionString: String?,
6969
dialect: SQLDialect?,
70-
connectionProperties: Map<String?, String?>?,
70+
connectionProperties: Map<String, String>?,
7171
connectionTimeout: Duration?
7272
): DSLContext {
7373
return DSL.using(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DataSourceFactory.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ object DataSourceFactory {
5050
password: String?,
5151
driverClassName: String,
5252
jdbcConnectionString: String?,
53-
connectionProperties: Map<String?, String?>?,
53+
connectionProperties: Map<String, String>?,
5454
connectionTimeout: Duration?
5555
): DataSource {
5656
return DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
@@ -100,7 +100,7 @@ object DataSourceFactory {
100100
port: Int,
101101
database: String?,
102102
driverClassName: String,
103-
connectionProperties: Map<String?, String?>?
103+
connectionProperties: Map<String, String>?
104104
): DataSource {
105105
return DataSourceBuilder(username, password, driverClassName, host, port, database)
106106
.withConnectionProperties(connectionProperties)
@@ -152,7 +152,7 @@ object DataSourceFactory {
152152
private var password: String?,
153153
private var driverClassName: String
154154
) {
155-
private var connectionProperties: Map<String?, String?> = java.util.Map.of()
155+
private var connectionProperties: Map<String, String> = java.util.Map.of()
156156
private var database: String? = null
157157
private var host: String? = null
158158
private var jdbcUrl: String? = null
@@ -185,7 +185,7 @@ object DataSourceFactory {
185185
}
186186

187187
fun withConnectionProperties(
188-
connectionProperties: Map<String?, String?>?
188+
connectionProperties: Map<String, String>?
189189
): DataSourceBuilder {
190190
if (connectionProperties != null) {
191191
this.connectionProperties = connectionProperties

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory
1616

1717
/** Implementation of source operations with standard JDBC types. */
1818
class JdbcSourceOperations :
19-
AbstractJdbcCompatibleSourceOperations<JDBCType?>(), SourceOperations<ResultSet, JDBCType?> {
19+
AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
2020
protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
2121
return try {
2222
JDBCType.valueOf(columnTypeInt)
@@ -147,7 +147,7 @@ class JdbcSourceOperations :
147147
return JdbcUtils.ALLOWED_CURSOR_TYPES.contains(type)
148148
}
149149

150-
override fun getAirbyteType(jdbcType: JDBCType?): JsonSchemaType {
150+
override fun getAirbyteType(jdbcType: JDBCType): JsonSchemaType {
151151
return when (jdbcType) {
152152
JDBCType.BIT,
153153
JDBCType.BOOLEAN -> JsonSchemaType.BOOLEAN

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import java.util.*
1111

1212
abstract class JdbcConnector
1313
protected constructor(@JvmField protected val driverClassName: String) : BaseConnector() {
14-
protected fun getConnectionTimeout(connectionProperties: Map<String?, String?>): Duration {
14+
protected fun getConnectionTimeout(connectionProperties: Map<String, String>): Duration {
1515
return getConnectionTimeout(connectionProperties, driverClassName)
1616
}
1717

@@ -37,7 +37,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
3737
* @return DataSourceBuilder class used to create dynamic fields for DataSource
3838
*/
3939
fun getConnectionTimeout(
40-
connectionProperties: Map<String?, String?>,
40+
connectionProperties: Map<String, String>,
4141
driverClassName: String?
4242
): Duration {
4343
val parsedConnectionTimeout =

airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ java {
1111
}
1212
}
1313

14+
compileKotlin.compilerOptions.allWarningsAsErrors = false
15+
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
16+
compileTestKotlin.compilerOptions.allWarningsAsErrors = false
17+
18+
1419
// Convert yaml to java: relationaldb.models
1520
jsonSchema2Pojo {
1621
sourceType = SourceType.YAMLSCHEMA

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
5555
reportQueueUtilization()
5656
return super.poll()
5757
}
58-
59-
companion object {
60-
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)
61-
}
6258
}
6359

6460
fun getIncrementalIterators(debeziumPropertiesManager: DebeziumPropertiesManager,
@@ -70,12 +66,12 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
7066
val offsetManager: AirbyteFileOffsetBackingStore = AirbyteFileOffsetBackingStore.Companion.initializeState(
7167
cdcSavedInfoFetcher.savedOffset,
7268
if (addDbNameToOffsetState) Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText()) else Optional.empty<String>())
73-
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?> = if (trackSchemaHistory
69+
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage> = if (trackSchemaHistory
7470
) Optional.of<AirbyteSchemaHistoryStorage?>(AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
7571
cdcSavedInfoFetcher.savedSchemaHistory, cdcStateHandler.compressSchemaHistoryForState()))
7672
else Optional.empty<AirbyteSchemaHistoryStorage>()
7773
val publisher = DebeziumRecordPublisher(debeziumPropertiesManager)
78-
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> = CapacityReportingBlockingQueue<ChangeEvent<String, String>>(queueSize)
74+
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> = CapacityReportingBlockingQueue(queueSize)
7975
publisher.start(queue, offsetManager, schemaHistoryManager)
8076
// handle state machine around pub/sub logic.
8177
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> = DebeziumRecordIterator(
@@ -106,6 +102,7 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
106102

107103
companion object {
108104
private val LOGGER: Logger = LoggerFactory.getLogger(AirbyteDebeziumHandler::class.java)
105+
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)
109106

110107
/**
111108
* We use 10000 as capacity cause the default queue size and batch size of debezium is :

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcSavedInfoFetcher.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ import java.util.*
1414
interface CdcSavedInfoFetcher {
1515
val savedOffset: JsonNode?
1616

17-
val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode?>?>?
17+
val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode>>?
1818
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
1111
* which suits them. Also, it adds some utils to verify CDC event status.
1212
*/
1313
interface CdcStateHandler {
14-
fun saveState(offset: Map<String?, String?>?, dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory<String?>?): AirbyteMessage?
14+
fun saveState(offset: Map<String?, String?>?, dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory<String>?): AirbyteMessage?
1515

1616
fun saveStateAfterCompletionOfSnapshotOfNewStreams(): AirbyteMessage?
1717

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ class AirbyteFileOffsetBackingStore(private val offsetFilePath: Path, private va
4242
}
4343

4444
fun persist(cdcState: JsonNode?) {
45-
val mapAsString: Map<String, String?> =
46-
if (cdcState != null) Jsons.`object`<Map<*, *>>(cdcState, MutableMap::class.java) else emptyMap<String, String>()
45+
val mapAsString: Map<String, String> =
46+
if (cdcState != null) Jsons.`object`(cdcState, MutableMap::class.java) as Map<String, String> else emptyMap()
4747

4848
val updatedMap = updateStateForDebezium2_1(mapAsString)
4949

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteSchemaHistoryStorage.kt

+2-13
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,6 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc
3737
val isCompressed: Boolean
3838

3939
init {
40-
this.streamName = streamName
41-
this.primaryKey = primaryKey
42-
this.keySequence = keySequence
43-
this.syncCheckpointRecords = syncCheckpointRecords
44-
this.syncCheckpointDuration = syncCheckpointDuration
45-
this.tableName = tableName
46-
this.cursorColumnName = cursorColumnName
47-
this.cursorSqlType = cursorSqlType
48-
this.cause = cause
49-
this.tableSize = tableSize
50-
this.avgRowLength = avgRowLength
5140
this.schema = schema
5241
this.isCompressed = isCompressed
5342
}
@@ -140,7 +129,7 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc
140129
}
141130
}
142131

143-
private fun persist(schemaHistory: SchemaHistory<Optional<JsonNode?>?>?) {
132+
private fun persist(schemaHistory: SchemaHistory<Optional<JsonNode>>?) {
144133
if (schemaHistory!!.schema!!.isEmpty) {
145134
return
146135
}
@@ -223,7 +212,7 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc
223212
return string.toByteArray(StandardCharsets.UTF_8).size.toDouble() / (ONE_MB)
224213
}
225214

226-
fun initializeDBHistory(schemaHistory: SchemaHistory<Optional<JsonNode?>?>?,
215+
fun initializeDBHistory(schemaHistory: SchemaHistory<Optional<JsonNode>>?,
227216
compressSchemaHistoryForState: Boolean): AirbyteSchemaHistoryStorage {
228217
val dbHistoryWorkingDir: Path
229218
try {

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ abstract class DebeziumPropertiesManager(private val properties: Properties,
1717

1818
fun getDebeziumProperties(
1919
offsetManager: AirbyteFileOffsetBackingStore,
20-
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?>): Properties {
20+
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>): Properties {
2121
val props = Properties()
2222
props.putAll(properties)
2323

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ class DebeziumRecordIterator<T>(private val queue: LinkedBlockingQueue<ChangeEve
3535
private val publisherStatusSupplier: Supplier<Boolean>,
3636
private val debeziumShutdownProcedure: DebeziumShutdownProcedure<ChangeEvent<String?, String?>>,
3737
private val firstRecordWaitTime: Duration,
38-
subsequentRecordWaitTime: Duration?) : AbstractIterator<ChangeEventWithMetadata?>(), AutoCloseableIterator<ChangeEventWithMetadata?> {
38+
subsequentRecordWaitTime: Duration?) : AbstractIterator<ChangeEventWithMetadata?>(), AutoCloseableIterator<ChangeEventWithMetadata> {
3939
private val heartbeatEventSourceField: MutableMap<Class<out ChangeEvent<*, *>?>, Field?> = HashMap(1)
4040
private val subsequentRecordWaitTime: Duration = firstRecordWaitTime.dividedBy(2)
4141

4242
private var receivedFirstRecord = false
4343
private var hasSnapshotFinished = true
4444
private var tsLastHeartbeat: LocalDateTime? = null
45-
private var lastHeartbeatPosition: T = null
45+
private var lastHeartbeatPosition: T? = null
4646
private var maxInstanceOfNoRecordsFound = 0
4747
private var signalledDebeziumEngineShutdown = false
4848

@@ -108,7 +108,6 @@ class DebeziumRecordIterator<T>(private val queue: LinkedBlockingQueue<ChangeEve
108108
requestClose("Closing: Change event reached target position")
109109
}
110110
this.tsLastHeartbeat = null
111-
this.lastHeartbeatPosition = null
112111
this.receivedFirstRecord = true
113112
this.maxInstanceOfNoRecordsFound = 0
114113
return changeEventWithMetadata
@@ -192,7 +191,7 @@ class DebeziumRecordIterator<T>(private val queue: LinkedBlockingQueue<ChangeEve
192191
* reflection to setAccessible for each event
193192
*/
194193
@VisibleForTesting
195-
protected fun getHeartbeatPosition(heartbeatEvent: ChangeEvent<String?, String?>): T? {
194+
internal fun getHeartbeatPosition(heartbeatEvent: ChangeEvent<String?, String?>): T {
196195
try {
197196
val eventClass: Class<out ChangeEvent<*, *>?> = heartbeatEvent.javaClass
198197
val f: Field?

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class DebeziumRecordPublisher(private val debeziumPropertiesManager: DebeziumPro
2828

2929
fun start(queue: BlockingQueue<ChangeEvent<String?, String?>>,
3030
offsetManager: AirbyteFileOffsetBackingStore,
31-
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?>) {
31+
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>) {
3232
engine = DebeziumEngine.create(Json::class.java)
3333
.using(debeziumPropertiesManager.getDebeziumProperties(offsetManager, schemaHistoryManager))
3434
.using(OffsetCommitPolicy.AlwaysCommitOffsetPolicy())

0 commit comments

Comments
 (0)