Skip to content

Commit 3adc11e

Browse files
fix compiler warnings
1 parent c560cff commit 3adc11e

File tree

92 files changed

+3917
-2197
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+3917
-2197
lines changed

airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ abstract class AzureBlobStorageStreamCopier(
172172
}
173173

174174
@Throws(Exception::class)
175-
override fun generateMergeStatement(destTableName: String?): String? {
175+
override fun generateMergeStatement(destTableName: String?): String {
176176
LOGGER.info(
177177
"Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.",
178178
tmpTableName,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/Destination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ interface Destination : Integration {
3434
fun getConsumer(
3535
config: JsonNode,
3636
catalog: ConfiguredAirbyteCatalog,
37-
outputRecordCollector: Consumer<AirbyteMessage?>?
37+
outputRecordCollector: Consumer<AirbyteMessage>
3838
): AirbyteMessageConsumer?
3939

4040
/**
@@ -53,7 +53,7 @@ interface Destination : Integration {
5353
fun getSerializedMessageConsumer(
5454
config: JsonNode,
5555
catalog: ConfiguredAirbyteCatalog,
56-
outputRecordCollector: Consumer<AirbyteMessage?>?
56+
outputRecordCollector: Consumer<AirbyteMessage>
5757
): SerializedAirbyteMessageConsumer? {
5858
return ShimToSerializedAirbyteMessageConsumer(
5959
getConsumer(config, catalog, outputRecordCollector)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ class IntegrationRunner
4646
@VisibleForTesting
4747
internal constructor(
4848
cliParser: IntegrationCliParser,
49-
outputRecordCollector: Consumer<AirbyteMessage?>,
49+
outputRecordCollector: Consumer<AirbyteMessage>,
5050
destination: Destination?,
5151
source: Source?
5252
) {
5353
private val cliParser: IntegrationCliParser
54-
private val outputRecordCollector: Consumer<AirbyteMessage?>
54+
private val outputRecordCollector: Consumer<AirbyteMessage>
5555
private val integration: Integration
5656
private val destination: Destination?
5757
private val source: Source?
@@ -61,7 +61,7 @@ internal constructor(
6161
destination: Destination?
6262
) : this(
6363
IntegrationCliParser(),
64-
Consumer<AirbyteMessage?> { message: AirbyteMessage? ->
64+
Consumer<AirbyteMessage> { message: AirbyteMessage ->
6565
Destination.Companion.defaultOutputRecordCollector(message)
6666
},
6767
destination,
@@ -72,7 +72,7 @@ internal constructor(
7272
source: Source?
7373
) : this(
7474
IntegrationCliParser(),
75-
Consumer<AirbyteMessage?> { message: AirbyteMessage? ->
75+
Consumer<AirbyteMessage> { message: AirbyteMessage ->
7676
Destination.Companion.defaultOutputRecordCollector(message)
7777
},
7878
null,
@@ -99,7 +99,7 @@ internal constructor(
9999
@VisibleForTesting
100100
internal constructor(
101101
cliParser: IntegrationCliParser,
102-
outputRecordCollector: Consumer<AirbyteMessage?>,
102+
outputRecordCollector: Consumer<AirbyteMessage>,
103103
destination: Destination?,
104104
source: Source?,
105105
jsonSchemaValidator: JsonSchemaValidator
@@ -254,7 +254,7 @@ internal constructor(
254254

255255
private fun produceMessages(
256256
messageIterator: AutoCloseableIterator<AirbyteMessage>,
257-
recordCollector: Consumer<AirbyteMessage?>
257+
recordCollector: Consumer<AirbyteMessage>
258258
) {
259259
messageIterator!!.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
260260
LOGGER.debug("Producing messages for stream {}...", s)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ abstract class SpecModifyingDestination(private val destination: Destination) :
3131
override fun getConsumer(
3232
config: JsonNode,
3333
catalog: ConfiguredAirbyteCatalog,
34-
outputRecordCollector: Consumer<AirbyteMessage?>?
34+
outputRecordCollector: Consumer<AirbyteMessage>
3535
): AirbyteMessageConsumer? {
3636
return destination.getConsumer(config, catalog, outputRecordCollector)
3737
}
@@ -40,7 +40,7 @@ abstract class SpecModifyingDestination(private val destination: Destination) :
4040
override fun getSerializedMessageConsumer(
4141
config: JsonNode,
4242
catalog: ConfiguredAirbyteCatalog,
43-
outputRecordCollector: Consumer<AirbyteMessage?>?
43+
outputRecordCollector: Consumer<AirbyteMessage>
4444
): SerializedAirbyteMessageConsumer? {
4545
return destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)
4646
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class SshWrappedDestination : Destination {
9494
override fun getConsumer(
9595
config: JsonNode,
9696
catalog: ConfiguredAirbyteCatalog,
97-
outputRecordCollector: Consumer<AirbyteMessage?>?
97+
outputRecordCollector: Consumer<AirbyteMessage>
9898
): AirbyteMessageConsumer? {
9999
val tunnel = getTunnelInstance(config)
100100

@@ -120,7 +120,7 @@ class SshWrappedDestination : Destination {
120120
override fun getSerializedMessageConsumer(
121121
config: JsonNode,
122122
catalog: ConfiguredAirbyteCatalog,
123-
outputRecordCollector: Consumer<AirbyteMessage?>?
123+
outputRecordCollector: Consumer<AirbyteMessage>
124124
): SerializedAirbyteMessageConsumer? {
125125
val clone = Jsons.clone(config)
126126
val connectionOptionsConfig: Optional<JsonNode> =

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/function/DestinationFlushFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ interface DestinationFlushFunction {
3838
@Throws(Exception::class)
3939
fun flush(
4040
decs: StreamDescriptor,
41-
stream: Stream<PartialAirbyteMessage?>,
41+
stream: Stream<PartialAirbyteMessage>,
4242
)
4343

4444
/**

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory
6060
class BufferedStreamConsumer
6161
@VisibleForTesting
6262
internal constructor(
63-
private val outputRecordCollector: Consumer<AirbyteMessage?>?,
63+
private val outputRecordCollector: Consumer<AirbyteMessage>,
6464
private val onStart: OnStartFunction,
6565
private val bufferingStrategy: BufferingStrategy,
6666
private val onClose: OnCloseFunction,
@@ -87,7 +87,7 @@ internal constructor(
8787
*/
8888
@Deprecated("")
8989
constructor(
90-
outputRecordCollector: Consumer<AirbyteMessage?>?,
90+
outputRecordCollector: Consumer<AirbyteMessage>,
9191
onStart: OnStartFunction,
9292
bufferingStrategy: BufferingStrategy,
9393
onClose: OnCloseFunction,
@@ -109,7 +109,7 @@ internal constructor(
109109
)
110110

111111
constructor(
112-
outputRecordCollector: Consumer<AirbyteMessage?>?,
112+
outputRecordCollector: Consumer<AirbyteMessage>,
113113
onStart: OnStartFunction,
114114
bufferingStrategy: BufferingStrategy,
115115
onClose: OnCloseFunction,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/RecordWriter.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ package io.airbyte.cdk.integrations.destination.buffered_stream_consumer
66
import io.airbyte.commons.functional.CheckedBiConsumer
77
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
88

9-
interface RecordWriter<T> : CheckedBiConsumer<AirbyteStreamNameNamespacePair, List<T>, Exception> {
9+
fun interface RecordWriter<T> :
10+
CheckedBiConsumer<AirbyteStreamNameNamespacePair, List<T>, Exception> {
1011
@Throws(Exception::class)
1112
override fun accept(stream: AirbyteStreamNameNamespacePair, records: List<T>)
1213
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt

+6-11
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ interface SqlOperations {
5252
* @throws Exception exception
5353
*/
5454
@Throws(Exception::class)
55-
fun createTableIfNotExists(database: JdbcDatabase?, schemaName: String?, tableName: String?)
55+
fun createTableIfNotExists(database: JdbcDatabase, schemaName: String?, tableName: String?)
5656

5757
/**
5858
* Query to create a table with provided name in provided schema if it does not already exist.
@@ -72,7 +72,7 @@ interface SqlOperations {
7272
* @throws Exception exception
7373
*/
7474
@Throws(Exception::class)
75-
fun dropTableIfExists(database: JdbcDatabase?, schemaName: String?, tableName: String?)
75+
fun dropTableIfExists(database: JdbcDatabase, schemaName: String?, tableName: String?)
7676

7777
/**
7878
* Query to remove all records from a table. Assumes the table exists.
@@ -82,11 +82,7 @@ interface SqlOperations {
8282
* @param tableName Name of table
8383
* @return Query
8484
*/
85-
fun truncateTableQuery(
86-
database: JdbcDatabase?,
87-
schemaName: String?,
88-
tableName: String?
89-
): String?
85+
fun truncateTableQuery(database: JdbcDatabase?, schemaName: String?, tableName: String?): String
9086

9187
/**
9288
* Insert records into table. Assumes the table exists.
@@ -99,8 +95,8 @@ interface SqlOperations {
9995
*/
10096
@Throws(Exception::class)
10197
fun insertRecords(
102-
database: JdbcDatabase?,
103-
records: List<PartialAirbyteMessage?>?,
98+
database: JdbcDatabase,
99+
records: List<PartialAirbyteMessage>,
104100
schemaName: String?,
105101
tableName: String?
106102
)
@@ -131,8 +127,7 @@ interface SqlOperations {
131127
* @param queries Queries to execute
132128
* @throws Exception exception
133129
*/
134-
@Throws(Exception::class)
135-
fun executeTransaction(database: JdbcDatabase?, queries: List<String?>?)
130+
@Throws(Exception::class) fun executeTransaction(database: JdbcDatabase, queries: List<String>)
136131

137132
/** Check if the data record is valid and ok to be written to destination */
138133
fun isValidData(data: JsonNode?): Boolean

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ interface StreamCopier {
4848
@Throws(Exception::class) fun createDestinationTable(): String?
4949

5050
/** Generates a merge SQL statement from the temporary table to the final table. */
51-
@Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String?
51+
@Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String
5252

5353
/**
5454
* Cleans up the copier by removing the staging file and dropping the temporary table after

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@ import org.slf4j.LoggerFactory
2121
* This should be deprecated as we slowly move towards using [SerializedBufferingStrategy] instead.
2222
*/
2323
class InMemoryRecordBufferingStrategy(
24-
private val recordWriter: RecordWriter<AirbyteRecordMessage?>,
24+
private val recordWriter: RecordWriter<AirbyteRecordMessage>,
2525
private val checkAndRemoveRecordWriter: CheckAndRemoveRecordWriter?,
2626
private val maxQueueSizeInBytes: Long
2727
) : BufferingStrategy {
2828
private var streamBuffer:
29-
MutableMap<AirbyteStreamNameNamespacePair, MutableList<AirbyteRecordMessage?>> =
29+
MutableMap<AirbyteStreamNameNamespacePair, MutableList<AirbyteRecordMessage>> =
3030
HashMap()
3131
private var fileName: String? = null
3232

3333
private val recordSizeEstimator = RecordSizeEstimator()
3434
private var bufferSizeInBytes: Long = 0
3535

3636
constructor(
37-
recordWriter: RecordWriter<AirbyteRecordMessage?>,
37+
recordWriter: RecordWriter<AirbyteRecordMessage>,
3838
maxQueueSizeInBytes: Long
3939
) : this(recordWriter, null, maxQueueSizeInBytes)
4040

Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.28.11
1+
version=0.28.12

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory
3939

4040
internal class IntegrationRunnerTest {
4141
private lateinit var cliParser: IntegrationCliParser
42-
private lateinit var stdoutConsumer: Consumer<AirbyteMessage?>
42+
private lateinit var stdoutConsumer: Consumer<AirbyteMessage>
4343
private lateinit var destination: Destination
4444
private lateinit var source: Source
4545
private lateinit var configPath: Path

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ class AsyncStreamConsumerTest {
553553
namespace: String,
554554
allRecords: List<AirbyteMessage>,
555555
) {
556-
val argumentCaptor = org.mockito.kotlin.argumentCaptor<Stream<PartialAirbyteMessage?>>()
556+
val argumentCaptor = org.mockito.kotlin.argumentCaptor<Stream<PartialAirbyteMessage>>()
557557
Mockito.verify(flushFunction, Mockito.atLeast(1))
558558
.flush(
559559
org.mockito.kotlin.eq(

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ import org.mockito.kotlin.mock
3131
class BufferedStreamConsumerTest {
3232
private lateinit var consumer: BufferedStreamConsumer
3333
private lateinit var onStart: OnStartFunction
34-
private lateinit var recordWriter: RecordWriter<AirbyteRecordMessage?>
34+
private lateinit var recordWriter: RecordWriter<AirbyteRecordMessage>
3535
private lateinit var onClose: OnCloseFunction
3636
private lateinit var isValidRecord: CheckedFunction<JsonNode?, Boolean?, Exception?>
37-
private lateinit var outputRecordCollector: Consumer<AirbyteMessage?>
37+
private lateinit var outputRecordCollector: Consumer<AirbyteMessage>
3838

3939
@BeforeEach
4040
@Throws(Exception::class)

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import org.mockito.Mockito
1616
import org.mockito.kotlin.mock
1717

1818
class InMemoryRecordBufferingStrategyTest {
19-
private val recordWriter: RecordWriter<AirbyteRecordMessage?> = mock()
19+
private val recordWriter: RecordWriter<AirbyteRecordMessage> = mock()
2020

2121
@Test
2222
@Throws(Exception::class)

airbyte-cdk/java/airbyte-cdk/db-destinations/build.gradle

+6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ java {
88
}
99
}
1010

11+
compileKotlin.compilerOptions.allWarningsAsErrors = false
12+
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
13+
compileTestKotlin.compilerOptions.allWarningsAsErrors = false
14+
1115
dependencies {
1216
api 'org.apache.commons:commons-csv:1.10.0'
1317

@@ -27,4 +31,6 @@ dependencies {
2731
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:typing-deduping'))
2832

2933
testImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
34+
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
35+
3036
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java

Whitespace-only changes.

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java

Whitespace-only changes.

0 commit comments

Comments
 (0)