Skip to content

Commit 6a0211a

Browse files
Replace all Java Collectors.toSet() usages with Kotlin constructs
1 parent b0ab148 commit 6a0211a

File tree

13 files changed

+40
-86
lines changed

13 files changed

+40
-86
lines changed

airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import io.airbyte.commons.util.MoreIterators
1616
import java.util.*
1717
import java.util.Spliterators.AbstractSpliterator
1818
import java.util.function.Consumer
19-
import java.util.stream.Collectors
2019
import java.util.stream.Stream
2120
import java.util.stream.StreamSupport
2221
import org.bson.BsonDocument
@@ -57,9 +56,8 @@ class MongoDatabase(connectionString: String, databaseName: String) :
5756
get() {
5857
val collectionNames = database.listCollectionNames() ?: return Collections.emptySet()
5958
return MoreIterators.toSet(collectionNames.iterator())
60-
.stream()
6159
.filter { c: String -> !c.startsWith(MONGO_RESERVED_COLLECTION_PREFIX) }
62-
.collect(Collectors.toSet())
60+
.toSet()
6361
}
6462

6563
fun getCollection(collectionName: String): MongoCollection<Document> {

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -423,10 +423,7 @@ abstract class AbstractJdbcSource<Datatype>(
423423
): Map<String, MutableList<String>> {
424424
LOGGER.info(
425425
"Discover primary keys for tables: " +
426-
tableInfos
427-
.stream()
428-
.map { obj: TableInfo<CommonField<Datatype>> -> obj.name }
429-
.collect(Collectors.toSet())
426+
tableInfos.map { obj: TableInfo<CommonField<Datatype>> -> obj.name }.toSet()
430427
)
431428
try {
432429
// Get all primary keys without specifying a table name

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,8 @@ object DbSourceDiscoverUtil {
143143
)
144144
.withSourceDefinedPrimaryKey(primaryKeys)
145145
}
146-
// This is ugly. Some of our tests change the streams on the AirbyteCatalog
147-
// object...
148-
.toMutableList()
146+
.toMutableList() // This is ugly, but we modify this list in
147+
// JdbcSourceAcceptanceTest.testDiscoverWithMultipleSchemas
149148
return AirbyteCatalog().withStreams(streams)
150149
}
151150

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
99
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
1010
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1111
import io.airbyte.protocol.models.v0.SyncMode
12-
import java.util.stream.Collectors
1312

1413
object RelationalDbReadUtil {
1514
fun identifyStreamsToSnapshot(
@@ -38,11 +37,10 @@ object RelationalDbReadUtil {
3837
): List<ConfiguredAirbyteStream> {
3938
val initialLoadStreamsNamespacePairs =
4039
streamsForInitialLoad
41-
.stream()
4240
.map { stream: ConfiguredAirbyteStream ->
4341
AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.stream)
4442
}
45-
.collect(Collectors.toSet())
43+
.toSet()
4644
return catalog.streams
4745
.stream()
4846
.filter { c: ConfiguredAirbyteStream -> c.syncMode == SyncMode.INCREMENTAL }

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ class CursorManager<S : Any>(
9292
): Map<AirbyteStreamNameNamespacePair, CursorInfo> {
9393
val allStreamNames =
9494
catalog.streams
95-
.stream()
9695
.filter { c: ConfiguredAirbyteStream ->
9796
if (onlyIncludeIncrementalStreams) {
9897
return@filter c.syncMode == SyncMode.INCREMENTAL
@@ -103,7 +102,7 @@ class CursorManager<S : Any>(
103102
.map { stream: AirbyteStream ->
104103
AirbyteStreamNameNamespacePair.fromAirbyteStream(stream)
105104
}
106-
.collect(Collectors.toSet())
105+
.toMutableSet()
107106
allStreamNames.addAll(
108107
streamSupplier
109108
.get()

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,14 @@ class GlobalStateManager(
105105
): Set<AirbyteStreamNameNamespacePair> {
106106
if (airbyteStateMessage!!.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) {
107107
return airbyteStateMessage.global.streamStates
108-
.stream()
109108
.map { streamState: AirbyteStreamState ->
110109
val cloned = Jsons.clone(streamState)
111110
AirbyteStreamNameNamespacePair(
112111
cloned.streamDescriptor.name,
113112
cloned.streamDescriptor.namespace
114113
)
115114
}
116-
.collect(Collectors.toSet())
115+
.toSet()
117116
} else {
118117
val legacyState: DbState? =
119118
Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
@@ -127,12 +126,11 @@ class GlobalStateManager(
127126
streams: List<DbStreamState>
128127
): Set<AirbyteStreamNameNamespacePair> {
129128
return streams
130-
.stream()
131129
.map { stream: DbStreamState ->
132130
val cloned = Jsons.clone(stream)
133131
AirbyteStreamNameNamespacePair(cloned.streamName, cloned.streamNamespace)
134132
}
135-
.collect(Collectors.toSet())
133+
.toSet()
136134
}
137135

138136
companion object {

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

+19-39
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ import io.airbyte.protocol.models.JsonSchemaType
1616
import io.airbyte.protocol.models.v0.*
1717
import java.util.*
1818
import java.util.function.Consumer
19-
import java.util.stream.Collectors
20-
import java.util.stream.Stream
2119
import org.junit.jupiter.api.AfterEach
2220
import org.junit.jupiter.api.Assertions
2321
import org.junit.jupiter.api.BeforeEach
@@ -123,9 +121,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
123121
private fun assertStateDoNotHaveDuplicateStreams(stateMessage: AirbyteStateMessage) {
124122
val dedupedStreamStates =
125123
stateMessage.global.streamStates
126-
.stream()
127124
.map { streamState: AirbyteStreamState -> streamState.streamDescriptor }
128-
.collect(Collectors.toSet())
125+
.toSet()
129126
Assertions.assertEquals(dedupedStreamStates.size, stateMessage.global.streamStates.size)
130127
}
131128

@@ -290,7 +287,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
290287
if (message.type == AirbyteMessage.Type.RECORD) {
291288
val recordMessage = message.record
292289
recordsPerStream
293-
.computeIfAbsent(recordMessage.stream) { c: String -> ArrayList() }
290+
.computeIfAbsent(recordMessage.stream) { _: String -> ArrayList() }
294291
.add(recordMessage)
295292
}
296293
}
@@ -326,10 +323,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
326323
assertExpectedRecords(
327324
expectedRecords,
328325
actualRecords,
329-
actualRecords
330-
.stream()
331-
.map { obj: AirbyteRecordMessage -> obj.stream }
332-
.collect(Collectors.toSet()),
326+
actualRecords.map { obj: AirbyteRecordMessage -> obj.stream }.toSet(),
333327
)
334328
}
335329

@@ -356,8 +350,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
356350
) {
357351
val actualData =
358352
actualRecords
359-
.stream()
360-
.map<JsonNode> { recordMessage: AirbyteRecordMessage ->
353+
.map { recordMessage: AirbyteRecordMessage ->
361354
Assertions.assertTrue(streamNames.contains(recordMessage.stream))
362355
Assertions.assertNotNull(recordMessage.emittedAt)
363356

@@ -374,7 +367,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
374367
removeCDCColumns(data as ObjectNode)
375368
data
376369
}
377-
.collect(Collectors.toSet())
370+
.toSet()
378371

379372
Assertions.assertEquals(expectedRecords, actualData)
380373
}
@@ -639,8 +632,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
639632
// Non resumeable full refresh does not get any state messages.
640633
assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong())
641634
assertExpectedRecords(
642-
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
643-
.collect(Collectors.toSet()),
635+
(MODEL_RECORDS_2 + MODEL_RECORDS).toSet(),
644636
recordMessages1,
645637
setOf(MODELS_STREAM_NAME),
646638
names,
@@ -657,17 +649,15 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
657649
assertExpectedStateMessagesFromIncrementalSync(stateMessages2)
658650
assertExpectedStateMessageCountMatches(stateMessages2, 1)
659651
assertExpectedRecords(
660-
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
661-
.collect(Collectors.toSet()),
652+
(MODEL_RECORDS_2 + puntoRecord).toSet(),
662653
recordMessages2,
663654
setOf(MODELS_STREAM_NAME),
664655
names,
665656
modelsSchema(),
666657
)
667658
} else {
668659
assertExpectedRecords(
669-
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
670-
.collect(Collectors.toSet()),
660+
(MODEL_RECORDS_2 + MODEL_RECORDS).toSet(),
671661
recordMessages1,
672662
setOf(MODELS_STREAM_NAME),
673663
names,
@@ -687,8 +677,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
687677
val stateMessages2 = extractStateMessages(actualRecords2)
688678

689679
assertExpectedRecords(
690-
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
691-
.collect(Collectors.toSet()),
680+
(MODEL_RECORDS_2 + puntoRecord).toSet(),
692681
recordMessages2,
693682
setOf(MODELS_STREAM_NAME),
694683
names,
@@ -706,7 +695,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
706695
val actualRecords3 = AutoCloseableIterators.toListAndClose(read3)
707696
val recordMessages3 = extractRecordMessages(actualRecords3)
708697
assertExpectedRecords(
709-
Streams.concat(MODEL_RECORDS_2.stream()).collect(Collectors.toSet()),
698+
MODEL_RECORDS_2.toSet(),
710699
recordMessages3,
711700
setOf(MODELS_STREAM_NAME),
712701
names,
@@ -790,8 +779,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
790779
// Non resumeable full refresh does not get any state messages.
791780
assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong())
792781
assertExpectedRecords(
793-
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
794-
.collect(Collectors.toSet()),
782+
(MODEL_RECORDS_2 + MODEL_RECORDS).toSet(),
795783
recordMessages1,
796784
setOf(MODELS_STREAM_NAME),
797785
names,
@@ -808,8 +796,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
808796
assertExpectedStateMessagesFromIncrementalSync(stateMessages2)
809797
assertExpectedStateMessageCountMatches(stateMessages2, 1)
810798
assertExpectedRecords(
811-
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
812-
.collect(Collectors.toSet()),
799+
(MODEL_RECORDS_2 + puntoRecord).toSet(),
813800
recordMessages2,
814801
setOf(MODELS_STREAM_NAME),
815802
names,
@@ -917,9 +904,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
917904
Assertions.assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.global.sharedState)
918905
val streamsInStateAfterFirstSyncCompletion =
919906
stateMessageEmittedAfterFirstSyncCompletion.global.streamStates
920-
.stream()
921907
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
922-
.collect(Collectors.toSet())
908+
.toSet()
923909
Assertions.assertEquals(1, streamsInStateAfterFirstSyncCompletion.size)
924910
Assertions.assertTrue(
925911
streamsInStateAfterFirstSyncCompletion.contains(
@@ -1009,9 +995,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
1009995
HashSet(MODEL_RECORDS_RANDOM),
1010996
recordsForModelsRandomStreamFromSecondBatch,
1011997
recordsForModelsRandomStreamFromSecondBatch
1012-
.stream()
1013998
.map { obj: AirbyteRecordMessage -> obj.stream }
1014-
.collect(Collectors.toSet()),
999+
.toSet(),
10151000
Sets.newHashSet(RANDOM_TABLE_NAME),
10161001
randomSchema(),
10171002
)
@@ -1078,9 +1063,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
10781063
)
10791064
val streamsInSyncCompletionStateAfterThirdSync =
10801065
stateMessageEmittedAfterThirdSyncCompletion.global.streamStates
1081-
.stream()
10821066
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
1083-
.collect(Collectors.toSet())
1067+
.toSet()
10841068
Assertions.assertTrue(
10851069
streamsInSyncCompletionStateAfterThirdSync.contains(
10861070
StreamDescriptor().withName(RANDOM_TABLE_NAME).withNamespace(randomSchema()),
@@ -1109,9 +1093,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
11091093
recordsWrittenInRandomTable,
11101094
recordsForModelsRandomStreamFromThirdBatch,
11111095
recordsForModelsRandomStreamFromThirdBatch
1112-
.stream()
11131096
.map { obj: AirbyteRecordMessage -> obj.stream }
1114-
.collect(Collectors.toSet()),
1097+
.toSet(),
11151098
Sets.newHashSet(RANDOM_TABLE_NAME),
11161099
randomSchema(),
11171100
)
@@ -1138,9 +1121,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
11381121
Assertions.assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.global.sharedState)
11391122
val streamsInStateAfterFirstSyncCompletion =
11401123
stateMessageEmittedAfterFirstSyncCompletion.global.streamStates
1141-
.stream()
11421124
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
1143-
.collect(Collectors.toSet())
1125+
.toSet()
11441126
Assertions.assertEquals(1, streamsInStateAfterFirstSyncCompletion.size)
11451127
Assertions.assertTrue(
11461128
streamsInStateAfterFirstSyncCompletion.contains(
@@ -1256,9 +1238,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
12561238
)
12571239
val streamsInSnapshotState =
12581240
stateMessageEmittedAfterSnapshotCompletionInSecondSync.global.streamStates
1259-
.stream()
12601241
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
1261-
.collect(Collectors.toSet())
1242+
.toSet()
12621243
Assertions.assertEquals(2, streamsInSnapshotState.size)
12631244
Assertions.assertTrue(
12641245
streamsInSnapshotState.contains(
@@ -1283,9 +1264,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
12831264
)
12841265
val streamsInSyncCompletionState =
12851266
stateMessageEmittedAfterSecondSyncCompletion.global.streamStates
1286-
.stream()
12871267
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
1288-
.collect(Collectors.toSet())
1268+
.toSet()
12891269
Assertions.assertEquals(2, streamsInSnapshotState.size)
12901270
Assertions.assertTrue(
12911271
streamsInSyncCompletionState.contains(

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt

-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import io.airbyte.protocol.models.Field
2020
import io.airbyte.protocol.models.JsonSchemaType
2121
import io.airbyte.protocol.models.v0.*
2222
import java.math.BigDecimal
23-
import java.nio.ByteBuffer
2423
import java.sql.Connection
2524
import java.util.*
2625
import org.junit.jupiter.api.Assertions
@@ -172,7 +171,6 @@ abstract class JdbcStressTest {
172171
}
173172
.peek { m: AirbyteMessage -> assertExpectedMessage(m) }
174173
.count()
175-
var a: ByteBuffer
176174
val expectedRoundedRecordsCount = TOTAL_RECORDS - TOTAL_RECORDS % 1000
177175
LOGGER.info("expected records count: " + TOTAL_RECORDS)
178176
LOGGER.info("actual records count: $actualCount")

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt

+2-8
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ package io.airbyte.commons.enums
77
import com.google.common.base.Preconditions
88
import com.google.common.collect.Maps
99
import com.google.common.collect.Sets
10-
import java.util.Arrays
1110
import java.util.Locale
1211
import java.util.Optional
1312
import java.util.concurrent.ConcurrentMap
14-
import java.util.stream.Collectors
1513

1614
class Enums {
1715
companion object {
@@ -54,12 +52,8 @@ class Enums {
5452
Preconditions.checkArgument(c2.isEnum)
5553
return (c1.enumConstants.size == c2.enumConstants.size &&
5654
Sets.difference(
57-
Arrays.stream(c1.enumConstants)
58-
.map { obj: T1 -> obj!!.name }
59-
.collect(Collectors.toSet()),
60-
Arrays.stream(c2.enumConstants)
61-
.map { obj: T2 -> obj!!.name }
62-
.collect(Collectors.toSet()),
55+
c1.enumConstants.map { obj: T1 -> obj!!.name }.toSet(),
56+
c2.enumConstants.map { obj: T2 -> obj!!.name }.toSet(),
6357
)
6458
.isEmpty())
6559
}

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import java.io.File
1111
import java.io.IOException
1212
import java.net.URI
1313
import java.net.URISyntaxException
14-
import java.util.stream.Collectors
1514
import me.andrz.jackson.JsonContext
1615
import me.andrz.jackson.JsonReferenceException
1716
import me.andrz.jackson.JsonReferenceProcessor
@@ -89,9 +88,8 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR
8988

9089
fun validate(schemaJson: JsonNode, objectJson: JsonNode): Set<String> {
9190
return validateInternal(schemaJson, objectJson)
92-
.stream()
9391
.map { obj: ValidationMessage -> obj.message }
94-
.collect(Collectors.toSet())
92+
.toSet()
9593
}
9694

9795
fun getValidationMessageArgs(schemaJson: JsonNode, objectJson: JsonNode): List<Array<String>> {

airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import io.airbyte.commons.text.Names
1010
import io.airbyte.protocol.models.SyncMode
1111
import io.airbyte.validation.json.JsonValidationException
1212
import java.util.*
13-
import java.util.stream.Collectors
1413

1514
/**
1615
* Utilities to convert Catalog protocol to Catalog API client. This class was similar to existing
@@ -76,9 +75,8 @@ object CatalogClientConverters {
7675
// field path.
7776
val selectedFieldNames =
7877
config.selectedFields!!
79-
.stream()
8078
.map { field: SelectedFieldInfo -> field.fieldPath!![0] }
81-
.collect(Collectors.toSet())
79+
.toSet()
8280
// TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields
8381
// because we
8482
// don't support filtering nested fields yet.

0 commit comments

Comments
 (0)