@@ -13,22 +13,23 @@ import io.airbyte.cdk.db.AbstractDatabase
13
13
import io.airbyte.commons.exceptions.ConnectionErrorException
14
14
import io.airbyte.commons.functional.CheckedFunction
15
15
import io.airbyte.commons.util.MoreIterators
16
- import org.bson.BsonDocument
17
- import org.bson.Document
18
- import org.bson.conversions.Bson
19
- import org.slf4j.Logger
20
- import org.slf4j.LoggerFactory
21
16
import java.util.*
22
17
import java.util.Spliterators.AbstractSpliterator
23
18
import java.util.function.Consumer
24
19
import java.util.stream.Collectors
25
20
import java.util.stream.Stream
26
21
import java.util.stream.StreamSupport
22
+ import org.bson.BsonDocument
23
+ import org.bson.Document
24
+ import org.bson.conversions.Bson
25
+ import org.slf4j.Logger
26
+ import org.slf4j.LoggerFactory
27
27
28
- class MongoDatabase (connectionString : String? , databaseName : String? ) : AbstractDatabase(), AutoCloseable {
28
+ class MongoDatabase (connectionString : String? , databaseName : String? ) :
29
+ AbstractDatabase (), AutoCloseable {
29
30
private var connectionString: ConnectionString ? = null
30
31
var database: com.mongodb.client.MongoDatabase ? = null
31
- private var mongoClient: MongoClient ? = null
32
+ private val mongoClient: MongoClient
32
33
33
34
init {
34
35
try {
@@ -54,14 +55,15 @@ class MongoDatabase(connectionString: String?, databaseName: String?) : Abstract
54
55
55
56
val collectionNames: Set <String ?>
56
57
get() {
57
- val collectionNames = database!! .listCollectionNames() ? : return Collections .EMPTY_SET
58
- return MoreIterators .toSet(database!! .listCollectionNames().iterator()).stream()
59
- .filter { c: String? -> ! c!! .startsWith(MONGO_RESERVED_COLLECTION_PREFIX ) }.collect(Collectors .toSet())
58
+ val collectionNames = database!! .listCollectionNames() ? : return Collections .emptySet()
59
+ return MoreIterators .toSet(database!! .listCollectionNames().iterator())
60
+ .stream()
61
+ .filter { c: String -> ! c.startsWith(MONGO_RESERVED_COLLECTION_PREFIX ) }
62
+ .collect(Collectors .toSet())
60
63
}
61
64
62
- fun getCollection (collectionName : String? ): MongoCollection <Document > {
63
- return database!! .getCollection(collectionName)
64
- .withReadConcern(ReadConcern .MAJORITY )
65
+ fun getCollection (collectionName : String ): MongoCollection <Document > {
66
+ return database!! .getCollection(collectionName).withReadConcern(ReadConcern .MAJORITY )
65
67
}
66
68
67
69
fun getOrCreateNewCollection (collectionName : String ): MongoCollection <Document > {
@@ -73,7 +75,7 @@ class MongoDatabase(connectionString: String?, databaseName: String?) : Abstract
73
75
}
74
76
75
77
@VisibleForTesting
76
- fun createCollection (name : String? ): MongoCollection <Document > {
78
+ fun createCollection (name : String ): MongoCollection <Document > {
77
79
database!! .createCollection(name)
78
80
return database!! .getCollection(name)
79
81
}
@@ -82,40 +84,57 @@ class MongoDatabase(connectionString: String?, databaseName: String?) : Abstract
82
84
val name: String
83
85
get() = database!! .name
84
86
85
- fun read (collectionName : String? , columnNames : List <String >, filter : Optional <Bson ?>): Stream <JsonNode > {
87
+ fun read (
88
+ collectionName : String? ,
89
+ columnNames : List <String >,
90
+ filter : Optional <Bson ?>
91
+ ): Stream <JsonNode > {
86
92
try {
87
93
val collection = database!! .getCollection(collectionName)
88
- val cursor = collection
89
- .find(filter.orElse(BsonDocument ()))
90
- .batchSize(BATCH_SIZE )
91
- .cursor()
94
+ val cursor =
95
+ collection.find(filter.orElse(BsonDocument ())).batchSize(BATCH_SIZE ).cursor()
92
96
93
- return getStream(cursor, CheckedFunction { document: Document -> MongoUtils .toJsonNode(document, columnNames) })
94
- .onClose {
95
- try {
96
- cursor.close()
97
- } catch (e: Exception ) {
98
- throw RuntimeException (e.message, e)
99
- }
97
+ return getStream(
98
+ cursor,
99
+ CheckedFunction { document: Document ->
100
+ MongoUtils .toJsonNode(document, columnNames)
100
101
}
102
+ )
103
+ .onClose {
104
+ try {
105
+ cursor.close()
106
+ } catch (e: Exception ) {
107
+ throw RuntimeException (e.message, e)
108
+ }
109
+ }
101
110
} catch (e: Exception ) {
102
- LOGGER .error(" Exception attempting to read data from collection: {}, {}" , collectionName, e.message)
111
+ LOGGER .error(
112
+ " Exception attempting to read data from collection: {}, {}" ,
113
+ collectionName,
114
+ e.message
115
+ )
103
116
throw RuntimeException (e)
104
117
}
105
118
}
106
119
107
- private fun getStream (cursor : MongoCursor <Document >, mapper : CheckedFunction <Document , JsonNode , Exception >): Stream <JsonNode > {
108
- return StreamSupport .stream(object : AbstractSpliterator <JsonNode ?>(Long .MAX_VALUE , ORDERED ) {
109
- override fun tryAdvance (action : Consumer <in JsonNode >): Boolean {
110
- try {
111
- val document = cursor.tryNext() ? : return false
112
- action.accept(mapper.apply (document))
113
- return true
114
- } catch (e: Exception ) {
115
- throw RuntimeException (e)
120
+ private fun getStream (
121
+ cursor : MongoCursor <Document >,
122
+ mapper : CheckedFunction <Document , JsonNode , Exception >
123
+ ): Stream <JsonNode > {
124
+ return StreamSupport .stream(
125
+ object : AbstractSpliterator <JsonNode >(Long .MAX_VALUE , ORDERED ) {
126
+ override fun tryAdvance (action : Consumer <in JsonNode >): Boolean {
127
+ try {
128
+ val document = cursor.tryNext() ? : return false
129
+ action.accept(mapper.apply (document))
130
+ return true
131
+ } catch (e: Exception ) {
132
+ throw RuntimeException (e)
133
+ }
116
134
}
117
- }
118
- }, false )
135
+ },
136
+ false
137
+ )
119
138
}
120
139
121
140
companion object {
0 commit comments