Skip to content

Commit 312c129

Browse files
fix compiler warnings
1 parent c1d7475 commit 312c129

File tree

68 files changed

+464
-489
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+464
-489
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/Destination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ interface Destination : Integration {
3434
fun getConsumer(
3535
config: JsonNode,
3636
catalog: ConfiguredAirbyteCatalog?,
37-
outputRecordCollector: Consumer<AirbyteMessage?>?
37+
outputRecordCollector: Consumer<AirbyteMessage>
3838
): AirbyteMessageConsumer?
3939

4040
/**
@@ -53,7 +53,7 @@ interface Destination : Integration {
5353
fun getSerializedMessageConsumer(
5454
config: JsonNode,
5555
catalog: ConfiguredAirbyteCatalog?,
56-
outputRecordCollector: Consumer<AirbyteMessage?>?
56+
outputRecordCollector: Consumer<AirbyteMessage>
5757
): SerializedAirbyteMessageConsumer? {
5858
return ShimToSerializedAirbyteMessageConsumer(
5959
getConsumer(config, catalog, outputRecordCollector)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ class IntegrationRunner
4646
@VisibleForTesting
4747
internal constructor(
4848
cliParser: IntegrationCliParser,
49-
outputRecordCollector: Consumer<AirbyteMessage?>,
49+
outputRecordCollector: Consumer<AirbyteMessage>,
5050
destination: Destination?,
5151
source: Source?
5252
) {
5353
private val cliParser: IntegrationCliParser
54-
private val outputRecordCollector: Consumer<AirbyteMessage?>
54+
private val outputRecordCollector: Consumer<AirbyteMessage>
5555
private val integration: Integration
5656
private val destination: Destination?
5757
private val source: Source?
@@ -61,7 +61,7 @@ internal constructor(
6161
destination: Destination?
6262
) : this(
6363
IntegrationCliParser(),
64-
Consumer<AirbyteMessage?> { message: AirbyteMessage? ->
64+
Consumer<AirbyteMessage> { message: AirbyteMessage ->
6565
Destination.Companion.defaultOutputRecordCollector(message)
6666
},
6767
destination,
@@ -72,7 +72,7 @@ internal constructor(
7272
source: Source?
7373
) : this(
7474
IntegrationCliParser(),
75-
Consumer<AirbyteMessage?> { message: AirbyteMessage? ->
75+
Consumer<AirbyteMessage> { message: AirbyteMessage ->
7676
Destination.Companion.defaultOutputRecordCollector(message)
7777
},
7878
null,
@@ -99,7 +99,7 @@ internal constructor(
9999
@VisibleForTesting
100100
internal constructor(
101101
cliParser: IntegrationCliParser,
102-
outputRecordCollector: Consumer<AirbyteMessage?>,
102+
outputRecordCollector: Consumer<AirbyteMessage>,
103103
destination: Destination?,
104104
source: Source?,
105105
jsonSchemaValidator: JsonSchemaValidator
@@ -254,7 +254,7 @@ internal constructor(
254254

255255
private fun produceMessages(
256256
messageIterator: AutoCloseableIterator<AirbyteMessage>,
257-
recordCollector: Consumer<AirbyteMessage?>
257+
recordCollector: Consumer<AirbyteMessage>
258258
) {
259259
messageIterator!!.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
260260
LOGGER.debug("Producing messages for stream {}...", s)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ abstract class SpecModifyingDestination(private val destination: Destination) :
3131
override fun getConsumer(
3232
config: JsonNode,
3333
catalog: ConfiguredAirbyteCatalog?,
34-
outputRecordCollector: Consumer<AirbyteMessage?>?
34+
outputRecordCollector: Consumer<AirbyteMessage>
3535
): AirbyteMessageConsumer? {
3636
return destination.getConsumer(config, catalog, outputRecordCollector)
3737
}
@@ -40,7 +40,7 @@ abstract class SpecModifyingDestination(private val destination: Destination) :
4040
override fun getSerializedMessageConsumer(
4141
config: JsonNode,
4242
catalog: ConfiguredAirbyteCatalog?,
43-
outputRecordCollector: Consumer<AirbyteMessage?>?
43+
outputRecordCollector: Consumer<AirbyteMessage>
4444
): SerializedAirbyteMessageConsumer? {
4545
return destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)
4646
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class SshWrappedDestination : Destination {
9494
override fun getConsumer(
9595
config: JsonNode,
9696
catalog: ConfiguredAirbyteCatalog?,
97-
outputRecordCollector: Consumer<AirbyteMessage?>?
97+
outputRecordCollector: Consumer<AirbyteMessage>
9898
): AirbyteMessageConsumer? {
9999
val tunnel = getTunnelInstance(config)
100100

@@ -120,7 +120,7 @@ class SshWrappedDestination : Destination {
120120
override fun getSerializedMessageConsumer(
121121
config: JsonNode,
122122
catalog: ConfiguredAirbyteCatalog?,
123-
outputRecordCollector: Consumer<AirbyteMessage?>?
123+
outputRecordCollector: Consumer<AirbyteMessage>
124124
): SerializedAirbyteMessageConsumer? {
125125
val clone = Jsons.clone(config)
126126
val connectionOptionsConfig: Optional<JsonNode> =

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/function/DestinationFlushFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ interface DestinationFlushFunction {
3838
@Throws(Exception::class)
3939
fun flush(
4040
decs: StreamDescriptor,
41-
stream: Stream<PartialAirbyteMessage?>,
41+
stream: Stream<PartialAirbyteMessage>,
4242
)
4343

4444
/**

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory
6060
class BufferedStreamConsumer
6161
@VisibleForTesting
6262
internal constructor(
63-
private val outputRecordCollector: Consumer<AirbyteMessage?>?,
63+
private val outputRecordCollector: Consumer<AirbyteMessage>,
6464
private val onStart: OnStartFunction,
6565
private val bufferingStrategy: BufferingStrategy,
6666
private val onClose: OnCloseFunction,
@@ -87,7 +87,7 @@ internal constructor(
8787
*/
8888
@Deprecated("")
8989
constructor(
90-
outputRecordCollector: Consumer<AirbyteMessage?>?,
90+
outputRecordCollector: Consumer<AirbyteMessage>,
9191
onStart: OnStartFunction,
9292
bufferingStrategy: BufferingStrategy,
9393
onClose: OnCloseFunction,
@@ -109,7 +109,7 @@ internal constructor(
109109
)
110110

111111
constructor(
112-
outputRecordCollector: Consumer<AirbyteMessage?>?,
112+
outputRecordCollector: Consumer<AirbyteMessage>,
113113
onStart: OnStartFunction,
114114
bufferingStrategy: BufferingStrategy,
115115
onClose: OnCloseFunction,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/RecordWriter.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.buffered_stream_consumer
66
import io.airbyte.commons.functional.CheckedBiConsumer
77
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
88

9-
interface RecordWriter<T> : CheckedBiConsumer<AirbyteStreamNameNamespacePair, List<T>, Exception> {
9+
fun interface RecordWriter<T> : CheckedBiConsumer<AirbyteStreamNameNamespacePair, List<T>, Exception> {
1010
@Throws(Exception::class)
1111
override fun accept(stream: AirbyteStreamNameNamespacePair, records: List<T>)
1212
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ interface SqlOperations {
9999
*/
100100
@Throws(Exception::class)
101101
fun insertRecords(
102-
database: JdbcDatabase?,
103-
records: List<PartialAirbyteMessage?>?,
102+
database: JdbcDatabase,
103+
records: List<PartialAirbyteMessage>,
104104
schemaName: String?,
105105
tableName: String?
106106
)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@ import org.slf4j.LoggerFactory
2121
* This should be deprecated as we slowly move towards using [SerializedBufferingStrategy] instead.
2222
*/
2323
class InMemoryRecordBufferingStrategy(
24-
private val recordWriter: RecordWriter<AirbyteRecordMessage?>,
24+
private val recordWriter: RecordWriter<AirbyteRecordMessage>,
2525
private val checkAndRemoveRecordWriter: CheckAndRemoveRecordWriter?,
2626
private val maxQueueSizeInBytes: Long
2727
) : BufferingStrategy {
2828
private var streamBuffer:
29-
MutableMap<AirbyteStreamNameNamespacePair, MutableList<AirbyteRecordMessage?>> =
29+
MutableMap<AirbyteStreamNameNamespacePair, MutableList<AirbyteRecordMessage>> =
3030
HashMap()
3131
private var fileName: String? = null
3232

3333
private val recordSizeEstimator = RecordSizeEstimator()
3434
private var bufferSizeInBytes: Long = 0
3535

3636
constructor(
37-
recordWriter: RecordWriter<AirbyteRecordMessage?>,
37+
recordWriter: RecordWriter<AirbyteRecordMessage>,
3838
maxQueueSizeInBytes: Long
3939
) : this(recordWriter, null, maxQueueSizeInBytes)
4040

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory
3838

3939
internal class IntegrationRunnerTest {
4040
private lateinit var cliParser: IntegrationCliParser
41-
private lateinit var stdoutConsumer: Consumer<AirbyteMessage?>
41+
private lateinit var stdoutConsumer: Consumer<AirbyteMessage>
4242
private lateinit var destination: Destination
4343
private lateinit var source: Source
4444
private lateinit var configPath: Path

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ class AsyncStreamConsumerTest {
553553
namespace: String,
554554
allRecords: List<AirbyteMessage>,
555555
) {
556-
val argumentCaptor = org.mockito.kotlin.argumentCaptor<Stream<PartialAirbyteMessage?>>()
556+
val argumentCaptor = org.mockito.kotlin.argumentCaptor<Stream<PartialAirbyteMessage>>()
557557
Mockito.verify(flushFunction, Mockito.atLeast(1))
558558
.flush(
559559
org.mockito.kotlin.eq(

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import org.mockito.kotlin.mock
3030
class BufferedStreamConsumerTest {
3131
private lateinit var consumer: BufferedStreamConsumer
3232
private lateinit var onStart: OnStartFunction
33-
private lateinit var recordWriter: RecordWriter<AirbyteRecordMessage?>
33+
private lateinit var recordWriter: RecordWriter<AirbyteRecordMessage>
3434
private lateinit var onClose: OnCloseFunction
3535
private lateinit var isValidRecord: CheckedFunction<JsonNode?, Boolean?, Exception?>
36-
private lateinit var outputRecordCollector: Consumer<AirbyteMessage?>
36+
private lateinit var outputRecordCollector: Consumer<AirbyteMessage>
3737

3838
@BeforeEach
3939
@Throws(Exception::class)

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import org.mockito.Mockito
1616
import org.mockito.kotlin.mock
1717

1818
class InMemoryRecordBufferingStrategyTest {
19-
private val recordWriter: RecordWriter<AirbyteRecordMessage?> = mock()
19+
private val recordWriter: RecordWriter<AirbyteRecordMessage> = mock()
2020

2121
@Test
2222
@Throws(Exception::class)

airbyte-cdk/java/airbyte-cdk/db-destinations/build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ java {
88
}
99
}
1010

11+
compileKotlin.compilerOptions.allWarningsAsErrors = false
12+
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
13+
compileTestKotlin.compilerOptions.allWarningsAsErrors = false
14+
1115
dependencies {
1216
api 'org.apache.commons:commons-csv:1.10.0'
1317

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+19-19
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import io.airbyte.cdk.integrations.JdbcConnector
1515
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer
1616
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.emitConfigErrorTrace
1717
import io.airbyte.cdk.integrations.base.Destination
18+
import io.airbyte.cdk.integrations.base.JavaBaseConstants
1819
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
1920
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOverride
2021
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.isDestinationV2
@@ -47,7 +48,7 @@ import java.util.*
4748
import java.util.function.Consumer
4849
import javax.sql.DataSource
4950

50-
abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationState?>
51+
abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationState>
5152
(driverClass: String?,
5253
protected val namingResolver: NamingConventionTransformer,
5354
protected val sqlOperations: SqlOperations) : JdbcConnector(driverClass!!), Destination {
@@ -63,7 +64,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
6364
attemptTableOperations(outputSchema, database, namingResolver, sqlOperations, false)
6465
if (isDestinationV2) {
6566
val v2RawSchema = namingResolver.getIdentifier(getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
66-
.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE))
67+
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE))
6768
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false)
6869
destinationSpecificTableOperations(database)
6970
}
@@ -107,7 +108,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
107108
* [.modifyDataSourceBuilder] rather than this method.
108109
*/
109110
@VisibleForTesting
110-
fun getDataSource(config: JsonNode?): DataSource {
111+
fun getDataSource(config: JsonNode): DataSource {
111112
val jdbcConfig = toJdbcConfig(config)
112113
val connectionProperties = getConnectionProperties(config)
113114
val builder = DataSourceFactory.DataSourceBuilder(
@@ -129,7 +130,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
129130
return DefaultJdbcDatabase(dataSource!!)
130131
}
131132

132-
protected fun getConnectionProperties(config: JsonNode?): Map<String, String> {
133+
protected open fun getConnectionProperties(config: JsonNode?): Map<String, String> {
133134
val customProperties = parseJdbcParameters(config!!, JdbcUtils.JDBC_URL_PARAMS_KEY)
134135
val defaultProperties = getDefaultConnectionProperties(config)
135136
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties)
@@ -143,27 +144,26 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
143144
}
144145
}
145146

146-
protected abstract fun getDefaultConnectionProperties(config: JsonNode?): Map<String, String>
147+
protected abstract fun getDefaultConnectionProperties(config: JsonNode): Map<String, String>
147148

148-
abstract fun toJdbcConfig(config: JsonNode?): JsonNode
149+
abstract fun toJdbcConfig(config: JsonNode): JsonNode
149150

150151
protected abstract val sqlGenerator: JdbcSqlGenerator
151-
get
152152

153-
protected abstract fun getDestinationHandler(databaseName: String?,
154-
database: JdbcDatabase?,
155-
rawTableSchema: String?): JdbcDestinationHandler<DestinationState>
153+
protected abstract fun getDestinationHandler(databaseName: String,
154+
database: JdbcDatabase,
155+
rawTableSchema: String): JdbcDestinationHandler<DestinationState>
156156

157157
/**
158158
* Provide any migrations that the destination needs to run. Most destinations will need to provide
159159
* an instande of
160160
* [io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator] at minimum.
161161
*/
162162
protected abstract fun getMigrations(
163-
database: JdbcDatabase?,
164-
databaseName: String?,
165-
sqlGenerator: SqlGenerator?,
166-
destinationHandler: DestinationHandler<DestinationState>?): List<Migration<DestinationState>>
163+
database: JdbcDatabase,
164+
databaseName: String,
165+
sqlGenerator: SqlGenerator,
166+
destinationHandler: DestinationHandler<DestinationState>): List<Migration<DestinationState>>
167167

168168
/**
169169
* "database" key at root of the config json, for any other variants in config, override this
@@ -183,18 +183,18 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
183183

184184
override fun getConsumer(config: JsonNode,
185185
catalog: ConfiguredAirbyteCatalog?,
186-
outputRecordCollector: Consumer<AirbyteMessage?>?): AirbyteMessageConsumer? {
186+
outputRecordCollector: Consumer<AirbyteMessage>): AirbyteMessageConsumer? {
187187
throw NotImplementedException("Should use the getSerializedMessageConsumer instead")
188188
}
189189

190190
@Throws(Exception::class)
191191
override fun getSerializedMessageConsumer(config: JsonNode,
192192
catalog: ConfiguredAirbyteCatalog?,
193-
outputRecordCollector: Consumer<AirbyteMessage?>?): SerializedAirbyteMessageConsumer? {
193+
outputRecordCollector: Consumer<AirbyteMessage>): SerializedAirbyteMessageConsumer? {
194194
val database = getDatabase(getDataSource(config))
195195
// Short circuit for non-v2 destinations.
196196
if (!isDestinationV2) {
197-
return createAsync(
197+
return JdbcBufferedConsumerFactory.createAsync(
198198
outputRecordCollector,
199199
database,
200200
sqlOperations,
@@ -212,7 +212,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
212212

213213
private fun getV2MessageConsumer(config: JsonNode,
214214
catalog: ConfiguredAirbyteCatalog?,
215-
outputRecordCollector: Consumer<AirbyteMessage?>?,
215+
outputRecordCollector: Consumer<AirbyteMessage>,
216216
database: JdbcDatabase,
217217
defaultNamespace: String): SerializedAirbyteMessageConsumer? {
218218
val sqlGenerator = sqlGenerator
@@ -225,7 +225,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
225225
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
226226
val v2TableMigrator = NoopV2TableMigrator()
227227
val destinationHandler: DestinationHandler<DestinationState> =
228-
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE))
228+
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE))
229229
val disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false)
230230
val typerDeduper: TyperDeduper
231231
val migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler)

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/ColumnDefinition.kt

-3
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
*/
44
package io.airbyte.cdk.integrations.destination.jdbc
55

6-
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig.columns
7-
86
/**
97
* Jdbc destination column definition representation
108
*
@@ -19,7 +17,6 @@ class ColumnDefinition(name: String, type: String, columnSize: Int, isNullable:
1917
val isNullable: Boolean
2018

2119
init {
22-
this.columns = columns
2320
this.name = name
2421
this.type = type
2522
this.columnSize = columnSize

0 commit comments

Comments
 (0)