Skip to content

Commit 93bb626

Browse files
allow connectors to be written in kotlin
1 parent 8f6036e commit 93bb626

File tree

16 files changed

+65
-92
lines changed

16 files changed

+65
-92
lines changed

airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle

-13
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,6 @@ plugins {
33
id "java-library"
44
}
55

6-
java {
7-
// TODO: rewrite code to avoid javac wornings in the first place
8-
compileJava {
9-
options.compilerArgs += "-Xlint:-varargs,-try,-deprecation,-unchecked,-this-escape"
10-
}
11-
compileTestJava {
12-
options.compilerArgs += "-Xlint:-try"
13-
}
14-
compileTestFixturesJava {
15-
options.compilerArgs += "-Xlint:-try"
16-
}
17-
}
18-
compileKotlin.compilerOptions.allWarningsAsErrors = false
196
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
207
compileTestKotlin.compilerOptions.allWarningsAsErrors = false
218

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/concurrency/CompletableFutures.kt

+6-9
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,18 @@ object CompletableFutures {
2424
val result = CompletableFuture<List<Either<out Exception, Result>>>()
2525
val size = futures.size
2626
val counter = AtomicInteger()
27-
val results =
28-
java.lang.reflect.Array.newInstance(Either::class.java, size)
29-
as Array<Either<Exception, Result>>
27+
val results = mutableListOf<Either<out Exception, Result>>()
3028
// attach a whenComplete to all futures
31-
for (i in 0 until size) {
32-
val currentIndex = i
33-
futures[i].whenComplete { value: Result, exception: Throwable? ->
29+
for (future in futures) {
30+
future.whenComplete { value: Result, exception: Throwable? ->
3431
// if exception is null, then the future completed successfully
3532
// maybe synchronization is unnecessary here, but it's better to be safe
3633
synchronized(results) {
3734
if (exception == null) {
38-
results[currentIndex] = Either.right(value)
35+
results.add(Either.right(value))
3936
} else {
4037
if (exception is Exception) {
41-
results[currentIndex] = Either.left(exception)
38+
results.add(Either.left(exception))
4239
} else {
4340
// this should never happen
4441
throw RuntimeException(
@@ -50,7 +47,7 @@ object CompletableFutures {
5047
}
5148
val completedCount = counter.incrementAndGet()
5249
if (completedCount == size) {
53-
result.complete(Arrays.asList(*results))
50+
result.complete(results)
5451
}
5552
}
5653
}

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import java.util.stream.Collectors
1515

1616
class Enums {
1717
companion object {
18+
@Suppress("UNUSED_PARAMETER")
1819
inline fun <T1 : Enum<T1>, reified T2 : Enum<T2>> convertTo(ie: T1?, oe: Class<T2>): T2? {
1920
if (ie == null) {
2021
return null

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagHelper.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ object FeatureFlagHelper {
4141
try {
4242
workspaceIds.add(UUID.fromString(id))
4343
} catch (e: IllegalArgumentException) {
44-
log.warn("Malformed workspace id for {}: {}", context, id)
44+
log.warn { "Malformed workspace id for $context: $id" }
4545
}
4646
}
4747
}

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/functional/Either.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ class Either<Error, Result> private constructor(left: Error?, right: Result?) {
2929
return right != null
3030
}
3131

32-
override fun equals(o: Any?): Boolean {
33-
if (this === o) {
32+
override fun equals(other: Any?): Boolean {
33+
if (this === other) {
3434
return true
3535
}
36-
if (o == null || javaClass != o.javaClass) {
36+
if (other == null || javaClass != other.javaClass) {
3737
return false
3838
}
39-
val either = o as Either<*, *>
39+
val either = other as Either<*, *>
4040
return left == either.left && right == either.right
4141
}
4242

@@ -47,7 +47,7 @@ class Either<Error, Result> private constructor(left: Error?, right: Result?) {
4747
companion object {
4848
fun <Error, Result> left(error: Error): Either<Error, Result> {
4949
if (error == null) {
50-
LOGGER.warn("Either.left called with a null!")
50+
LOGGER.warn { "Either.left called with a null!" }
5151
}
5252
return Either(error!!, null)
5353
}

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ object IOs {
8383
return emptyList<String>()
8484
}
8585

86-
ReversedLinesFileReader(file, Charsets.UTF_8).use { fileReader ->
86+
ReversedLinesFileReader.Builder().setFile(file).setCharset(Charsets.UTF_8).get().use {
87+
fileReader ->
8788
val lines: MutableList<String?> = ArrayList()
8889
var line = fileReader.readLine()
8990
while (line != null && lines.size < numLines) {

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/LineGobbler.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ internal constructor(
2727
private val caller: String = GENERIC,
2828
private val containerLogMdcBuilder: MdcScope.Builder = MdcScope.Companion.DEFAULT_BUILDER
2929
) : VoidCallable {
30-
private val `is`: BufferedReader? = IOs.newBufferedReader(`is`)
30+
private val `is`: BufferedReader = IOs.newBufferedReader(`is`)
3131

3232
internal constructor(
3333
`is`: InputStream,
@@ -40,9 +40,9 @@ internal constructor(
4040
override fun voidCall() {
4141
MDC.setContextMap(mdc)
4242
try {
43-
var line = `is`!!.readLine()
43+
var line = `is`.readLine()
4444
while (line != null) {
45-
containerLogMdcBuilder.build().use { mdcScope -> consumer.accept(line) }
45+
containerLogMdcBuilder.build().use { consumer.accept(line) }
4646
line = `is`.readLine()
4747
}
4848
} catch (i: IOException) {

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ object JsonPaths {
259259
* @param replacement
260260
* - a string value to replace the current value at the jsonPath
261261
*/
262-
fun replaceAtString(json: JsonNode, jsonPath: String, replacement: String): JsonNode? {
262+
fun replaceAtString(json: JsonNode, jsonPath: String, replacement: String): JsonNode {
263263
return replaceAtJsonNode(json, jsonPath, Jsons.jsonNode(replacement))
264264
}
265265

@@ -315,7 +315,7 @@ object JsonPaths {
315315
json: JsonNode,
316316
jsonPath: String,
317317
replacementFunction: BiFunction<JsonNode, String, JsonNode>
318-
): JsonNode? {
318+
): JsonNode {
319319
var clone = Jsons.clone(json)
320320
assertIsJsonPath(jsonPath)
321321
val foundPaths = getPaths(clone, jsonPath)

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,9 @@ object JsonSchemas {
226226
consumer
227227
)
228228
} else {
229-
log.warn(
229+
log.warn {
230230
"The array is missing an items field. The traversal is silently stopped. Current schema: $jsonSchemaNode"
231-
)
231+
}
232232
}
233233
}
234234
OBJECT_TYPE -> {
@@ -247,9 +247,9 @@ object JsonSchemas {
247247
traverseJsonSchemaInternal(arrayItem, path, consumer)
248248
}
249249
} else {
250-
log.warn(
250+
log.warn {
251251
"The object is a properties key or a combo keyword. The traversal is silently stopped. Current schema: $jsonSchemaNode"
252-
)
252+
}
253253
}
254254
}
255255
}
@@ -331,14 +331,14 @@ object JsonSchemas {
331331
class FieldNameOrList private constructor(val fieldName: String?) {
332332
val isList: Boolean = fieldName == null
333333

334-
override fun equals(o: Any?): Boolean {
335-
if (this === o) {
334+
override fun equals(other: Any?): Boolean {
335+
if (this === other) {
336336
return true
337337
}
338-
if (o !is FieldNameOrList) {
338+
if (other !is FieldNameOrList) {
339339
return false
340340
}
341-
val that = o
341+
val that = other
342342
return isList == that.isList && fieldName == that.fieldName
343343
}
344344

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt

+15-15
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ object Jsons {
221221

222222
@JvmStatic
223223
fun <T : Any> clone(o: T): T {
224-
return deserialize(serialize(o), o::class.java) as T
224+
return deserialize(serialize(o), o::class.java)
225225
}
226226

227227
fun toBytes(jsonNode: JsonNode): ByteArray {
@@ -262,16 +262,16 @@ object Jsons {
262262
}
263263

264264
fun navigateTo(node: JsonNode, keys: List<String?>): JsonNode {
265-
var node = node
265+
var targetNode = node
266266
for (key in keys) {
267-
node = node[key]
267+
targetNode = targetNode[key]
268268
}
269-
return node
269+
return targetNode
270270
}
271271

272272
fun replaceNestedValue(json: JsonNode, keys: List<String?>, replacement: JsonNode?) {
273273
replaceNested(json, keys) { node: ObjectNode, finalKey: String? ->
274-
node.put(finalKey, replacement)
274+
node.replace(finalKey, replacement)
275275
}
276276
}
277277

@@ -302,16 +302,16 @@ object Jsons {
302302
}
303303

304304
fun getOptional(json: JsonNode?, keys: List<String>): Optional<JsonNode> {
305-
var json = json
305+
var retVal = json
306306
for (key in keys) {
307-
if (json == null) {
307+
if (retVal == null) {
308308
return Optional.empty()
309309
}
310310

311-
json = json[key]
311+
retVal = retVal[key]
312312
}
313313

314-
return Optional.ofNullable(json)
314+
return Optional.ofNullable(retVal)
315315
}
316316

317317
fun getStringOrNull(json: JsonNode?, vararg keys: String): String? {
@@ -419,21 +419,21 @@ object Jsons {
419419
* the class name can at least help narrow down the problem, without leaking
420420
* potentially-sensitive information. </snip...>
421421
*/
422-
private fun <T : Any> handleDeserThrowable(t: Throwable): Optional<T> {
422+
private fun <T : Any> handleDeserThrowable(throwable: Throwable): Optional<T> {
423423
// Manually build the stacktrace, excluding the top-level exception object
424424
// so that we don't accidentally include the exception message.
425425
// Otherwise we could just do ExceptionUtils.getStackTrace(t).
426-
var t: Throwable? = t
426+
var t: Throwable = throwable
427427
val sb = StringBuilder()
428-
sb.append(t!!.javaClass)
428+
sb.append(t.javaClass)
429429
for (traceElement in t.stackTrace) {
430430
sb.append("\n\tat ")
431431
sb.append(traceElement.toString())
432432
}
433-
while (t!!.cause != null) {
434-
t = t.cause
433+
while (t.cause != null) {
434+
t = t.cause!!
435435
sb.append("\nCaused by ")
436-
sb.append(t!!.javaClass)
436+
sb.append(t.javaClass)
437437
for (traceElement in t.stackTrace) {
438438
sb.append("\n\tat ")
439439
sb.append(traceElement.toString())

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/stream/StreamStatusUtils.kt

+8-16
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ object StreamStatusUtils {
6262
airbyteStream: AutoCloseableIterator<AirbyteMessage>,
6363
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
6464
) {
65-
if (airbyteStream is AirbyteStreamAware) {
66-
emitRunningStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
67-
}
65+
emitRunningStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
6866
}
6967

7068
/**
@@ -90,7 +88,7 @@ object StreamStatusUtils {
9088
airbyteStream: Optional<AirbyteStreamNameNamespacePair>,
9189
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
9290
) {
93-
airbyteStream!!.ifPresent { s: AirbyteStreamNameNamespacePair? ->
91+
airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
9492
LOGGER.debug("RUNNING -> {}", s)
9593
emitStreamStatus(
9694
s,
@@ -110,9 +108,7 @@ object StreamStatusUtils {
110108
airbyteStream: AutoCloseableIterator<AirbyteMessage>,
111109
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
112110
) {
113-
if (airbyteStream is AirbyteStreamAware) {
114-
emitStartStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
115-
}
111+
emitStartStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
116112
}
117113

118114
/**
@@ -138,7 +134,7 @@ object StreamStatusUtils {
138134
airbyteStream: Optional<AirbyteStreamNameNamespacePair>,
139135
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
140136
) {
141-
airbyteStream!!.ifPresent { s: AirbyteStreamNameNamespacePair? ->
137+
airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
142138
LOGGER.debug("STARTING -> {}", s)
143139
emitStreamStatus(
144140
s,
@@ -158,9 +154,7 @@ object StreamStatusUtils {
158154
airbyteStream: AutoCloseableIterator<AirbyteMessage>,
159155
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
160156
) {
161-
if (airbyteStream is AirbyteStreamAware) {
162-
emitCompleteStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
163-
}
157+
emitCompleteStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
164158
}
165159

166160
/**
@@ -186,7 +180,7 @@ object StreamStatusUtils {
186180
airbyteStream: Optional<AirbyteStreamNameNamespacePair>,
187181
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
188182
) {
189-
airbyteStream!!.ifPresent { s: AirbyteStreamNameNamespacePair? ->
183+
airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
190184
LOGGER.debug("COMPLETE -> {}", s)
191185
emitStreamStatus(
192186
s,
@@ -206,9 +200,7 @@ object StreamStatusUtils {
206200
airbyteStream: AutoCloseableIterator<AirbyteMessage>,
207201
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
208202
) {
209-
if (airbyteStream is AirbyteStreamAware) {
210-
emitIncompleteStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
211-
}
203+
emitIncompleteStreamStatus(airbyteStream as AirbyteStreamAware, statusEmitter)
212204
}
213205

214206
/**
@@ -234,7 +226,7 @@ object StreamStatusUtils {
234226
airbyteStream: Optional<AirbyteStreamNameNamespacePair>,
235227
statusEmitter: Optional<Consumer<AirbyteStreamStatusHolder>>
236228
) {
237-
airbyteStream!!.ifPresent { s: AirbyteStreamNameNamespacePair? ->
229+
airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
238230
LOGGER.debug("INCOMPLETE -> {}", s)
239231
emitStreamStatus(
240232
s,

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/CompositeIterator.kt

+2-7
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ internal constructor(
108108
private fun emitStartStreamStatus(
109109
airbyteStream: Optional<AirbyteStreamNameNamespacePair>
110110
): Boolean {
111-
if (airbyteStream!!.isPresent && !seenIterators.contains(airbyteStream)) {
111+
if (airbyteStream.isPresent && !seenIterators.contains(airbyteStream)) {
112112
seenIterators.add(airbyteStream)
113113
StreamStatusUtils.emitStartStreamStatus(airbyteStream, airbyteStreamStatusConsumer)
114114
return true
@@ -136,12 +136,7 @@ internal constructor(
136136
}
137137

138138
override val airbyteStream: Optional<AirbyteStreamNameNamespacePair>
139-
get() =
140-
if (currentIterator() is AirbyteStreamAware) {
141-
AirbyteStreamAware::class.java.cast(currentIterator()).airbyteStream
142-
} else {
143-
Optional.empty()
144-
}
139+
get() = AirbyteStreamAware::class.java.cast(currentIterator()).airbyteStream
145140

146141
private fun assertHasNotClosed() {
147142
Preconditions.checkState(!hasClosed)

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/version/Version.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,14 @@ open class Version {
138138
'}'
139139
}
140140

141-
override fun equals(o: Any?): Boolean {
142-
if (this === o) {
141+
override fun equals(other: Any?): Boolean {
142+
if (this === other) {
143143
return true
144144
}
145-
if (o == null || javaClass != o.javaClass) {
145+
if (other == null || javaClass != other.javaClass) {
146146
return false
147147
}
148-
val that = o as Version
148+
val that = other as Version
149149
return version == that.version &&
150150
major == that.major &&
151151
minor == that.minor &&

0 commit comments

Comments
 (0)