Skip to content

Commit 2b1f9fe

Browse files
committed
Migrates MongodbSourceUitls to general purprose Utils file
1 parent 09dee5e commit 2b1f9fe

File tree

9 files changed

+102
-144
lines changed

9 files changed

+102
-144
lines changed

airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,15 @@
66

77
import static java.util.Arrays.asList;
88
import static org.bson.BsonType.ARRAY;
9+
import static org.bson.BsonType.DATE_TIME;
10+
import static org.bson.BsonType.DECIMAL128;
911
import static org.bson.BsonType.DOCUMENT;
12+
import static org.bson.BsonType.DOUBLE;
13+
import static org.bson.BsonType.INT32;
14+
import static org.bson.BsonType.INT64;
15+
import static org.bson.BsonType.OBJECT_ID;
16+
import static org.bson.BsonType.STRING;
17+
import static org.bson.BsonType.TIMESTAMP;
1018
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
1119

1220
import com.fasterxml.jackson.databind.JsonNode;
@@ -20,6 +28,7 @@
2028
import io.airbyte.commons.json.Jsons;
2129
import io.airbyte.commons.util.MoreIterators;
2230
import io.airbyte.db.DataTypeUtils;
31+
import io.airbyte.db.jdbc.JdbcUtils;
2332
import io.airbyte.protocol.models.CommonField;
2433
import io.airbyte.protocol.models.JsonSchemaType;
2534
import io.airbyte.protocol.models.TreeNode;
@@ -28,6 +37,7 @@
2837
import java.util.Collections;
2938
import java.util.HashSet;
3039
import java.util.List;
40+
import java.util.Set;
3141
import org.bson.BsonBinary;
3242
import org.bson.BsonDateTime;
3343
import org.bson.BsonDocument;
@@ -53,6 +63,29 @@ public class MongoUtils {
5363

5464
private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class);
5565

66+
// Shared constants
67+
public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=admin&ssl=%s";
68+
public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true";
69+
public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=admin&directConnection=false&ssl=true";
70+
public static final String USER = "user";
71+
public static final String INSTANCE_TYPE = "instance_type";
72+
public static final String INSTANCE = "instance";
73+
public static final String CLUSTER_URL = "cluster_url";
74+
public static final String SERVER_ADDRESSES = "server_addresses";
75+
public static final String REPLICA_SET = "replica_set";
76+
77+
// MongodbDestination specific constants
78+
public static final String AUTH_TYPE = "auth_type";
79+
public static final String AUTHORIZATION = "authorization";
80+
public static final String LOGIN_AND_PASSWORD = "login/password";
81+
public static final String AIRBYTE_DATA_HASH = "_airbyte_data_hash";
82+
83+
// MongodbSource specific constants
84+
public static final String AUTH_SOURCE = "auth_source";
85+
public static final String PRIMARY_KEY = "_id";
86+
public static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
87+
INT32, TIMESTAMP, INT64, DECIMAL128);
88+
5689
private static final String MISSING_TYPE = "missing";
5790
private static final String NULL_TYPE = "null";
5891
public static final String AIRBYTE_SUFFIX = "_aibyte_transform";
@@ -136,6 +169,14 @@ private static ObjectNode readDocument(final BsonReader reader, final ObjectNode
136169
return jsonNodes;
137170
}
138171

172+
/**
173+
* Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB.
174+
*/
175+
public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) {
176+
return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
177+
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
178+
}
179+
139180
public static void transformToStringIfMarked(final ObjectNode jsonNodes, final List<String> columnNames, final String fieldName) {
140181
if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
141182
final JsonNode data = jsonNodes.get(fieldName);

airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.fasterxml.jackson.databind.node.ObjectNode;
9+
import io.airbyte.commons.exceptions.ConfigErrorException;
910
import io.airbyte.commons.json.Jsons;
11+
import io.airbyte.db.mongodb.MongoUtils;
1012
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
1113
import io.airbyte.integrations.base.Destination;
1214
import io.airbyte.integrations.base.IntegrationRunner;
1315
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
1416
import io.airbyte.protocol.models.AirbyteConnectionStatus;
15-
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
1617
import io.airbyte.protocol.models.ConnectorSpecification;
1718
import org.slf4j.Logger;
1819
import org.slf4j.LoggerFactory;
@@ -27,13 +28,11 @@ public MongodbDestinationStrictEncrypt() {
2728

2829
@Override
2930
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
30-
final JsonNode instanceConfig = config.get(MongoDbDestinationUtils.INSTANCE_TYPE);
31-
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbDestinationUtils.INSTANCE).asText());
31+
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
32+
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
3233
// If the MongoDb destination connector is not set up to use a TLS connection, then check should fail
33-
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoDbDestinationUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
34-
return new AirbyteConnectionStatus()
35-
.withStatus(Status.FAILED)
36-
.withMessage("TLS connection must be used to read from MongoDB.");
34+
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
35+
throw new ConfigErrorException("TLS connection must be used to read from MongoDB.");
3736
}
3837
return super.check(config);
3938
}

airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationStrictEncryptAcceptanceTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,21 @@
55
package io.airbyte.integrations.destination.mongodb;
66

77
import static com.mongodb.client.model.Projections.excludeId;
8+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
9+
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
810
import static org.junit.jupiter.api.Assertions.assertEquals;
911

1012
import com.fasterxml.jackson.databind.JsonNode;
1113
import com.fasterxml.jackson.databind.node.ObjectNode;
1214
import com.google.common.collect.ImmutableMap;
1315
import com.mongodb.client.MongoCursor;
16+
import io.airbyte.commons.exceptions.ConfigErrorException;
1417
import io.airbyte.commons.json.Jsons;
1518
import io.airbyte.db.jdbc.JdbcUtils;
1619
import io.airbyte.db.mongodb.MongoDatabase;
20+
import io.airbyte.db.mongodb.MongoUtils;
1721
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
1822
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
19-
import io.airbyte.protocol.models.AirbyteConnectionStatus;
20-
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
2123
import java.io.IOException;
2224
import java.nio.file.Files;
2325
import java.nio.file.Path;
@@ -116,15 +118,13 @@ void testCheck() throws Exception {
116118

117119
final JsonNode invalidStandaloneConfig = getConfig();
118120

119-
((ObjectNode) invalidStandaloneConfig).put(MongoDbDestinationUtils.INSTANCE_TYPE, instanceConfig);
121+
((ObjectNode) invalidStandaloneConfig).put(MongoUtils.INSTANCE_TYPE, instanceConfig);
120122

121-
final AirbyteConnectionStatus actual = new MongodbDestinationStrictEncrypt().check(invalidStandaloneConfig);
122-
final AirbyteConnectionStatus expected =
123-
new AirbyteConnectionStatus()
124-
.withStatus(Status.FAILED)
125-
.withMessage("TLS connection must be used to read from MongoDB.");
126-
127-
assertEquals(expected, actual);
123+
final Throwable throwable = catchThrowable(() -> new MongodbDestinationStrictEncrypt().check(invalidStandaloneConfig));
124+
assertThat(throwable).isInstanceOf(ConfigErrorException.class);
125+
assertThat(((ConfigErrorException) throwable)
126+
.getDisplayMessage()
127+
.contains("TLS connection must be used to read from MongoDB."));
128128
}
129129

130130
@Override

airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongoDbDestinationUtils.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.airbyte.commons.util.MoreIterators;
1919
import io.airbyte.db.jdbc.JdbcUtils;
2020
import io.airbyte.db.mongodb.MongoDatabase;
21+
import io.airbyte.db.mongodb.MongoUtils;
2122
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
2223
import io.airbyte.integrations.BaseConnector;
2324
import io.airbyte.integrations.base.AirbyteMessageConsumer;
@@ -119,7 +120,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
119120
final Set<String> documentsHash = new HashSet<>();
120121
try (final MongoCursor<Document> cursor = collection.find().projection(excludeId()).iterator()) {
121122
while (cursor.hasNext()) {
122-
documentsHash.add(cursor.next().get(MongoDbDestinationUtils.AIRBYTE_DATA_HASH, String.class));
123+
documentsHash.add(cursor.next().get(MongoUtils.AIRBYTE_DATA_HASH, String.class));
123124
}
124125
}
125126

@@ -137,51 +138,51 @@ private MongoDatabase getDatabase(final JsonNode config) {
137138

138139
@VisibleForTesting
139140
String getConnectionString(final JsonNode config) {
140-
final var credentials = config.get(MongoDbDestinationUtils.AUTH_TYPE).get(MongoDbDestinationUtils.AUTHORIZATION).asText().equals(MongoDbDestinationUtils.LOGIN_AND_PASSWORD)
141-
? String.format("%s:%s@", config.get(MongoDbDestinationUtils.AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
142-
config.get(MongoDbDestinationUtils.AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText())
141+
final var credentials = config.get(MongoUtils.AUTH_TYPE).get(MongoUtils.AUTHORIZATION).asText().equals(MongoUtils.LOGIN_AND_PASSWORD)
142+
? String.format("%s:%s@", config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
143+
config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText())
143144
: StringUtils.EMPTY;
144145

145146
// backward compatibility check
146147
// the old mongo db spec only includes host, port, database, and auth_type
147148
// the new spec replaces host and port with the instance_type property
148-
if (config.has(MongoDbDestinationUtils.INSTANCE_TYPE)) {
149+
if (config.has(MongoUtils.INSTANCE_TYPE)) {
149150
return buildConnectionString(config, credentials);
150151
} else {
151-
return String.format(MongoDbDestinationUtils.MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(),
152+
return String.format(MongoUtils.MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(),
152153
config.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), false);
153154
}
154155
}
155156

156157
private String buildConnectionString(final JsonNode config, final String credentials) {
157158
final StringBuilder connectionStrBuilder = new StringBuilder();
158159

159-
final JsonNode instanceConfig = config.get(MongoDbDestinationUtils.INSTANCE_TYPE);
160-
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbDestinationUtils.INSTANCE).asText());
160+
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
161+
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
161162

162163
switch (instance) {
163164
case STANDALONE -> {
164165
// if there is no TLS present in spec, TLS should be enabled by default for strict encryption
165166
final var tls = !instanceConfig.has(JdbcUtils.TLS_KEY) || instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean();
166167
connectionStrBuilder.append(
167-
String.format(MongoDbDestinationUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
168+
String.format(MongoUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
168169
instanceConfig.get(JdbcUtils.PORT_KEY).asText(),
169170
config.get(JdbcUtils.DATABASE_KEY).asText(), tls));
170171
}
171172
case REPLICA -> {
172173
connectionStrBuilder.append(
173-
String.format(MongoDbDestinationUtils.MONGODB_REPLICA_URL,
174+
String.format(MongoUtils.MONGODB_REPLICA_URL,
174175
credentials,
175-
instanceConfig.get(MongoDbDestinationUtils.SERVER_ADDRESSES).asText(),
176+
instanceConfig.get(MongoUtils.SERVER_ADDRESSES).asText(),
176177
config.get(JdbcUtils.DATABASE_KEY).asText()));
177-
if (instanceConfig.has(MongoDbDestinationUtils.REPLICA_SET)) {
178-
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoDbDestinationUtils.REPLICA_SET).asText()));
178+
if (instanceConfig.has(MongoUtils.REPLICA_SET)) {
179+
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoUtils.REPLICA_SET).asText()));
179180
}
180181
}
181182
case ATLAS -> {
182183
connectionStrBuilder.append(
183-
String.format(MongoDbDestinationUtils.MONGODB_CLUSTER_URL, credentials,
184-
instanceConfig.get(MongoDbDestinationUtils.CLUSTER_URL).asText(),
184+
String.format(MongoUtils.MONGODB_CLUSTER_URL, credentials,
185+
instanceConfig.get(MongoUtils.CLUSTER_URL).asText(),
185186
config.get(JdbcUtils.DATABASE_KEY).asText()));
186187
}
187188
default -> throw new IllegalArgumentException("Unsupported instance type: " + instance);

airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.source.mongodb/MongodbSourceStrictEncrypt.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.fasterxml.jackson.databind.node.ObjectNode;
9+
import io.airbyte.commons.exceptions.ConfigErrorException;
910
import io.airbyte.commons.json.Jsons;
11+
import io.airbyte.db.mongodb.MongoUtils;
1012
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
1113
import io.airbyte.integrations.base.IntegrationRunner;
1214
import io.airbyte.integrations.base.Source;
1315
import io.airbyte.integrations.base.spec_modification.SpecModifyingSource;
1416
import io.airbyte.protocol.models.AirbyteConnectionStatus;
15-
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
1617
import io.airbyte.protocol.models.ConnectorSpecification;
1718
import org.slf4j.Logger;
1819
import org.slf4j.LoggerFactory;
@@ -27,14 +28,12 @@ public MongodbSourceStrictEncrypt() {
2728

2829
@Override
2930
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
30-
final JsonNode instanceConfig = config.get(MongoDbSourceUtils.INSTANCE_TYPE);
31-
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbSourceUtils.INSTANCE).asText());
31+
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
32+
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
3233
// If the MongoDb source connector is not set up to use a TLS connection, then we should fail the
3334
// check.
34-
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
35-
return new AirbyteConnectionStatus()
36-
.withStatus(Status.FAILED)
37-
.withMessage("TLS connection must be used to read from MongoDB.");
35+
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
36+
throw new ConfigErrorException("TLS connection must be used to read from MongoDB.");
3837
}
3938

4039
return super.check(config);

airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,23 @@
44

55
package io.airbyte.integrations.source.mongodb;
66

7+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
8+
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
79
import static org.junit.jupiter.api.Assertions.assertEquals;
810

911
import com.fasterxml.jackson.databind.JsonNode;
1012
import com.fasterxml.jackson.databind.node.ObjectNode;
1113
import com.google.common.collect.ImmutableMap;
1214
import com.google.common.collect.Lists;
1315
import com.mongodb.client.MongoCollection;
16+
import io.airbyte.commons.exceptions.ConfigErrorException;
1417
import io.airbyte.commons.json.Jsons;
1518
import io.airbyte.commons.resources.MoreResources;
1619
import io.airbyte.db.jdbc.JdbcUtils;
1720
import io.airbyte.db.mongodb.MongoDatabase;
1821
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
1922
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
2023
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
21-
import io.airbyte.protocol.models.AirbyteConnectionStatus;
22-
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
2324
import io.airbyte.protocol.models.CatalogHelpers;
2425
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
2526
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -160,13 +161,11 @@ void testCheck() throws Exception {
160161

161162
((ObjectNode) invalidStandaloneConfig).put(INSTANCE_TYPE, instanceConfig);
162163

163-
final AirbyteConnectionStatus actual = new MongodbSourceStrictEncrypt().check(invalidStandaloneConfig);
164-
final AirbyteConnectionStatus expected =
165-
new AirbyteConnectionStatus()
166-
.withStatus(Status.FAILED)
167-
.withMessage("TLS connection must be used to read from MongoDB.");
168-
169-
assertEquals(expected, actual);
164+
final Throwable throwable = catchThrowable(() -> new MongodbSourceStrictEncrypt().check(invalidStandaloneConfig));
165+
assertThat(throwable).isInstanceOf(ConfigErrorException.class);
166+
assertThat(((ConfigErrorException) throwable)
167+
.getDisplayMessage()
168+
.contains("TLS connection must be used to read from MongoDB."));
170169
}
171170

172171
}

0 commit comments

Comments
 (0)