Skip to content

Commit d82639c

Browse files
remove nullable from generics (#37555)
## What <!-- * Describe what the change is solving. Link all GitHub issues related to this change. --> ## How <!-- * Describe how code changes achieve the solution. --> ## Review guide <!-- 1. `x.py` 2. `y.py` --> ## User Impact <!-- * What is the end result perceived by the user? * If there are negative side effects, please list them. --> ## Can this PR be safely reverted and rolled back? <!-- * If unsure, leave it blank. --> - [ ] YES 💚 - [ ] NO ❌
1 parent b488213 commit d82639c

File tree

87 files changed

+230
-245
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+230
-245
lines changed

airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ abstract class AzureBlobStorageStreamCopier(
5050
@Suppress("DEPRECATION")
5151
@get:VisibleForTesting
5252
val tmpTableName: String = nameTransformer.getTmpTableName(streamName)
53-
protected val activeStagingWriterFileNames: MutableSet<String?> = HashSet()
54-
private val csvPrinters = HashMap<String?, CSVPrinter>()
55-
private val blobClients = HashMap<String?, AppendBlobClient>()
53+
protected val activeStagingWriterFileNames: MutableSet<String> = HashSet()
54+
private val csvPrinters = HashMap<String, CSVPrinter>()
55+
private val blobClients = HashMap<String, AppendBlobClient>()
5656
override var currentFile: String? = null
5757

5858
@Throws(Exception::class)
@@ -210,7 +210,7 @@ abstract class AzureBlobStorageStreamCopier(
210210
@Throws(Exception::class)
211211
override fun closeNonCurrentStagingFileWriters() {
212212
LOGGER.info("Begin closing non current file writers")
213-
val removedKeys: MutableSet<String?> = HashSet()
213+
val removedKeys: MutableSet<String> = HashSet()
214214
for (key in activeStagingWriterFileNames) {
215215
if (key != currentFile) {
216216
csvPrinters[key]!!.close()

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ constructor(
2626
sourceOperations: JdbcCompatibleSourceOperations<*>? = JdbcUtils.defaultSourceOperations
2727
) : JdbcDatabase(sourceOperations) {
2828
@Throws(SQLException::class)
29-
override fun execute(query: CheckedConsumer<Connection, SQLException?>) {
29+
override fun execute(query: CheckedConsumer<Connection, SQLException>) {
3030
dataSource.connection.use { connection -> query.accept(connection) }
3131
}
3232

3333
@Throws(SQLException::class)
3434
override fun <T> bufferedResultSetQuery(
35-
query: CheckedFunction<Connection, ResultSet, SQLException?>,
36-
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
35+
query: CheckedFunction<Connection, ResultSet, SQLException>,
36+
recordTransform: CheckedFunction<ResultSet, T, SQLException>
3737
): List<T> {
3838
dataSource.connection.use { connection ->
3939
toUnsafeStream<T>(query.apply(connection), recordTransform).use { results ->
@@ -45,8 +45,8 @@ constructor(
4545
@MustBeClosed
4646
@Throws(SQLException::class)
4747
override fun <T> unsafeResultSetQuery(
48-
query: CheckedFunction<Connection, ResultSet, SQLException?>,
49-
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
48+
query: CheckedFunction<Connection, ResultSet, SQLException>,
49+
recordTransform: CheckedFunction<ResultSet, T, SQLException>
5050
): Stream<T> {
5151
val connection = dataSource.connection
5252
return JdbcDatabase.Companion.toUnsafeStream<T>(query.apply(connection), recordTransform)
@@ -114,8 +114,8 @@ constructor(
114114
@MustBeClosed
115115
@Throws(SQLException::class)
116116
override fun <T> unsafeQuery(
117-
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
118-
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
117+
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
118+
recordTransform: CheckedFunction<ResultSet, T, SQLException>
119119
): Stream<T> {
120120
val connection = dataSource.connection
121121
return JdbcDatabase.Companion.toUnsafeStream<T>(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt

+13-13
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
3131
* @throws SQLException SQL related exceptions.
3232
*/
3333
@Throws(SQLException::class)
34-
abstract fun execute(query: CheckedConsumer<Connection, SQLException?>)
34+
abstract fun execute(query: CheckedConsumer<Connection, SQLException>)
3535

3636
@Throws(SQLException::class)
3737
override fun execute(sql: String?) {
3838
execute { connection: Connection -> connection.createStatement().execute(sql) }
3939
}
4040

4141
@Throws(SQLException::class)
42-
fun executeWithinTransaction(queries: List<String?>) {
42+
fun executeWithinTransaction(queries: List<String>) {
4343
execute { connection: Connection ->
4444
connection.autoCommit = false
4545
for (s in queries) {
@@ -67,8 +67,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
6767
*/
6868
@Throws(SQLException::class)
6969
abstract fun <T> bufferedResultSetQuery(
70-
query: CheckedFunction<Connection, ResultSet, SQLException?>,
71-
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
70+
query: CheckedFunction<Connection, ResultSet, SQLException>,
71+
recordTransform: CheckedFunction<ResultSet, T, SQLException>
7272
): List<T>
7373

7474
/**
@@ -89,8 +89,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
8989
@MustBeClosed
9090
@Throws(SQLException::class)
9191
abstract fun <T> unsafeResultSetQuery(
92-
query: CheckedFunction<Connection, ResultSet, SQLException?>,
93-
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
92+
query: CheckedFunction<Connection, ResultSet, SQLException>,
93+
recordTransform: CheckedFunction<ResultSet, T, SQLException>
9494
): Stream<T>
9595

9696
/**
@@ -99,8 +99,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
9999
*/
100100
@Throws(SQLException::class)
101101
fun queryStrings(
102-
query: CheckedFunction<Connection, ResultSet, SQLException?>,
103-
recordTransform: CheckedFunction<ResultSet, String, SQLException?>
102+
query: CheckedFunction<Connection, ResultSet, SQLException>,
103+
recordTransform: CheckedFunction<ResultSet, String, SQLException>
104104
): List<String> {
105105
unsafeResultSetQuery(query, recordTransform).use { stream ->
106106
return stream.toList()
@@ -126,8 +126,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
126126
@MustBeClosed
127127
@Throws(SQLException::class)
128128
abstract fun <T> unsafeQuery(
129-
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
130-
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
129+
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
130+
recordTransform: CheckedFunction<ResultSet, T, SQLException>
131131
): Stream<T>
132132

133133
/**
@@ -136,8 +136,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
136136
*/
137137
@Throws(SQLException::class)
138138
fun queryJsons(
139-
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
140-
recordTransform: CheckedFunction<ResultSet, JsonNode, SQLException?>
139+
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
140+
recordTransform: CheckedFunction<ResultSet, JsonNode, SQLException>
141141
): List<JsonNode> {
142142
unsafeQuery(statementCreator, recordTransform).use { stream ->
143143
return stream.toList()
@@ -229,7 +229,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
229229
@MustBeClosed
230230
fun <T> toUnsafeStream(
231231
resultSet: ResultSet,
232-
mapper: CheckedFunction<ResultSet, T, SQLException?>
232+
mapper: CheckedFunction<ResultSet, T, SQLException>
233233
): Stream<T> {
234234
return StreamSupport.stream(
235235
object : AbstractSpliterator<T>(Long.MAX_VALUE, ORDERED) {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/StreamingJdbcDatabase.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ class StreamingJdbcDatabase(
4646
@MustBeClosed
4747
@Throws(SQLException::class)
4848
override fun <T> unsafeQuery(
49-
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException?>,
50-
recordTransform: CheckedFunction<ResultSet, T, SQLException?>
49+
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
50+
recordTransform: CheckedFunction<ResultSet, T, SQLException>
5151
): Stream<T> {
5252
try {
5353
val connection = dataSource.connection
@@ -79,7 +79,7 @@ class StreamingJdbcDatabase(
7979
*/
8080
protected fun <T> toUnsafeStream(
8181
resultSet: ResultSet,
82-
mapper: CheckedFunction<ResultSet, T, SQLException?>,
82+
mapper: CheckedFunction<ResultSet, T, SQLException>,
8383
streamingConfig: JdbcStreamingQueryConfig
8484
): Stream<T> {
8585
return StreamSupport.stream(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
111111
* 1. Contain the original exception message as the external message, and a mangled message
112112
* as the internal message.
113113
*/
114-
@VisibleForTesting val STRINGS_TO_DEINTERPOLATE: MutableSet<String?> = HashSet()
114+
@VisibleForTesting val STRINGS_TO_DEINTERPOLATE: MutableSet<String> = HashSet()
115115

116116
init {
117117
addCommonStringsToDeinterpolate()

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ internal constructor(
284284
private fun readConcurrent(
285285
config: JsonNode,
286286
catalog: ConfiguredAirbyteCatalog,
287-
stateOptional: Optional<JsonNode?>
287+
stateOptional: Optional<JsonNode>
288288
) {
289289
val streams = source!!.readStreams(config, catalog, stateOptional.orElse(null))
290290

@@ -327,7 +327,7 @@ internal constructor(
327327
private fun readSerial(
328328
config: JsonNode,
329329
catalog: ConfiguredAirbyteCatalog,
330-
stateOptional: Optional<JsonNode?>
330+
stateOptional: Optional<JsonNode>
331331
) {
332332
try {
333333
source!!.read(config, catalog, stateOptional.orElse(null)).use { messageIterator ->

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,13 @@ class BufferDequeue(
5858
val output: MutableList<StreamAwareQueue.MessageWithMeta> = LinkedList()
5959
while (queue!!.size() > 0) {
6060
val memoryItem:
61-
MemoryBoundedLinkedBlockingQueue.MemoryItem<
62-
StreamAwareQueue.MessageWithMeta?>? =
61+
MemoryBoundedLinkedBlockingQueue.MemoryItem<StreamAwareQueue.MessageWithMeta> =
6362
queue.peek().orElseThrow()
6463

6564
// otherwise pull records until we hit the memory limit.
66-
val newSize: Long = (memoryItem?.size ?: 0) + bytesRead.get()
65+
val newSize: Long = (memoryItem.size) + bytesRead.get()
6766
if (newSize <= optimalBytesToRead) {
68-
memoryItem?.size?.let { bytesRead.addAndGet(it) }
67+
memoryItem.size.let { bytesRead.addAndGet(it) }
6968
queue.poll()?.item?.let { output.add(it) }
7069
} else {
7170
break

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/MemoryBoundedLinkedBlockingQueue.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ private val logger = KotlinLogging.logger {}
2828
* @param <E> type in the queue </E>
2929
*/
3030
class MemoryBoundedLinkedBlockingQueue<E>(maxMemoryUsage: Long) {
31-
private val hiddenQueue = HiddenQueue<E?>(maxMemoryUsage)
31+
private val hiddenQueue = HiddenQueue<E>(maxMemoryUsage)
3232

3333
val currentMemoryUsage: Long
3434
get() = hiddenQueue.currentMemoryUsage.get()
@@ -48,24 +48,24 @@ class MemoryBoundedLinkedBlockingQueue<E>(maxMemoryUsage: Long) {
4848
return hiddenQueue.offer(e, itemSizeInBytes)
4949
}
5050

51-
fun peek(): MemoryItem<E?>? {
51+
fun peek(): MemoryItem<E>? {
5252
return hiddenQueue.peek()
5353
}
5454

5555
@Throws(InterruptedException::class)
56-
fun take(): MemoryItem<E?> {
56+
fun take(): MemoryItem<E> {
5757
return hiddenQueue.take()
5858
}
5959

60-
fun poll(): MemoryItem<E?>? {
60+
fun poll(): MemoryItem<E>? {
6161
return hiddenQueue.poll()
6262
}
6363

6464
@Throws(InterruptedException::class)
6565
fun poll(
6666
timeout: Long,
6767
unit: TimeUnit,
68-
): MemoryItem<E?>? {
68+
): MemoryItem<E>? {
6969
return hiddenQueue.poll(timeout, unit)
7070
}
7171

@@ -78,7 +78,7 @@ class MemoryBoundedLinkedBlockingQueue<E>(maxMemoryUsage: Long) {
7878
*
7979
* @param <E> </E>
8080
*/
81-
private class HiddenQueue<E>(maxMemoryUsage: Long) : LinkedBlockingQueue<MemoryItem<E>?>() {
81+
private class HiddenQueue<E>(maxMemoryUsage: Long) : LinkedBlockingQueue<MemoryItem<E>>() {
8282
val currentMemoryUsage: AtomicLong = AtomicLong(0)
8383
val maxMemoryUsage: AtomicLong = AtomicLong(maxMemoryUsage)
8484

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/StreamAwareQueue.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class StreamAwareQueue(maxMemoryUsage: Long) {
3737
return Optional.ofNullable(timeOfLastMessage.get())
3838
}
3939

40-
fun peek(): Optional<MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?>> {
40+
fun peek(): Optional<MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta>> {
4141
return Optional.ofNullable(memoryAwareQueue.peek())
4242
}
4343

@@ -59,19 +59,19 @@ class StreamAwareQueue(maxMemoryUsage: Long) {
5959
}
6060

6161
@Throws(InterruptedException::class)
62-
fun take(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?> {
62+
fun take(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta> {
6363
return memoryAwareQueue.take()
6464
}
6565

66-
fun poll(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?>? {
66+
fun poll(): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta>? {
6767
return memoryAwareQueue.poll()
6868
}
6969

7070
@Throws(InterruptedException::class)
7171
fun poll(
7272
timeout: Long,
7373
unit: TimeUnit,
74-
): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta?>? {
74+
): MemoryBoundedLinkedBlockingQueue.MemoryItem<MessageWithMeta>? {
7575
return memoryAwareQueue.poll(timeout, unit)
7676
}
7777

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
154154
var bytesFlushed: Long = 0L
155155
logger.info { "Flushing states" }
156156
synchronized(lock) {
157-
for (entry: Map.Entry<StreamDescriptor, LinkedBlockingDeque<Long>?> in
157+
for (entry: Map.Entry<StreamDescriptor, LinkedBlockingDeque<Long>> in
158158
descToStateIdQ.entries) {
159159
// Remove all states with 0 counters.
160160
// Per-stream synchronized is required to make sure the state (at the head of the
@@ -196,7 +196,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
196196
bytesFlushed += oldestState.second
197197

198198
// cleanup
199-
entry.value!!.poll()
199+
entry.value.poll()
200200
stateIdToState.remove(oldestStateId)
201201
stateIdToCounter.remove(oldestStateId)
202202
stateIdToCounterForPopulatingDestinationStats.remove(oldestStateId)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ interface SqlOperations {
2929
* @throws Exception exception
3030
*/
3131
@Throws(Exception::class)
32-
fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?)
32+
fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String)
3333

3434
/**
3535
* Denotes whether the schema exists in destination database

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingOperations.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ interface StagingOperations : SqlOperations {
6464
schemaName: String?,
6565
stageName: String?,
6666
stagingPath: String?
67-
): String?
67+
): String
6868

6969
/**
7070
* Load the data stored in the stage area into a temporary table in the destination
@@ -80,7 +80,7 @@ interface StagingOperations : SqlOperations {
8080
database: JdbcDatabase?,
8181
stageName: String?,
8282
stagingPath: String?,
83-
stagedFiles: List<String?>?,
83+
stagedFiles: List<String>?,
8484
tableName: String?,
8585
schemaName: String?
8686
)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ApmTraceUtils.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ object ApmTraceUtils {
3434
* @param tags A map of tags to be added to the currently active span.
3535
*/
3636
@JvmOverloads
37-
fun addTagsToTrace(tags: Map<String?, Any>, tagPrefix: String? = TAG_PREFIX) {
37+
fun addTagsToTrace(tags: Map<String, Any>, tagPrefix: String? = TAG_PREFIX) {
3838
addTagsToTrace(GlobalTracer.get().activeSpan(), tags, tagPrefix)
3939
}
4040

@@ -45,10 +45,10 @@ object ApmTraceUtils {
4545
* @param tags A map of tags to be added to the currently active span.
4646
* @param tagPrefix The prefix to be added to each custom tag name.
4747
*/
48-
fun addTagsToTrace(span: Span?, tags: Map<String?, Any>, tagPrefix: String?) {
48+
fun addTagsToTrace(span: Span?, tags: Map<String, Any>, tagPrefix: String?) {
4949
if (span != null) {
5050
tags.entries.forEach(
51-
Consumer { entry: Map.Entry<String?, Any> ->
51+
Consumer { entry: Map.Entry<String, Any> ->
5252
span.setTag(formatTag(entry.key, tagPrefix), entry.value.toString())
5353
}
5454
)
@@ -83,12 +83,12 @@ object ApmTraceUtils {
8383
*
8484
* @param tags A map of tags to be added to the root span.
8585
*/
86-
fun addTagsToRootSpan(tags: Map<String?, Any>) {
86+
fun addTagsToRootSpan(tags: Map<String, Any>) {
8787
val activeSpan = GlobalTracer.get().activeSpan()
8888
if (activeSpan is MutableSpan) {
8989
val localRootSpan = (activeSpan as MutableSpan).localRootSpan
9090
tags.entries.forEach(
91-
Consumer { entry: Map.Entry<String?, Any> ->
91+
Consumer { entry: Map.Entry<String, Any> ->
9292
localRootSpan.setTag(formatTag(entry.key, TAG_PREFIX), entry.value.toString())
9393
}
9494
)

0 commit comments

Comments
 (0)