Skip to content

Commit b488213

Browse files
remove unnecessary calls to toList() (#37540)
## What <!-- * Describe what the change is solving. Link all GitHub issues related to this change. --> ## How <!-- * Describe how code changes achieve the solution. --> ## Review guide <!-- 1. `x.py` 2. `y.py` --> ## User Impact <!-- * What is the end result perceived by the user? * If there are negative side effects, please list them. --> ## Can this PR be safely reverted and rolled back? <!-- * If unsure, leave it blank. --> - [ ] YES 💚 - [ ] NO ❌
1 parent 460cdca commit b488213

File tree

60 files changed

+383
-495
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+383
-495
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ internal constructor(
481481
) {
482482
val currentThread = Thread.currentThread()
483483

484-
val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread).toList()
484+
val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread)
485485
if (runningThreads.isNotEmpty()) {
486486
LOGGER.warn(
487487
"""

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,7 @@ open class StandardNameTransformer : NamingConventionTransformer {
7373
return Jsons.jsonNode<Map<String, JsonNode>>(properties)
7474
} else if (root.isArray) {
7575
return Jsons.jsonNode(
76-
MoreIterators.toList(root.elements())
77-
.map { r: JsonNode -> formatJsonPath(r) }
78-
.toList()
76+
MoreIterators.toList(root.elements()).map { r: JsonNode -> formatJsonPath(r) }
7977
)
8078
} else {
8179
return root

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt

+12-14
Original file line numberDiff line numberDiff line change
@@ -241,20 +241,18 @@ internal constructor(
241241
streamDescriptor,
242242
)
243243
}
244-
return streams
245-
.sortedWith(
246-
Comparator.comparing(
247-
{ s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() },
248-
Comparator.reverseOrder(),
249-
) // if no time is present, it suggests the queue has no records. set MAX time
250-
// as a sentinel value to
251-
// represent no records.
252-
.thenComparing { s: StreamDescriptor ->
253-
sdToTimeOfLastRecord[s]!!.orElse(Instant.MAX)
254-
}
255-
.thenComparing { s: StreamDescriptor -> s.namespace + s.name },
256-
)
257-
.toList()
244+
return streams.sortedWith(
245+
Comparator.comparing(
246+
{ s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() },
247+
Comparator.reverseOrder(),
248+
) // if no time is present, it suggests the queue has no records. set MAX time
249+
// as a sentinel value to
250+
// represent no records.
251+
.thenComparing { s: StreamDescriptor ->
252+
sdToTimeOfLastRecord[s]!!.orElse(Instant.MAX)
253+
}
254+
.thenComparing { s: StreamDescriptor -> s.namespace + s.name },
255+
)
258256
}
259257

260258
companion object {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
281281
// into the non-STREAM world for correctness.
282282
synchronized(lock) {
283283
aliasIds.addAll(
284-
descToStateIdQ.values
285-
.flatMap { obj: LinkedBlockingDeque<Long> -> obj }
286-
.toList(),
284+
descToStateIdQ.values.flatMap { obj: LinkedBlockingDeque<Long> -> obj },
287285
)
288286
descToStateIdQ.clear()
289287
retroactiveGlobalStateId = StateIdProvider.nextId

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt

+1-5
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,7 @@ object SentryExceptionHelper {
122122
errorMessageAndType[ErrorMapKeys.ERROR_MAP_MESSAGE_KEY] =
123123
String.format(
124124
"%s",
125-
stacktraceLines[
126-
Arrays.stream(stacktraceLines)
127-
.toList()
128-
.indexOf(followingLine) + 1
129-
]
125+
stacktraceLines[stacktraceLines.indexOf(followingLine) + 1]
130126
.trim { it <= ' ' }
131127
)
132128
errorMessageAndType[ErrorMapKeys.ERROR_MAP_TYPE_KEY] =

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class InMemoryRecordBufferingStrategy(
7171
stream.name,
7272
streamBuffer[stream]!!.size
7373
)
74-
recordWriter.accept(stream, streamBuffer[stream]!!.toList())
74+
recordWriter.accept(stream, streamBuffer[stream]!!)
7575
LOGGER.info("Flushing completed for {}", stream.name)
7676
}
7777

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ object ConnectorExceptionUtil {
123123
initialMessage: String,
124124
eithers: List<Either<out T, Result>>
125125
): List<Result> {
126-
val throwables: List<T> = eithers.filter { it.isLeft() }.map { it.left!! }.toList()
126+
val throwables: List<T> = eithers.filter { it.isLeft() }.map { it.left!! }
127127
if (throwables.isNotEmpty()) {
128128
logAllAndThrowFirst(initialMessage, throwables)
129129
}
130130
// No need to filter on isRight since isLeft will throw before reaching this line.
131-
return eithers.map { obj: Either<out T, Result> -> obj.right!! }.toList()
131+
return eithers.map { obj: Either<out T, Result> -> obj.right!! }
132132
}
133133

134134
private fun isTransientErrorException(e: Throwable?): Boolean {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt

-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ class ConcurrentStreamConsumer(
8383
.map { runnable: ConcurrentStreamRunnable ->
8484
CompletableFuture.runAsync(runnable, executorService)
8585
}
86-
.toList()
8786

8887
/*
8988
* Wait for the submitted streams to complete before returning. This uses the join() method to allow

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,10 @@ internal class TestJdbcUtils {
119119
val rs = connection.createStatement().executeQuery("SELECT * FROM id_and_name;")
120120
val actual =
121121
JdbcDatabase.toUnsafeStream(rs) { queryContext: ResultSet ->
122-
sourceOperations.rowToJson(queryContext)
123-
}
124-
.toList()
125-
Assertions.assertEquals(RECORDS_AS_JSON, actual)
122+
sourceOperations.rowToJson(queryContext)
123+
}
124+
125+
Assertions.assertEquals(RECORDS_AS_JSON, actual.toList())
126126
}
127127
}
128128

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ internal class TestStreamingJdbcDatabase {
144144
// This check assumes that FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE = 200 MB.
145145
// Update this check if the buffer size constant is changed.
146146
Assertions.assertEquals(2, fetchSizes.size)
147-
val sortedSizes = fetchSizes.sorted().toList()
147+
val sortedSizes = fetchSizes.sorted()
148148
Assertions.assertTrue(sortedSizes[0] < FetchSizeConstants.INITIAL_SAMPLE_SIZE)
149149
Assertions.assertEquals(FetchSizeConstants.INITIAL_SAMPLE_SIZE, sortedSizes[1])
150150
}

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+1-6
Original file line numberDiff line numberDiff line change
@@ -548,12 +548,7 @@ class AsyncStreamConsumerTest {
548548
)
549549

550550
// captures the output of all the workers, since our records could come out in any of them.
551-
val actualRecords =
552-
argumentCaptor.allValues
553-
.stream() // flatten those results into a single list for the simplicity of
554-
// comparison
555-
.flatMap { s: Stream<*> -> s }
556-
.toList()
551+
val actualRecords = argumentCaptor.allValues.flatMap { it.toList() }
557552

558553
val expRecords =
559554
allRecords.map { m: AirbyteMessage ->

0 commit comments

Comments
 (0)