Skip to content

Commit 2e6d9f1

Browse files
fix compiler errors
1 parent e7d969d commit 2e6d9f1

File tree

83 files changed

+1576
-1056
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+1576
-1056
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/Destination.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ interface Destination : Integration {
3232
*/
3333
@Throws(Exception::class)
3434
fun getConsumer(
35-
config: JsonNode?,
36-
catalog: ConfiguredAirbyteCatalog?,
35+
config: JsonNode,
36+
catalog: ConfiguredAirbyteCatalog,
3737
outputRecordCollector: Consumer<AirbyteMessage?>?
3838
): AirbyteMessageConsumer?
3939

@@ -51,8 +51,8 @@ interface Destination : Integration {
5151
*/
5252
@Throws(Exception::class)
5353
fun getSerializedMessageConsumer(
54-
config: JsonNode?,
55-
catalog: ConfiguredAirbyteCatalog?,
54+
config: JsonNode,
55+
catalog: ConfiguredAirbyteCatalog,
5656
outputRecordCollector: Consumer<AirbyteMessage?>?
5757
): SerializedAirbyteMessageConsumer? {
5858
return ShimToSerializedAirbyteMessageConsumer(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/Integration.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ interface Integration {
2828
* @throws Exception
2929
* - any exception.
3030
*/
31-
@Throws(Exception::class) fun check(config: JsonNode?): AirbyteConnectionStatus?
31+
@Throws(Exception::class) fun check(config: JsonNode): AirbyteConnectionStatus?
3232
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/Source.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ interface Source : Integration {
2121
* @throws Exception
2222
* - any exception.
2323
*/
24-
@Throws(Exception::class) fun discover(config: JsonNode?): AirbyteCatalog?
24+
@Throws(Exception::class) fun discover(config: JsonNode): AirbyteCatalog?
2525

2626
/**
2727
* Return a iterator of messages pulled from the source.
@@ -41,7 +41,7 @@ interface Source : Integration {
4141
*/
4242
@Throws(Exception::class)
4343
fun read(
44-
config: JsonNode?,
44+
config: JsonNode,
4545
catalog: ConfiguredAirbyteCatalog?,
4646
state: JsonNode?
4747
): AutoCloseableIterator<AirbyteMessage>
@@ -64,7 +64,7 @@ interface Source : Integration {
6464
*/
6565
@Throws(Exception::class)
6666
fun readStreams(
67-
config: JsonNode?,
67+
config: JsonNode,
6868
catalog: ConfiguredAirbyteCatalog?,
6969
state: JsonNode?
7070
): Collection<AutoCloseableIterator<AirbyteMessage>>? {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,23 @@ abstract class SpecModifyingDestination(private val destination: Destination) :
2323
}
2424

2525
@Throws(Exception::class)
26-
override fun check(config: JsonNode?): AirbyteConnectionStatus? {
26+
override fun check(config: JsonNode): AirbyteConnectionStatus? {
2727
return destination.check(config)
2828
}
2929

3030
@Throws(Exception::class)
3131
override fun getConsumer(
32-
config: JsonNode?,
33-
catalog: ConfiguredAirbyteCatalog?,
32+
config: JsonNode,
33+
catalog: ConfiguredAirbyteCatalog,
3434
outputRecordCollector: Consumer<AirbyteMessage?>?
3535
): AirbyteMessageConsumer? {
3636
return destination.getConsumer(config, catalog, outputRecordCollector)
3737
}
3838

3939
@Throws(Exception::class)
4040
override fun getSerializedMessageConsumer(
41-
config: JsonNode?,
42-
catalog: ConfiguredAirbyteCatalog?,
41+
config: JsonNode,
42+
catalog: ConfiguredAirbyteCatalog,
4343
outputRecordCollector: Consumer<AirbyteMessage?>?
4444
): SerializedAirbyteMessageConsumer? {
4545
return destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingSource.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
2323
}
2424

2525
@Throws(Exception::class)
26-
override fun check(config: JsonNode?): AirbyteConnectionStatus? {
26+
override fun check(config: JsonNode): AirbyteConnectionStatus? {
2727
return source.check(config)
2828
}
2929

3030
@Throws(Exception::class)
31-
override fun discover(config: JsonNode?): AirbyteCatalog? {
31+
override fun discover(config: JsonNode): AirbyteCatalog? {
3232
return source.discover(config)
3333
}
3434

3535
@Throws(Exception::class)
3636
override fun read(
37-
config: JsonNode?,
37+
config: JsonNode,
3838
catalog: ConfiguredAirbyteCatalog?,
3939
state: JsonNode?
4040
): AutoCloseableIterator<AirbyteMessage> {
@@ -43,7 +43,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
4343

4444
@Throws(Exception::class)
4545
override fun readStreams(
46-
config: JsonNode?,
46+
config: JsonNode,
4747
catalog: ConfiguredAirbyteCatalog?,
4848
state: JsonNode?
4949
): Collection<AutoCloseableIterator<AirbyteMessage>>? {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshTunnel.kt

+10-10
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory
4141
open class SshTunnel
4242
@JvmOverloads
4343
constructor(
44-
val originalConfig: JsonNode?,
44+
val originalConfig: JsonNode,
4545
private val hostKey: List<String>?,
4646
private val portKey: List<String>?,
4747
private val endPointKey: String?,
@@ -196,7 +196,7 @@ constructor(
196196
}
197197

198198
@get:Throws(Exception::class)
199-
val configInTunnel: JsonNode?
199+
val configInTunnel: JsonNode
200200
get() {
201201
if (tunnelMethod == TunnelMethod.NO_TUNNEL) {
202202
return originalConfig
@@ -423,7 +423,7 @@ constructor(
423423
const val TIMEOUT_MILLIS: Int = 15000 // 15 seconds
424424

425425
fun getInstance(
426-
config: JsonNode?,
426+
config: JsonNode,
427427
hostKey: List<String>?,
428428
portKey: List<String>?
429429
): SshTunnel {
@@ -490,7 +490,7 @@ constructor(
490490
}
491491

492492
@Throws(Exception::class)
493-
fun getInstance(config: JsonNode?, endPointKey: String?): SshTunnel {
493+
fun getInstance(config: JsonNode, endPointKey: String?): SshTunnel {
494494
val tunnelMethod =
495495
Jsons.getOptional(config, "tunnel_method", "tunnel_method")
496496
.map { method: JsonNode ->
@@ -521,7 +521,7 @@ constructor(
521521

522522
@Throws(Exception::class)
523523
fun sshWrap(
524-
config: JsonNode?,
524+
config: JsonNode,
525525
hostKey: List<String>?,
526526
portKey: List<String>?,
527527
wrapped: CheckedConsumer<JsonNode?, Exception?>
@@ -534,7 +534,7 @@ constructor(
534534

535535
@Throws(Exception::class)
536536
fun sshWrap(
537-
config: JsonNode?,
537+
config: JsonNode,
538538
endPointKey: String?,
539539
wrapped: CheckedConsumer<JsonNode?, Exception?>
540540
) {
@@ -546,10 +546,10 @@ constructor(
546546

547547
@Throws(Exception::class)
548548
fun <T> sshWrap(
549-
config: JsonNode?,
549+
config: JsonNode,
550550
hostKey: List<String>?,
551551
portKey: List<String>?,
552-
wrapped: CheckedFunction<JsonNode?, T, Exception?>
552+
wrapped: CheckedFunction<JsonNode, T, Exception?>
553553
): T {
554554
getInstance(config, hostKey, portKey).use { sshTunnel ->
555555
return wrapped.apply(sshTunnel.configInTunnel)
@@ -558,9 +558,9 @@ constructor(
558558

559559
@Throws(Exception::class)
560560
fun <T> sshWrap(
561-
config: JsonNode?,
561+
config: JsonNode,
562562
endPointKey: String?,
563-
wrapped: CheckedFunction<JsonNode?, T, Exception?>
563+
wrapped: CheckedFunction<JsonNode, T, Exception?>
564564
): T {
565565
getInstance(config, endPointKey).use { sshTunnel ->
566566
return wrapped.apply(sshTunnel.configInTunnel)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.kt

+10-10
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ class SshWrappedDestination : Destination {
5959
}
6060

6161
@Throws(Exception::class)
62-
override fun check(config: JsonNode?): AirbyteConnectionStatus? {
62+
override fun check(config: JsonNode): AirbyteConnectionStatus? {
6363
try {
6464
return if ((endPointKey != null))
6565
SshTunnel.Companion.sshWrap<AirbyteConnectionStatus?>(
6666
config,
6767
endPointKey,
68-
CheckedFunction<JsonNode?, AirbyteConnectionStatus?, Exception?> {
69-
config: JsonNode? ->
68+
CheckedFunction<JsonNode, AirbyteConnectionStatus?, Exception?> {
69+
config: JsonNode ->
7070
delegate.check(config)
7171
}
7272
)
@@ -75,8 +75,8 @@ class SshWrappedDestination : Destination {
7575
config,
7676
hostKey,
7777
portKey,
78-
CheckedFunction<JsonNode?, AirbyteConnectionStatus?, Exception?> {
79-
config: JsonNode? ->
78+
CheckedFunction<JsonNode, AirbyteConnectionStatus?, Exception?> {
79+
config: JsonNode ->
8080
delegate.check(config)
8181
}
8282
)
@@ -92,8 +92,8 @@ class SshWrappedDestination : Destination {
9292

9393
@Throws(Exception::class)
9494
override fun getConsumer(
95-
config: JsonNode?,
96-
catalog: ConfiguredAirbyteCatalog?,
95+
config: JsonNode,
96+
catalog: ConfiguredAirbyteCatalog,
9797
outputRecordCollector: Consumer<AirbyteMessage?>?
9898
): AirbyteMessageConsumer? {
9999
val tunnel = getTunnelInstance(config)
@@ -118,8 +118,8 @@ class SshWrappedDestination : Destination {
118118

119119
@Throws(Exception::class)
120120
override fun getSerializedMessageConsumer(
121-
config: JsonNode?,
122-
catalog: ConfiguredAirbyteCatalog?,
121+
config: JsonNode,
122+
catalog: ConfiguredAirbyteCatalog,
123123
outputRecordCollector: Consumer<AirbyteMessage?>?
124124
): SerializedAirbyteMessageConsumer? {
125125
val clone = Jsons.clone(config)
@@ -163,7 +163,7 @@ class SshWrappedDestination : Destination {
163163
}
164164

165165
@Throws(Exception::class)
166-
protected fun getTunnelInstance(config: JsonNode?): SshTunnel {
166+
protected fun getTunnelInstance(config: JsonNode): SshTunnel {
167167
return if ((endPointKey != null)) SshTunnel.Companion.getInstance(config, endPointKey)
168168
else SshTunnel.Companion.getInstance(config, hostKey, portKey)
169169
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshWrappedSource.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ class SshWrappedSource : Source {
4040
}
4141

4242
@Throws(Exception::class)
43-
override fun check(config: JsonNode?): AirbyteConnectionStatus? {
43+
override fun check(config: JsonNode): AirbyteConnectionStatus? {
4444
try {
4545
return SshTunnel.Companion.sshWrap<AirbyteConnectionStatus?>(
4646
config,
4747
hostKey,
4848
portKey,
49-
CheckedFunction<JsonNode?, AirbyteConnectionStatus?, Exception?> { config: JsonNode?
49+
CheckedFunction<JsonNode, AirbyteConnectionStatus?, Exception?> { config: JsonNode
5050
->
5151
delegate.check(config)
5252
}
@@ -62,20 +62,20 @@ class SshWrappedSource : Source {
6262
}
6363

6464
@Throws(Exception::class)
65-
override fun discover(config: JsonNode?): AirbyteCatalog? {
65+
override fun discover(config: JsonNode): AirbyteCatalog? {
6666
return SshTunnel.Companion.sshWrap<AirbyteCatalog?>(
6767
config,
6868
hostKey,
6969
portKey,
70-
CheckedFunction<JsonNode?, AirbyteCatalog?, Exception?> { config: JsonNode? ->
70+
CheckedFunction<JsonNode, AirbyteCatalog?, Exception?> { config: JsonNode ->
7171
delegate.discover(config)
7272
}
7373
)
7474
}
7575

7676
@Throws(Exception::class)
7777
override fun read(
78-
config: JsonNode?,
78+
config: JsonNode,
7979
catalog: ConfiguredAirbyteCatalog?,
8080
state: JsonNode?
8181
): AutoCloseableIterator<AirbyteMessage> {
@@ -96,7 +96,7 @@ class SshWrappedSource : Source {
9696

9797
@Throws(Exception::class)
9898
override fun readStreams(
99-
config: JsonNode?,
99+
config: JsonNode,
100100
catalog: ConfiguredAirbyteCatalog?,
101101
state: JsonNode?
102102
): Collection<AutoCloseableIterator<AirbyteMessage>>? {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory
4646
class AsyncStreamConsumer
4747
@VisibleForTesting
4848
constructor(
49-
outputRecordCollector: Consumer<AirbyteMessage?>,
49+
outputRecordCollector: Consumer<AirbyteMessage>,
5050
private val onStart: OnStartFunction,
5151
private val onClose: OnCloseFunction,
5252
flusher: DestinationFlushFunction,
@@ -89,7 +89,7 @@ constructor(
8989
private val PARTIAL_DESERIALIZE_REF_BYTES: Int = 10 * 8
9090

9191
constructor(
92-
outputRecordCollector: Consumer<AirbyteMessage?>,
92+
outputRecordCollector: Consumer<AirbyteMessage>,
9393
onStart: OnStartFunction,
9494
onClose: OnCloseFunction,
9595
flusher: DestinationFlushFunction,
@@ -108,7 +108,7 @@ constructor(
108108
)
109109

110110
constructor(
111-
outputRecordCollector: Consumer<AirbyteMessage?>,
111+
outputRecordCollector: Consumer<AirbyteMessage>,
112112
onStart: OnStartFunction,
113113
onClose: OnCloseFunction,
114114
flusher: DestinationFlushFunction,
@@ -131,7 +131,7 @@ constructor(
131131
)
132132

133133
constructor(
134-
outputRecordCollector: Consumer<AirbyteMessage?>,
134+
outputRecordCollector: Consumer<AirbyteMessage>,
135135
onStart: OnStartFunction,
136136
onClose: OnCloseFunction,
137137
flusher: DestinationFlushFunction,
@@ -155,7 +155,7 @@ constructor(
155155

156156
@VisibleForTesting
157157
constructor(
158-
outputRecordCollector: Consumer<AirbyteMessage?>,
158+
outputRecordCollector: Consumer<AirbyteMessage>,
159159
onStart: OnStartFunction,
160160
onClose: OnCloseFunction,
161161
flusher: DestinationFlushFunction,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/FlushWorkers.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class FlushWorkers
4848
constructor(
4949
private val bufferDequeue: io.airbyte.cdk.integrations.destination.async.buffers.BufferDequeue,
5050
private val flusher: DestinationFlushFunction,
51-
private val outputRecordCollector: Consumer<AirbyteMessage?>,
51+
private val outputRecordCollector: Consumer<AirbyteMessage>,
5252
private val flushFailure: FlushFailure,
5353
private val stateManager: GlobalAsyncStateManager,
5454
private val workerPool: ExecutorService = Executors.newFixedThreadPool(5),

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/MemoryAwareMessageBatch.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class MemoryAwareMessageBatch(
3939
*/
4040
fun flushStates(
4141
stateIdToCount: Map<Long?, Long?>,
42-
outputRecordCollector: Consumer<AirbyteMessage?>,
42+
outputRecordCollector: Consumer<AirbyteMessage>,
4343
) {
4444
stateIdToCount.forEach { (stateId: Long?, count: Long?) ->
4545
stateManager.decrement(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
154154
* Intended to be called by [io.airbyte.cdk.integrations.destination.async.FlushWorkers] after a
155155
* worker has finished flushing its record batch.
156156
*/
157-
fun flushStates(outputRecordCollector: Consumer<AirbyteMessage?>) {
157+
fun flushStates(outputRecordCollector: Consumer<AirbyteMessage>) {
158158
var bytesFlushed: Long = 0L
159159
logger.info { "Flushing states" }
160160
synchronized(lock) {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
1515
* The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map
1616
* will be treated as equivalent to [StreamSyncSummary.DEFAULT].
1717
*/
18-
interface OnCloseFunction :
18+
fun interface OnCloseFunction :
1919
CheckedBiConsumer<Boolean, Map<StreamDescriptor, StreamSyncSummary>, Exception>

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnStartFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ package io.airbyte.cdk.integrations.destination.buffered_stream_consumer
66

77
import io.airbyte.commons.concurrency.VoidCallable
88

9-
interface OnStartFunction : VoidCallable
9+
fun interface OnStartFunction : VoidCallable

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/FlushBufferFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.record_buffer
66
import io.airbyte.commons.functional.CheckedBiConsumer
77
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
88

9-
interface FlushBufferFunction :
9+
fun interface FlushBufferFunction :
1010
CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> {
1111
@Throws(Exception::class)
1212
override fun accept(stream: AirbyteStreamNameNamespacePair, buffer: SerializableBuffer)

0 commit comments

Comments
 (0)