Skip to content

Commit 460cdca

Browse files
remove calls to java stream() (#37539)
## What
<!--
* Describe what the change is solving. Link all GitHub issues related to this change.
-->

## How
<!--
* Describe how code changes achieve the solution.
-->

## Review guide
<!--
1. `x.py`
2. `y.py`
-->

## User Impact
<!--
* What is the end result perceived by the user?
* If there are negative side effects, please list them.
-->

## Can this PR be safely reverted and rolled back?
<!--
* If unsure, leave it blank.
-->
- [ ] YES 💚
- [ ] NO ❌
1 parent 0805f9e commit 460cdca

File tree

90 files changed

+530
-1250
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

90 files changed

+530
-1250
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ internal constructor(
301301
* stream consumer.
302302
*/
303303
val partitionSize = streamConsumer.parallelism
304-
val partitions = Lists.partition(streams.stream().toList(), partitionSize)
304+
val partitions = Lists.partition(streams.toList(), partitionSize)
305305

306306
// Submit each stream partition for concurrent execution
307307
partitions.forEach(
@@ -520,7 +520,7 @@ internal constructor(
520520
scheduledExecutorService.schedule(
521521
{
522522
if (
523-
ThreadUtils.getAllThreads().stream().anyMatch { runningThread: Thread ->
523+
ThreadUtils.getAllThreads().any { runningThread: Thread ->
524524
!runningThread.isDaemon && runningThread.name != currentThread.name
525525
}
526526
) {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt

-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ open class StandardNameTransformer : NamingConventionTransformer {
7474
} else if (root.isArray) {
7575
return Jsons.jsonNode(
7676
MoreIterators.toList(root.elements())
77-
.stream()
7877
.map { r: JsonNode -> formatJsonPath(r) }
7978
.toList()
8079
)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+4-12
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService
2828
import java.util.concurrent.Executors
2929
import java.util.concurrent.atomic.AtomicLong
3030
import java.util.function.Consumer
31-
import java.util.stream.Collectors
3231
import kotlin.jvm.optionals.getOrNull
3332
import org.jetbrains.annotations.VisibleForTesting
3433

@@ -160,18 +159,11 @@ constructor(
160159
bufferManager.close()
161160

162161
val streamSyncSummaries =
163-
streamNames
164-
.stream()
165-
.collect(
166-
Collectors.toMap(
167-
{ streamDescriptor: StreamDescriptor -> streamDescriptor },
168-
{ streamDescriptor: StreamDescriptor ->
169-
StreamSyncSummary(
170-
Optional.of(getRecordCounter(streamDescriptor).get()),
171-
)
172-
},
173-
),
162+
streamNames.associateWith { streamDescriptor: StreamDescriptor ->
163+
StreamSyncSummary(
164+
Optional.of(getRecordCounter(streamDescriptor).get()),
174165
)
166+
}
175167
onClose.accept(hasFailed, streamSyncSummaries)
176168

177169
// as this throws an exception, we need to be after all other close functions.

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt

+11-30
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import java.util.Optional
1515
import java.util.concurrent.ConcurrentHashMap
1616
import java.util.concurrent.ConcurrentMap
1717
import java.util.concurrent.atomic.AtomicBoolean
18-
import java.util.stream.Collectors
1918
import kotlin.math.min
2019

2120
private val logger = KotlinLogging.logger {}
@@ -194,12 +193,10 @@ internal constructor(
194193
)
195194
val workersWithBatchesSize =
196195
runningWorkerBatchesSizes
197-
.stream()
198196
.filter { obj: Optional<Long> -> obj.isPresent }
199-
.mapToLong { obj: Optional<Long> -> obj.get() }
200-
.sum()
197+
.sumOf { obj: Optional<Long> -> obj.get() }
201198
val workersWithoutBatchesCount =
202-
runningWorkerBatchesSizes.stream().filter { obj: Optional<Long> -> obj.isEmpty }.count()
199+
runningWorkerBatchesSizes.count { obj: Optional<Long> -> obj.isEmpty }
203200
val workersWithoutBatchesSizeEstimate =
204201
(min(
205202
flusher.optimalBatchSizeBytes.toDouble(),
@@ -232,36 +229,20 @@ internal constructor(
232229
fun orderStreamsByPriority(streams: Set<StreamDescriptor>): List<StreamDescriptor> {
233230
// eagerly pull attributes so that values are consistent throughout comparison
234231
val sdToQueueSize =
235-
streams
236-
.stream()
237-
.collect(
238-
Collectors.toMap(
239-
{ s: StreamDescriptor -> s },
240-
{ streamDescriptor: StreamDescriptor ->
241-
bufferDequeue.getQueueSizeBytes(
242-
streamDescriptor,
243-
)
244-
},
245-
),
232+
streams.associateWith { streamDescriptor: StreamDescriptor ->
233+
bufferDequeue.getQueueSizeBytes(
234+
streamDescriptor,
246235
)
236+
}
247237

248238
val sdToTimeOfLastRecord =
249-
streams
250-
.stream()
251-
.collect(
252-
Collectors.toMap(
253-
{ s: StreamDescriptor -> s },
254-
{ streamDescriptor: StreamDescriptor ->
255-
bufferDequeue.getTimeOfLastRecord(
256-
streamDescriptor,
257-
)
258-
},
259-
),
239+
streams.associateWith { streamDescriptor: StreamDescriptor ->
240+
bufferDequeue.getTimeOfLastRecord(
241+
streamDescriptor,
260242
)
261-
243+
}
262244
return streams
263-
.stream()
264-
.sorted(
245+
.sortedWith(
265246
Comparator.comparing(
266247
{ s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() },
267248
Comparator.reverseOrder(),

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/FlushWorkers.kt

+8-31
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import java.util.concurrent.ThreadPoolExecutor
2020
import java.util.concurrent.TimeUnit
2121
import java.util.concurrent.atomic.AtomicBoolean
2222
import java.util.function.Consumer
23-
import java.util.stream.Collectors
2423

2524
private val logger = KotlinLogging.logger {}
2625

@@ -153,14 +152,10 @@ constructor(
153152
)
154153
val stateIdToCount =
155154
batch.data
156-
.stream()
157155
.map(StreamAwareQueue.MessageWithMeta::stateId)
158-
.collect(
159-
Collectors.groupingBy(
160-
{ stateId: Long -> stateId },
161-
Collectors.counting(),
162-
),
163-
)
156+
.groupingBy { it }
157+
.eachCount()
158+
.mapValues { it.value.toLong() }
164159
logger.info {
165160
"Flush Worker (${humanReadableFlushWorkerId(
166161
flushWorkerId,
@@ -169,14 +164,7 @@ constructor(
169164
)} bytes."
170165
}
171166

172-
flusher.flush(
173-
desc,
174-
batch.data
175-
.stream()
176-
.map(
177-
StreamAwareQueue.MessageWithMeta::message,
178-
),
179-
)
167+
flusher.flush(desc, batch.data.map { it.message }.stream())
180168
batch.flushStates(stateIdToCount, outputRecordCollector)
181169
}
182170
logger.info {
@@ -207,22 +195,12 @@ constructor(
207195
// wait for all buffers to be flushed.
208196
while (true) {
209197
val streamDescriptorToRemainingRecords =
210-
bufferDequeue.bufferedStreams
211-
.stream()
212-
.collect(
213-
Collectors.toMap(
214-
{ desc: StreamDescriptor -> desc },
215-
{ desc: StreamDescriptor ->
216-
bufferDequeue.getQueueSizeInRecords(desc).orElseThrow()
217-
},
218-
),
219-
)
220-
221-
val anyRecordsLeft =
222-
streamDescriptorToRemainingRecords.values.stream().anyMatch { size: Long ->
223-
size > 0
198+
bufferDequeue.bufferedStreams.associateWith { desc: StreamDescriptor ->
199+
bufferDequeue.getQueueSizeInRecords(desc).orElseThrow()
224200
}
225201

202+
val anyRecordsLeft = streamDescriptorToRemainingRecords.values.any { it > 0 }
203+
226204
if (!anyRecordsLeft) {
227205
break
228206
}
@@ -234,7 +212,6 @@ constructor(
234212
)
235213
.append(System.lineSeparator())
236214
streamDescriptorToRemainingRecords.entries
237-
.stream()
238215
.filter { entry: Map.Entry<StreamDescriptor, Long> -> entry.value > 0 }
239216
.forEach { entry: Map.Entry<StreamDescriptor, Long> ->
240217
workerInfo.append(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt

+1-6
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,7 @@ class BufferDequeue(
109109
get() = memoryManager.maxMemoryBytes
110110

111111
val totalGlobalQueueSizeBytes: Long
112-
get() =
113-
buffers.values
114-
.stream()
115-
.map { obj: StreamAwareQueue -> obj.currentMemoryUsage }
116-
.mapToLong { obj: Long -> obj }
117-
.sum()
112+
get() = buffers.values.sumOf { obj: StreamAwareQueue -> obj.currentMemoryUsage }
118113

119114
fun getQueueSizeInRecords(streamDescriptor: StreamDescriptor): Optional<Long> {
120115
return getBuffer(streamDescriptor).map { buf: StreamAwareQueue -> buf.size().toLong() }

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/MemoryAwareMessageBatch.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ class MemoryAwareMessageBatch(
3636
* can be flushed back to platform via stateManager.
3737
*/
3838
fun flushStates(
39-
stateIdToCount: Map<Long?, Long?>,
39+
stateIdToCount: Map<Long, Long>,
4040
outputRecordCollector: Consumer<AirbyteMessage>,
4141
) {
42-
stateIdToCount.forEach { (stateId: Long?, count: Long?) ->
42+
stateIdToCount.forEach { (stateId: Long, count: Long) ->
4343
stateManager.decrement(
44-
stateId!!,
45-
count!!,
44+
stateId,
45+
count,
4646
)
4747
}
4848
stateManager.flushStates(outputRecordCollector)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt

+3-11
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
282282
synchronized(lock) {
283283
aliasIds.addAll(
284284
descToStateIdQ.values
285-
.stream()
286-
.flatMap { obj: LinkedBlockingDeque<Long> -> obj.stream() }
285+
.flatMap { obj: LinkedBlockingDeque<Long> -> obj }
287286
.toList(),
288287
)
289288
descToStateIdQ.clear()
@@ -292,19 +291,12 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
292291
descToStateIdQ[SENTINEL_GLOBAL_DESC] = LinkedBlockingDeque()
293292
descToStateIdQ[SENTINEL_GLOBAL_DESC]!!.add(retroactiveGlobalStateId)
294293

295-
val combinedCounter: Long =
296-
stateIdToCounter.values
297-
.stream()
298-
.mapToLong { obj: AtomicLong -> obj.get() }
299-
.sum()
294+
val combinedCounter: Long = stateIdToCounter.values.sumOf { it.get() }
300295
stateIdToCounter.clear()
301296
stateIdToCounter[retroactiveGlobalStateId] = AtomicLong(combinedCounter)
302297

303298
val statsCounter: Long =
304-
stateIdToCounterForPopulatingDestinationStats.values
305-
.stream()
306-
.mapToLong { obj: AtomicLong -> obj.get() }
307-
.sum()
299+
stateIdToCounterForPopulatingDestinationStats.values.sumOf { it.get() }
308300
stateIdToCounterForPopulatingDestinationStats.clear()
309301
stateIdToCounterForPopulatingDestinationStats.put(
310302
retroactiveGlobalStateId,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/dest_state_lifecycle_manager/DestStreamStateLifecycleManager.kt

+18-24
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage
1010
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
1111
import io.airbyte.protocol.models.v0.StreamDescriptor
1212
import java.util.*
13-
import java.util.function.Supplier
14-
import java.util.stream.Collectors
1513

1614
/**
1715
* This [DestStateLifecycleManager] handles any state where the state messages are scoped by stream.
@@ -125,29 +123,25 @@ class DestStreamStateLifecycleManager(private val defaultNamespace: String?) :
125123
private fun listStatesInOrder(
126124
streamToState: Map<StreamDescriptor, AirbyteMessage>
127125
): Queue<AirbyteMessage> {
128-
return streamToState.entries
129-
.stream() // typically, we support by namespace and then stream name, so we retain
130-
// that pattern here.
131-
.sorted(
132-
Comparator.comparing<Map.Entry<StreamDescriptor, AirbyteMessage>, String>(
133-
{ entry: Map.Entry<StreamDescriptor, AirbyteMessage> ->
134-
entry.key.namespace
135-
},
136-
Comparator.nullsFirst<String>(Comparator.naturalOrder<String>())
137-
) // namespace is allowed to be null
138-
.thenComparing<String> { entry: Map.Entry<StreamDescriptor, AirbyteMessage>
139-
->
140-
entry.key.name
141-
}
142-
)
143-
.map<AirbyteMessage> { obj: Map.Entry<StreamDescriptor, AirbyteMessage> ->
144-
obj.value
145-
}
146-
.collect(
147-
Collectors.toCollection<AirbyteMessage, LinkedList<AirbyteMessage>>(
148-
Supplier<LinkedList<AirbyteMessage>> { LinkedList() }
126+
return LinkedList(
127+
streamToState.entries
128+
129+
// typically, we support by namespace and then stream name, so we retain
130+
// that pattern here.
131+
.sortedWith(
132+
Comparator.comparing<Map.Entry<StreamDescriptor, AirbyteMessage>, String>(
133+
{ entry: Map.Entry<StreamDescriptor, AirbyteMessage> ->
134+
entry.key.namespace
135+
},
136+
Comparator.nullsFirst<String>(Comparator.naturalOrder<String>())
137+
) // namespace is allowed to be null
138+
.thenComparing<String> {
139+
entry: Map.Entry<StreamDescriptor, AirbyteMessage> ->
140+
entry.key.name
141+
}
149142
)
150-
)
143+
.map { obj: Map.Entry<StreamDescriptor, AirbyteMessage> -> obj.value }
144+
)
151145
}
152146

153147
/**

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt

+4-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import io.airbyte.commons.functional.Either
1212
import java.io.EOFException
1313
import java.sql.SQLException
1414
import java.sql.SQLSyntaxErrorException
15-
import java.util.stream.Collectors
1615
import org.apache.commons.lang3.exception.ExceptionUtils
1716
import org.slf4j.Logger
1817
import org.slf4j.LoggerFactory
@@ -111,10 +110,9 @@ object ConnectorExceptionUtil {
111110
fun <T : Throwable> logAllAndThrowFirst(initialMessage: String, throwables: Collection<T>) {
112111
if (!throwables.isEmpty()) {
113112
val stacktraces =
114-
throwables
115-
.stream()
116-
.map { throwable: Throwable -> ExceptionUtils.getStackTrace(throwable) }
117-
.collect(Collectors.joining("\n"))
113+
throwables.joinToString("\n") { throwable: Throwable ->
114+
ExceptionUtils.getStackTrace(throwable)
115+
}
118116
LOGGER.error("$initialMessage$stacktraces\nRethrowing first exception.")
119117
throw throwables.iterator().next()
120118
}
@@ -130,7 +128,7 @@ object ConnectorExceptionUtil {
130128
logAllAndThrowFirst(initialMessage, throwables)
131129
}
132130
// No need to filter on isRight since isLeft will throw before reaching this line.
133-
return eithers.stream().map { obj: Either<out T, Result> -> obj.right!! }.toList()
131+
return eithers.map { obj: Either<out T, Result> -> obj.right!! }.toList()
134132
}
135133

136134
private fun isTransientErrorException(e: Throwable?): Boolean {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt

-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ class ConcurrentStreamConsumer(
7777
*/
7878
val futures: Collection<CompletableFuture<Void>> =
7979
streams
80-
.stream()
8180
.map { stream: AutoCloseableIterator<AirbyteMessage> ->
8281
ConcurrentStreamRunnable(stream, this)
8382
}

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ internal class TestStreamingJdbcDatabase {
144144
// This check assumes that FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE = 200 MB.
145145
// Update this check if the buffer size constant is changed.
146146
Assertions.assertEquals(2, fetchSizes.size)
147-
val sortedSizes = fetchSizes.stream().sorted().toList()
147+
val sortedSizes = fetchSizes.sorted().toList()
148148
Assertions.assertTrue(sortedSizes[0] < FetchSizeConstants.INITIAL_SAMPLE_SIZE)
149149
Assertions.assertEquals(FetchSizeConstants.INITIAL_SAMPLE_SIZE, sortedSizes[1])
150150
}

0 commit comments

Comments
 (0)