Skip to content

Commit c304df3

Browse files
cleanup question marks in CDK code (#37518)
just some kotlin cleanup
1 parent 570cc86 commit c304df3

File tree

102 files changed

+578
-655
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+578
-655
lines changed

airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ abstract class AzureBlobStorageStreamCopier(
6666
}
6767
}
6868

69-
override fun prepareStagingFile(): String? {
69+
override fun prepareStagingFile(): String {
7070
currentFile = prepareAzureStagingFile()
7171
val currentFile = this.currentFile!!
7272
if (!azureStagingFiles.contains(currentFile)) {

airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopierFactory.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ abstract class AzureBlobStorageStreamCopierFactory : StreamCopierFactory<AzureBl
2222
nameTransformer: StandardNameTransformer?,
2323
db: JdbcDatabase?,
2424
sqlOperations: SqlOperations?
25-
): StreamCopier? {
25+
): StreamCopier {
2626
try {
2727
val stream = configuredStream!!.stream
2828
val syncMode = configuredStream.destinationSyncMode
@@ -62,5 +62,5 @@ abstract class AzureBlobStorageStreamCopierFactory : StreamCopierFactory<AzureBl
6262
azureBlobConfig: AzureBlobStorageConfig?,
6363
nameTransformer: StandardNameTransformer?,
6464
sqlOperations: SqlOperations?
65-
): StreamCopier?
65+
): StreamCopier
6666
}

airbyte-cdk/java/airbyte-cdk/core/build.gradle

+1-24
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,7 @@
11
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
22
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
33

4-
java {
5-
// TODO: rewrite code to avoid javac wornings in the first place
6-
compileJava {
7-
options.compilerArgs += "-Xlint:-deprecation,-try,-rawtypes,-overloads,-this-escape"
8-
}
9-
compileTestJava {
10-
options.compilerArgs += "-Xlint:-try,-divzero,-cast"
11-
}
12-
compileTestFixturesJava {
13-
options.compilerArgs += "-Xlint:-cast,-deprecation"
14-
}
15-
}
4+
165

176
compileTestFixturesKotlin {
187
compilerOptions {
@@ -38,18 +27,6 @@ compileTestKotlin {
3827
}
3928
}
4029

41-
compileKotlin {
42-
compilerOptions {
43-
jvmTarget = JvmTarget.JVM_21
44-
languageVersion = KotlinVersion.KOTLIN_1_9
45-
freeCompilerArgs = ["-Xjvm-default=all"]
46-
}
47-
dependsOn {
48-
tasks.matching { it.name == 'generate' }
49-
}
50-
}
51-
52-
5330
dependencies {
5431

5532
api 'com.datadoghq:dd-trace-api:1.28.0'

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/ContextQueryFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ import java.sql.SQLException
77
import org.jooq.DSLContext
88

99
fun interface ContextQueryFunction<T> {
10-
@Throws(SQLException::class) fun query(context: DSLContext?): T
10+
@Throws(SQLException::class) fun query(context: DSLContext): T
1111
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DataTypeUtils.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ object DataTypeUtils {
4949

5050
@JvmStatic
5151
fun <T> returnNullIfInvalid(valueProducer: DataTypeSupplier<T>): T? {
52-
return returnNullIfInvalid(valueProducer, Function { _: T? -> true })
52+
return returnNullIfInvalid(valueProducer, Function { _: T -> true })
5353
}
5454

5555
@JvmStatic
5656
fun <T> returnNullIfInvalid(
5757
valueProducer: DataTypeSupplier<T>,
58-
isValidFn: Function<T?, Boolean>
58+
isValidFn: Function<T, Boolean>
5959
): T? {
6060
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will
6161
// throw an
@@ -72,13 +72,13 @@ object DataTypeUtils {
7272

7373
@JvmStatic
7474
fun <T> throwExceptionIfInvalid(valueProducer: DataTypeSupplier<T>): T? {
75-
return throwExceptionIfInvalid(valueProducer, Function { _: T? -> true })
75+
return throwExceptionIfInvalid(valueProducer, Function { _: T -> true })
7676
}
7777

7878
@JvmStatic
7979
fun <T> throwExceptionIfInvalid(
8080
valueProducer: DataTypeSupplier<T>,
81-
isValidFn: Function<T?, Boolean>
81+
isValidFn: Function<T, Boolean>
8282
): T? {
8383
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will
8484
// throw an

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/Database.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ import org.jooq.DSLContext
99
import org.jooq.impl.DSL
1010

1111
/** Database object for interacting with a Jooq connection. */
12-
open class Database(private val dslContext: DSLContext?) {
12+
open class Database(protected val dslContext: DSLContext) {
1313
@Throws(SQLException::class)
1414
open fun <T> query(transform: ContextQueryFunction<T>): T? {
1515
return transform.query(dslContext)
1616
}
1717

1818
@Throws(SQLException::class)
1919
open fun <T> transaction(transform: ContextQueryFunction<T>): T? {
20-
return dslContext!!.transactionResult { configuration: Configuration? ->
20+
return dslContext.transactionResult { configuration: Configuration ->
2121
transform.query(DSL.using(configuration))
2222
}
2323
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/SqlDatabase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ abstract class SqlDatabase : AbstractDatabase() {
1010
@Throws(Exception::class) abstract fun execute(sql: String?)
1111

1212
@Throws(Exception::class)
13-
abstract fun unsafeQuery(sql: String?, vararg params: String?): Stream<JsonNode>
13+
abstract fun unsafeQuery(sql: String?, vararg params: String): Stream<JsonNode>
1414
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
175175
columnName,
176176
DataTypeUtils.returnNullIfInvalid(
177177
{ resultSet.getDouble(index) },
178-
{ d: Double? -> java.lang.Double.isFinite(d!!) },
178+
{ d: Double -> java.lang.Double.isFinite(d) },
179179
),
180180
)
181181
}
@@ -191,7 +191,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
191191
columnName,
192192
DataTypeUtils.returnNullIfInvalid(
193193
{ resultSet.getFloat(index) },
194-
{ f: Float? -> java.lang.Float.isFinite(f!!) },
194+
{ f: Float -> java.lang.Float.isFinite(f) },
195195
),
196196
)
197197
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ constructor(
8080
}
8181
}
8282

83-
override fun <T> executeMetadataQuery(query: Function<DatabaseMetaData?, T>): T {
83+
override fun <T> executeMetadataQuery(query: Function<DatabaseMetaData, T>): T {
8484
try {
8585
dataSource.connection.use { connection ->
8686
val metaData = connection.metaData

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
172172
*/
173173
@MustBeClosed
174174
@Throws(SQLException::class)
175-
override fun unsafeQuery(sql: String?, vararg params: String?): Stream<JsonNode> {
175+
override fun unsafeQuery(sql: String?, vararg params: String): Stream<JsonNode> {
176176
return unsafeQuery(
177177
{ connection: Connection ->
178178
val statement = connection.prepareStatement(sql)
@@ -192,7 +192,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
192192
* syntactic sugar.
193193
*/
194194
@Throws(SQLException::class)
195-
fun queryJsons(sql: String?, vararg params: String?): List<JsonNode> {
195+
fun queryJsons(sql: String?, vararg params: String): List<JsonNode> {
196196
unsafeQuery(sql, *params).use { stream ->
197197
return stream.toList()
198198
}
@@ -212,7 +212,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
212212
@get:Throws(SQLException::class) abstract val metaData: DatabaseMetaData
213213

214214
@Throws(SQLException::class)
215-
abstract fun <T> executeMetadataQuery(query: Function<DatabaseMetaData?, T>): T
215+
abstract fun <T> executeMetadataQuery(query: Function<DatabaseMetaData, T>): T
216216

217217
companion object {
218218
private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDatabase::class.java)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ object AirbyteTraceMessageUtility {
9090
// Not sure why defaultOutputRecordCollector is under Destination specifically,
9191
// but this matches usage elsewhere in base-java
9292
val outputRecordCollector =
93-
Consumer<AirbyteMessage> { m: AirbyteMessage? ->
93+
Consumer<AirbyteMessage> { m: AirbyteMessage ->
9494
Destination.Companion.defaultOutputRecordCollector(m)
9595
}
9696
outputRecordCollector.accept(message)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ internal constructor(
175175
val catalog =
176176
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!
177177
val stateOptional =
178-
parsed.getStatePath().map { path: Path? -> parseConfig(path) }
178+
parsed.getStatePath().map { path: Path -> parseConfig(path) }
179179
try {
180180
if (featureFlags.concurrentSourceStreamRead()) {
181181
LOGGER.info("Concurrent source stream read enabled.")
@@ -271,11 +271,11 @@ internal constructor(
271271
messageIterator: AutoCloseableIterator<AirbyteMessage>,
272272
recordCollector: Consumer<AirbyteMessage>
273273
) {
274-
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
274+
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
275275
LOGGER.debug("Producing messages for stream {}...", s)
276276
}
277277
messageIterator.forEachRemaining(recordCollector)
278-
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
278+
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
279279
LOGGER.debug("Finished producing messages for stream {}...", s)
280280
}
281281
}
@@ -352,7 +352,7 @@ internal constructor(
352352
)
353353
produceMessages(stream, streamStatusTrackingRecordConsumer)
354354
} catch (e: Exception) {
355-
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
355+
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
356356
LOGGER.error("Failed to consume from stream {}.", s, e)
357357
}
358358
throw RuntimeException(e)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshTunnel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ constructor(
519519
portKey: List<String>,
520520
wrapped: CheckedConsumer<JsonNode?, Exception?>
521521
) {
522-
sshWrap<Any?>(config, hostKey, portKey) { configInTunnel: JsonNode? ->
522+
sshWrap<Any?>(config, hostKey, portKey) { configInTunnel: JsonNode ->
523523
wrapped.accept(configInTunnel)
524524
null
525525
}
@@ -532,7 +532,7 @@ constructor(
532532
endPointKey: String,
533533
wrapped: CheckedConsumer<JsonNode?, Exception?>
534534
) {
535-
sshWrap<Any?>(config, endPointKey) { configInTunnel: JsonNode? ->
535+
sshWrap<Any?>(config, endPointKey) { configInTunnel: JsonNode ->
536536
wrapped.accept(configInTunnel)
537537
null
538538
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/CheckAndRemoveRecordWriter.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ fun interface CheckAndRemoveRecordWriter {
1212
* of the new file where the record will be sent will be returned.
1313
*/
1414
@Throws(Exception::class)
15-
fun apply(stream: AirbyteStreamNameNamespacePair?, stagingFileName: String?): String?
15+
fun apply(stream: AirbyteStreamNameNamespacePair, stagingFileName: String?): String?
1616
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ interface StreamCopier {
6262
* @return A string that unqiuely identifies the file. E.g. the filename, or a unique suffix
6363
* that is appended to a shared filename prefix
6464
*/
65-
fun prepareStagingFile(): String?
65+
fun prepareStagingFile(): String
6666

6767
/** @return current staging file name */
6868
val currentFile: String?

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopierFactory.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ interface StreamCopierFactory<T> {
1717
nameTransformer: StandardNameTransformer?,
1818
db: JdbcDatabase?,
1919
sqlOperations: SqlOperations?
20-
): StreamCopier?
20+
): StreamCopier
2121

2222
companion object {
2323
@JvmStatic

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/NormalizationLogParser.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ import org.apache.logging.log4j.util.Strings
2929
class NormalizationLogParser {
3030
val dbtErrors: MutableList<String> = ArrayList()
3131

32-
fun create(bufferedReader: BufferedReader): Stream<AirbyteMessage?> {
32+
fun create(bufferedReader: BufferedReader): Stream<AirbyteMessage> {
3333
return bufferedReader.lines().flatMap { line: String -> this.toMessages(line) }
3434
}
3535

3636
@VisibleForTesting
37-
fun toMessages(line: String): Stream<AirbyteMessage?> {
37+
fun toMessages(line: String): Stream<AirbyteMessage> {
3838
if (Strings.isEmpty(line)) {
3939
return Stream.of(logMessage(AirbyteLogMessage.Level.INFO, ""))
4040
}
@@ -51,7 +51,7 @@ class NormalizationLogParser {
5151
*
5252
* This is needed for dbt < 1.0.0, which don't support json-format logs.
5353
*/
54-
private fun nonJsonLineToMessage(line: String): Stream<AirbyteMessage?> {
54+
private fun nonJsonLineToMessage(line: String): Stream<AirbyteMessage> {
5555
// Super hacky thing to try and detect error lines
5656
if (line.contains("[error]")) {
5757
dbtErrors.add(line)
@@ -64,7 +64,7 @@ class NormalizationLogParser {
6464
* emit it without change), or it's dbt json log, and we need to do some extra work to convert
6565
* it to a log message + aggregate error logs.
6666
*/
67-
private fun jsonToMessage(jsonLine: JsonNode): Stream<AirbyteMessage?> {
67+
private fun jsonToMessage(jsonLine: JsonNode): Stream<AirbyteMessage> {
6868
val message = Jsons.tryObject(jsonLine, AirbyteMessage::class.java)
6969
if (message.isPresent) {
7070
// This line is already an AirbyteMessage; we can just return it directly
@@ -117,7 +117,7 @@ class NormalizationLogParser {
117117
normalizationLogParser.create(
118118
BufferedReader(InputStreamReader(System.`in`, StandardCharsets.UTF_8))
119119
)
120-
airbyteMessageStream.forEachOrdered { message: AirbyteMessage? ->
120+
airbyteMessageStream.forEachOrdered { message: AirbyteMessage ->
121121
println(Jsons.serialize(message))
122122
}
123123

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class InMemoryRecordBufferingStrategy(
5252
}
5353

5454
val bufferedRecords =
55-
streamBuffer.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair? ->
55+
streamBuffer.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair ->
5656
ArrayList()
5757
}
5858
bufferedRecords.add(message.record)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategy.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class SerializedBufferingStrategy
8989
* computed buffer
9090
*/
9191
private fun getOrCreateBuffer(stream: AirbyteStreamNameNamespacePair): SerializableBuffer {
92-
return allBuffers.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair? ->
92+
return allBuffers.computeIfAbsent(stream) { _: AirbyteStreamNameNamespacePair ->
9393
LOGGER.info(
9494
"Starting a new buffer for stream {} (current state: {} in {} buffers)",
9595
stream.name,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ object ConnectorExceptionUtil {
113113
val stacktraces =
114114
throwables
115115
.stream()
116-
.map { throwable: Throwable? -> ExceptionUtils.getStackTrace(throwable) }
116+
.map { throwable: Throwable -> ExceptionUtils.getStackTrace(throwable) }
117117
.collect(Collectors.joining("\n"))
118118
LOGGER.error("$initialMessage$stacktraces\nRethrowing first exception.")
119119
throw throwables.iterator().next()

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -181,18 +181,18 @@ class ConcurrentStreamConsumer(
181181
private fun executeStream(stream: AutoCloseableIterator<AirbyteMessage>) {
182182
try {
183183
stream.use {
184-
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
184+
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
185185
LOGGER.debug("Consuming from stream {}...", s)
186186
}
187187
StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter)
188188
streamConsumer.accept(stream)
189189
StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter)
190-
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
190+
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
191191
LOGGER.debug("Consumption from stream {} complete.", s)
192192
}
193193
}
194194
} catch (e: Exception) {
195-
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
195+
stream.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair ->
196196
LOGGER.error("Unable to consume from stream {}.", s, e)
197197
}
198198
StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter)

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/factory/CommonFactoryTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ internal open class CommonFactoryTest {
2020
@BeforeAll
2121
fun dbSetup(): Unit {
2222
container.withDatabaseName(DATABASE_NAME).withUsername("docker").withPassword("docker")
23-
container!!.start()
23+
container.start()
2424
}
2525

2626
@JvmStatic
2727
@AfterAll
2828
fun dbDown(): Unit {
29-
container!!.close()
29+
container.close()
3030
}
3131
}
3232
}

0 commit comments

Comments
 (0)