Skip to content

Commit 6d91dbc

Browse files
fix compiler warnings
1 parent 295d35b commit 6d91dbc

File tree

98 files changed

+7524
-3716
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+7524
-3716
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DSLContextFactory.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ object DSLContextFactory {
6767
driverClassName: String,
6868
jdbcConnectionString: String?,
6969
dialect: SQLDialect?,
70-
connectionProperties: Map<String?, String?>?,
70+
connectionProperties: Map<String, String>?,
7171
connectionTimeout: Duration?
7272
): DSLContext {
7373
return DSL.using(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DataSourceFactory.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ object DataSourceFactory {
5050
password: String?,
5151
driverClassName: String,
5252
jdbcConnectionString: String?,
53-
connectionProperties: Map<String?, String?>?,
53+
connectionProperties: Map<String, String>?,
5454
connectionTimeout: Duration?
5555
): DataSource {
5656
return DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
@@ -100,7 +100,7 @@ object DataSourceFactory {
100100
port: Int,
101101
database: String?,
102102
driverClassName: String,
103-
connectionProperties: Map<String?, String?>?
103+
connectionProperties: Map<String, String>?
104104
): DataSource {
105105
return DataSourceBuilder(username, password, driverClassName, host, port, database)
106106
.withConnectionProperties(connectionProperties)
@@ -152,7 +152,7 @@ object DataSourceFactory {
152152
private var password: String?,
153153
private var driverClassName: String
154154
) {
155-
private var connectionProperties: Map<String?, String?> = java.util.Map.of()
155+
private var connectionProperties: Map<String, String> = java.util.Map.of()
156156
private var database: String? = null
157157
private var host: String? = null
158158
private var jdbcUrl: String? = null
@@ -185,7 +185,7 @@ object DataSourceFactory {
185185
}
186186

187187
fun withConnectionProperties(
188-
connectionProperties: Map<String?, String?>?
188+
connectionProperties: Map<String, String>?
189189
): DataSourceBuilder {
190190
if (connectionProperties != null) {
191191
this.connectionProperties = connectionProperties

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory
1616

1717
/** Implementation of source operations with standard JDBC types. */
1818
class JdbcSourceOperations :
19-
AbstractJdbcCompatibleSourceOperations<JDBCType?>(), SourceOperations<ResultSet, JDBCType?> {
19+
AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
2020
protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
2121
return try {
2222
JDBCType.valueOf(columnTypeInt)
@@ -65,7 +65,7 @@ class JdbcSourceOperations :
6565
preparedStatement: PreparedStatement,
6666
parameterIndex: Int,
6767
cursorFieldType: JDBCType?,
68-
value: String
68+
value: String?
6969
) {
7070
when (cursorFieldType) {
7171
JDBCType.TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value)
@@ -80,12 +80,12 @@ class JdbcSourceOperations :
8080
JDBCType.TINYINT,
8181
JDBCType.SMALLINT -> setShortInt(preparedStatement, parameterIndex, value!!)
8282
JDBCType.INTEGER -> setInteger(preparedStatement, parameterIndex, value!!)
83-
JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value)
83+
JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value!!)
8484
JDBCType.FLOAT,
8585
JDBCType.DOUBLE -> setDouble(preparedStatement, parameterIndex, value!!)
8686
JDBCType.REAL -> setReal(preparedStatement, parameterIndex, value!!)
8787
JDBCType.NUMERIC,
88-
JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value)
88+
JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value!!)
8989
JDBCType.CHAR,
9090
JDBCType.NCHAR,
9191
JDBCType.NVARCHAR,
@@ -147,7 +147,7 @@ class JdbcSourceOperations :
147147
return JdbcUtils.ALLOWED_CURSOR_TYPES.contains(type)
148148
}
149149

150-
override fun getAirbyteType(jdbcType: JDBCType?): JsonSchemaType {
150+
override fun getAirbyteType(jdbcType: JDBCType): JsonSchemaType {
151151
return when (jdbcType) {
152152
JDBCType.BIT,
153153
JDBCType.BOOLEAN -> JsonSchemaType.BOOLEAN

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import java.util.*
1111

1212
abstract class JdbcConnector
1313
protected constructor(@JvmField protected val driverClassName: String) : BaseConnector() {
14-
protected fun getConnectionTimeout(connectionProperties: Map<String?, String?>): Duration {
14+
protected fun getConnectionTimeout(connectionProperties: Map<String, String>): Duration {
1515
return getConnectionTimeout(connectionProperties, driverClassName)
1616
}
1717

@@ -37,7 +37,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
3737
* @return DataSourceBuilder class used to create dynamic fields for DataSource
3838
*/
3939
fun getConnectionTimeout(
40-
connectionProperties: Map<String?, String?>,
40+
connectionProperties: Map<String, String>,
4141
driverClassName: String?
4242
): Duration {
4343
val parsedConnectionTimeout =

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/Source.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ interface Source : Integration {
4242
@Throws(Exception::class)
4343
fun read(
4444
config: JsonNode,
45-
catalog: ConfiguredAirbyteCatalog?,
45+
catalog: ConfiguredAirbyteCatalog,
4646
state: JsonNode?
4747
): AutoCloseableIterator<AirbyteMessage>
4848

@@ -65,7 +65,7 @@ interface Source : Integration {
6565
@Throws(Exception::class)
6666
fun readStreams(
6767
config: JsonNode,
68-
catalog: ConfiguredAirbyteCatalog?,
68+
catalog: ConfiguredAirbyteCatalog,
6969
state: JsonNode?
7070
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
7171
return List.of(read(config, catalog, state))

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingSource.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
3535
@Throws(Exception::class)
3636
override fun read(
3737
config: JsonNode,
38-
catalog: ConfiguredAirbyteCatalog?,
38+
catalog: ConfiguredAirbyteCatalog,
3939
state: JsonNode?
4040
): AutoCloseableIterator<AirbyteMessage> {
4141
return source.read(config, catalog, state)
@@ -44,7 +44,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
4444
@Throws(Exception::class)
4545
override fun readStreams(
4646
config: JsonNode,
47-
catalog: ConfiguredAirbyteCatalog?,
47+
catalog: ConfiguredAirbyteCatalog,
4848
state: JsonNode?
4949
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
5050
return source.readStreams(config, catalog, state)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshWrappedSource.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class SshWrappedSource : Source {
7676
@Throws(Exception::class)
7777
override fun read(
7878
config: JsonNode,
79-
catalog: ConfiguredAirbyteCatalog?,
79+
catalog: ConfiguredAirbyteCatalog,
8080
state: JsonNode?
8181
): AutoCloseableIterator<AirbyteMessage> {
8282
val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)
@@ -97,7 +97,7 @@ class SshWrappedSource : Source {
9797
@Throws(Exception::class)
9898
override fun readStreams(
9999
config: JsonNode,
100-
catalog: ConfiguredAirbyteCatalog?,
100+
catalog: ConfiguredAirbyteCatalog,
101101
state: JsonNode?
102102
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
103103
val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.28.10
1+
version=0.28.11

airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ java {
1111
}
1212
}
1313

14+
compileKotlin.compilerOptions.allWarningsAsErrors = false
15+
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
16+
compileTestKotlin.compilerOptions.allWarningsAsErrors = false
17+
18+
1419
// Convert yaml to java: relationaldb.models
1520
jsonSchema2Pojo {
1621
sourceType = SourceType.YAMLSCHEMA
@@ -53,4 +58,5 @@ dependencies {
5358
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:datastore-postgres'))
5459

5560
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
61+
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
5662
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt

+75-43
Original file line numberDiff line numberDiff line change
@@ -14,34 +14,43 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1414
import io.airbyte.protocol.models.v0.SyncMode
1515
import io.debezium.engine.ChangeEvent
1616
import io.debezium.engine.DebeziumEngine
17-
import org.slf4j.Logger
18-
import org.slf4j.LoggerFactory
1917
import java.time.Duration
2018
import java.time.Instant
2119
import java.time.temporal.ChronoUnit
2220
import java.util.*
2321
import java.util.concurrent.LinkedBlockingQueue
22+
import org.slf4j.Logger
23+
import org.slf4j.LoggerFactory
2424

2525
/**
2626
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
2727
* to use debezium for CDC, it should use this class
2828
*/
29-
class AirbyteDebeziumHandler<T>(private val config: JsonNode,
30-
private val targetPosition: CdcTargetPosition<T>,
31-
private val trackSchemaHistory: Boolean,
32-
private val firstRecordWaitTime: Duration,
33-
private val subsequentRecordWaitTime: Duration,
34-
private val queueSize: Int,
35-
private val addDbNameToOffsetState: Boolean) {
36-
internal inner class CapacityReportingBlockingQueue<E>(capacity: Int) : LinkedBlockingQueue<E>(capacity) {
29+
class AirbyteDebeziumHandler<T>(
30+
private val config: JsonNode,
31+
private val targetPosition: CdcTargetPosition<T>,
32+
private val trackSchemaHistory: Boolean,
33+
private val firstRecordWaitTime: Duration,
34+
private val subsequentRecordWaitTime: Duration,
35+
private val queueSize: Int,
36+
private val addDbNameToOffsetState: Boolean
37+
) {
38+
internal inner class CapacityReportingBlockingQueue<E>(capacity: Int) :
39+
LinkedBlockingQueue<E>(capacity) {
3740
private var lastReport: Instant? = null
3841

3942
private fun reportQueueUtilization() {
40-
if (lastReport == null || Duration.between(lastReport, Instant.now()).compareTo(Companion.REPORT_DURATION) > 0) {
41-
LOGGER.info("CDC events queue size: {}. remaining {}", this.size, this.remainingCapacity())
42-
synchronized(this) {
43-
lastReport = Instant.now()
44-
}
43+
if (
44+
lastReport == null ||
45+
Duration.between(lastReport, Instant.now())
46+
.compareTo(Companion.REPORT_DURATION) > 0
47+
) {
48+
LOGGER.info(
49+
"CDC events queue size: {}. remaining {}",
50+
this.size,
51+
this.remainingCapacity()
52+
)
53+
synchronized(this) { lastReport = Instant.now() }
4554
}
4655
}
4756

@@ -55,44 +64,62 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
5564
reportQueueUtilization()
5665
return super.poll()
5766
}
58-
59-
companion object {
60-
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)
61-
}
6267
}
6368

64-
fun getIncrementalIterators(debeziumPropertiesManager: DebeziumPropertiesManager,
65-
eventConverter: DebeziumEventConverter,
66-
cdcSavedInfoFetcher: CdcSavedInfoFetcher,
67-
cdcStateHandler: CdcStateHandler): AutoCloseableIterator<AirbyteMessage> {
69+
fun getIncrementalIterators(
70+
debeziumPropertiesManager: DebeziumPropertiesManager,
71+
eventConverter: DebeziumEventConverter,
72+
cdcSavedInfoFetcher: CdcSavedInfoFetcher,
73+
cdcStateHandler: CdcStateHandler
74+
): AutoCloseableIterator<AirbyteMessage> {
6875
LOGGER.info("Using CDC: {}", true)
69-
LOGGER.info("Using DBZ version: {}", DebeziumEngine::class.java.getPackage().implementationVersion)
70-
val offsetManager: AirbyteFileOffsetBackingStore = AirbyteFileOffsetBackingStore.Companion.initializeState(
76+
LOGGER.info(
77+
"Using DBZ version: {}",
78+
DebeziumEngine::class.java.getPackage().implementationVersion
79+
)
80+
val offsetManager: AirbyteFileOffsetBackingStore =
81+
AirbyteFileOffsetBackingStore.Companion.initializeState(
7182
cdcSavedInfoFetcher.savedOffset,
72-
if (addDbNameToOffsetState) Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText()) else Optional.empty<String>())
73-
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?> = if (trackSchemaHistory
74-
) Optional.of<AirbyteSchemaHistoryStorage?>(AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
75-
cdcSavedInfoFetcher.savedSchemaHistory, cdcStateHandler.compressSchemaHistoryForState()))
76-
else Optional.empty<AirbyteSchemaHistoryStorage>()
83+
if (addDbNameToOffsetState)
84+
Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText())
85+
else Optional.empty<String>()
86+
)
87+
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage> =
88+
if (trackSchemaHistory)
89+
Optional.of<AirbyteSchemaHistoryStorage?>(
90+
AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
91+
cdcSavedInfoFetcher.savedSchemaHistory,
92+
cdcStateHandler.compressSchemaHistoryForState()
93+
)
94+
)
95+
else Optional.empty<AirbyteSchemaHistoryStorage>()
7796
val publisher = DebeziumRecordPublisher(debeziumPropertiesManager)
78-
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> = CapacityReportingBlockingQueue<ChangeEvent<String, String>>(queueSize)
97+
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> =
98+
CapacityReportingBlockingQueue(queueSize)
7999
publisher.start(queue, offsetManager, schemaHistoryManager)
80100
// handle state machine around pub/sub logic.
81-
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> = DebeziumRecordIterator(
101+
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> =
102+
DebeziumRecordIterator(
82103
queue,
83104
targetPosition,
84105
{ publisher.hasClosed() },
85106
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
86107
firstRecordWaitTime,
87-
subsequentRecordWaitTime)
108+
subsequentRecordWaitTime
109+
)
88110

89-
val syncCheckpointDuration = if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY)
90-
) Duration.ofSeconds(config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong())
91-
else DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION
92-
val syncCheckpointRecords = if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY)
93-
) config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
94-
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()
95-
return AutoCloseableIterators.fromIterator(DebeziumStateDecoratingIterator(
111+
val syncCheckpointDuration =
112+
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY))
113+
Duration.ofSeconds(
114+
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong()
115+
)
116+
else DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION
117+
val syncCheckpointRecords =
118+
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY))
119+
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
120+
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()
121+
return AutoCloseableIterators.fromIterator(
122+
DebeziumStateDecoratingIterator(
96123
eventIterator,
97124
cdcStateHandler,
98125
targetPosition,
@@ -101,11 +128,14 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
101128
trackSchemaHistory,
102129
schemaHistoryManager.orElse(null),
103130
syncCheckpointDuration,
104-
syncCheckpointRecords))
131+
syncCheckpointRecords
132+
)
133+
)
105134
}
106135

107136
companion object {
108137
private val LOGGER: Logger = LoggerFactory.getLogger(AirbyteDebeziumHandler::class.java)
138+
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)
109139

110140
/**
111141
* We use 10000 as capacity cause the default queue size and batch size of debezium is :
@@ -115,8 +145,10 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
115145
const val QUEUE_CAPACITY: Int = 10000
116146

117147
fun isAnyStreamIncrementalSyncMode(catalog: ConfiguredAirbyteCatalog): Boolean {
118-
return catalog.streams.stream().map { obj: ConfiguredAirbyteStream -> obj.syncMode }
119-
.anyMatch { syncMode: SyncMode -> syncMode == SyncMode.INCREMENTAL }
148+
return catalog.streams
149+
.stream()
150+
.map { obj: ConfiguredAirbyteStream -> obj.syncMode }
151+
.anyMatch { syncMode: SyncMode -> syncMode == SyncMode.INCREMENTAL }
120152
}
121153
}
122154
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcMetadataInjector.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@ interface CdcMetadataInjector<T> {
1717
* https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
1818
*
1919
* @param event is the actual record which contains data and would be written to the destination
20-
* @param source contains the metadata about the record and we need to extract that metadata and add
21-
* it to the event before writing it to destination
20+
* @param source contains the metadata about the record and we need to extract that metadata and
21+
* add it to the event before writing it to destination
2222
*/
2323
fun addMetaData(event: ObjectNode?, source: JsonNode?)
2424

25-
fun addMetaDataToRowsFetchedOutsideDebezium(record: ObjectNode?, transactionTimestamp: String?, metadataToAdd: T) {
25+
fun addMetaDataToRowsFetchedOutsideDebezium(
26+
record: ObjectNode?,
27+
transactionTimestamp: String?,
28+
metadataToAdd: T
29+
) {
2630
throw RuntimeException("Not Supported")
2731
}
2832

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcSavedInfoFetcher.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ import java.util.*
1414
interface CdcSavedInfoFetcher {
1515
val savedOffset: JsonNode?
1616

17-
val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode?>?>?
17+
val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode>>?
1818
}

0 commit comments

Comments
 (0)