Skip to content

Commit 08af15d

Browse files
committed
Migrated MongoDbSourceUitls to MongoUtils and fixed expected_json to include ssh tunnel spec
1 parent abe283b commit 08af15d

File tree

9 files changed

+196
-118
lines changed

9 files changed

+196
-118
lines changed

airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,15 @@
66

77
import static java.util.Arrays.asList;
88
import static org.bson.BsonType.ARRAY;
9+
import static org.bson.BsonType.DATE_TIME;
10+
import static org.bson.BsonType.DECIMAL128;
911
import static org.bson.BsonType.DOCUMENT;
12+
import static org.bson.BsonType.DOUBLE;
13+
import static org.bson.BsonType.INT32;
14+
import static org.bson.BsonType.INT64;
15+
import static org.bson.BsonType.OBJECT_ID;
16+
import static org.bson.BsonType.STRING;
17+
import static org.bson.BsonType.TIMESTAMP;
1018
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
1119

1220
import com.fasterxml.jackson.databind.JsonNode;
@@ -20,6 +28,7 @@
2028
import io.airbyte.commons.json.Jsons;
2129
import io.airbyte.commons.util.MoreIterators;
2230
import io.airbyte.db.DataTypeUtils;
31+
import io.airbyte.db.jdbc.JdbcUtils;
2332
import io.airbyte.protocol.models.CommonField;
2433
import io.airbyte.protocol.models.JsonSchemaType;
2534
import io.airbyte.protocol.models.TreeNode;
@@ -28,6 +37,7 @@
2837
import java.util.Collections;
2938
import java.util.HashSet;
3039
import java.util.List;
40+
import java.util.Set;
3141
import org.bson.BsonBinary;
3242
import org.bson.BsonDateTime;
3343
import org.bson.BsonDocument;
@@ -53,6 +63,29 @@ public class MongoUtils {
5363

5464
private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class);
5565

66+
// Shared constants
67+
public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=admin&ssl=%s";
68+
public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true";
69+
public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=admin&directConnection=false&ssl=true";
70+
public static final String USER = "user";
71+
public static final String INSTANCE_TYPE = "instance_type";
72+
public static final String INSTANCE = "instance";
73+
public static final String CLUSTER_URL = "cluster_url";
74+
public static final String SERVER_ADDRESSES = "server_addresses";
75+
public static final String REPLICA_SET = "replica_set";
76+
77+
// MongodbDestination specific constants
78+
public static final String AUTH_TYPE = "auth_type";
79+
public static final String AUTHORIZATION = "authorization";
80+
public static final String LOGIN_AND_PASSWORD = "login/password";
81+
public static final String AIRBYTE_DATA_HASH = "_airbyte_data_hash";
82+
83+
// MongodbSource specific constants
84+
public static final String AUTH_SOURCE = "auth_source";
85+
public static final String PRIMARY_KEY = "_id";
86+
public static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
87+
INT32, TIMESTAMP, INT64, DECIMAL128);
88+
5689
private static final String MISSING_TYPE = "missing";
5790
private static final String NULL_TYPE = "null";
5891
public static final String AIRBYTE_SUFFIX = "_aibyte_transform";
@@ -136,6 +169,14 @@ private static ObjectNode readDocument(final BsonReader reader, final ObjectNode
136169
return jsonNodes;
137170
}
138171

172+
/**
173+
* Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB.
174+
*/
175+
public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) {
176+
return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
177+
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
178+
}
179+
139180
public static void transformToStringIfMarked(final ObjectNode jsonNodes, final List<String> columnNames, final String fieldName) {
140181
if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
141182
final JsonNode data = jsonNodes.get(fieldName);

airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.fasterxml.jackson.databind.node.ObjectNode;
99
import io.airbyte.commons.json.Jsons;
10+
import io.airbyte.db.mongodb.MongoUtils;
1011
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
1112
import io.airbyte.integrations.base.Destination;
1213
import io.airbyte.integrations.base.IntegrationRunner;
@@ -27,10 +28,10 @@ public MongodbDestinationStrictEncrypt() {
2728

2829
@Override
2930
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
30-
final JsonNode instanceConfig = config.get(MongoDbDestinationUtils.INSTANCE_TYPE);
31-
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbDestinationUtils.INSTANCE).asText());
31+
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
32+
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
3233
// If the MongoDb destination connector is not set up to use a TLS connection, then check should fail
33-
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoDbDestinationUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
34+
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
3435
return new AirbyteConnectionStatus()
3536
.withStatus(Status.FAILED)
3637
.withMessage("TLS connection must be used to read from MongoDB.");

airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationStrictEncryptAcceptanceTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.airbyte.commons.json.Jsons;
1515
import io.airbyte.db.jdbc.JdbcUtils;
1616
import io.airbyte.db.mongodb.MongoDatabase;
17+
import io.airbyte.db.mongodb.MongoUtils;
1718
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
1819
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
1920
import io.airbyte.protocol.models.AirbyteConnectionStatus;
@@ -116,7 +117,7 @@ void testCheck() throws Exception {
116117

117118
final JsonNode invalidStandaloneConfig = getConfig();
118119

119-
((ObjectNode) invalidStandaloneConfig).put(MongoDbDestinationUtils.INSTANCE_TYPE, instanceConfig);
120+
((ObjectNode) invalidStandaloneConfig).put(MongoUtils.INSTANCE_TYPE, instanceConfig);
120121

121122
final AirbyteConnectionStatus actual = new MongodbDestinationStrictEncrypt().check(invalidStandaloneConfig);
122123
final AirbyteConnectionStatus expected =

airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test/resources/expected_spec.json

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,120 @@
135135
}
136136
}
137137
]
138+
},
139+
"tunnel_method": {
140+
"type": "object",
141+
"title": "SSH Tunnel Method",
142+
"description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.",
143+
"oneOf": [
144+
{
145+
"title": "No Tunnel",
146+
"required": ["tunnel_method"],
147+
"properties": {
148+
"tunnel_method": {
149+
"description": "No ssh tunnel needed to connect to database",
150+
"type": "string",
151+
"const": "NO_TUNNEL",
152+
"order": 0
153+
}
154+
}
155+
},
156+
{
157+
"title": "SSH Key Authentication",
158+
"required": [
159+
"tunnel_method",
160+
"tunnel_host",
161+
"tunnel_port",
162+
"tunnel_user",
163+
"ssh_key"
164+
],
165+
"properties": {
166+
"tunnel_method": {
167+
"description": "Connect through a jump server tunnel host using username and ssh key",
168+
"type": "string",
169+
"const": "SSH_KEY_AUTH",
170+
"order": 0
171+
},
172+
"tunnel_host": {
173+
"title": "SSH Tunnel Jump Server Host",
174+
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
175+
"type": "string",
176+
"order": 1
177+
},
178+
"tunnel_port": {
179+
"title": "SSH Connection Port",
180+
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
181+
"type": "integer",
182+
"minimum": 0,
183+
"maximum": 65536,
184+
"default": 22,
185+
"examples": ["22"],
186+
"order": 2
187+
},
188+
"tunnel_user": {
189+
"title": "SSH Login Username",
190+
"description": "OS-level username for logging into the jump server host.",
191+
"type": "string",
192+
"order": 3
193+
},
194+
"ssh_key": {
195+
"title": "SSH Private Key",
196+
"description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )",
197+
"type": "string",
198+
"airbyte_secret": true,
199+
"multiline": true,
200+
"order": 4
201+
}
202+
}
203+
},
204+
{
205+
"title": "Password Authentication",
206+
"required": [
207+
"tunnel_method",
208+
"tunnel_host",
209+
"tunnel_port",
210+
"tunnel_user",
211+
"tunnel_user_password"
212+
],
213+
"properties": {
214+
"tunnel_method": {
215+
"description": "Connect through a jump server tunnel host using username and password authentication",
216+
"type": "string",
217+
"const": "SSH_PASSWORD_AUTH",
218+
"order": 0
219+
},
220+
"tunnel_host": {
221+
"title": "SSH Tunnel Jump Server Host",
222+
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
223+
"type": "string",
224+
"order": 1
225+
},
226+
"tunnel_port": {
227+
"title": "SSH Connection Port",
228+
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
229+
"type": "integer",
230+
"minimum": 0,
231+
"maximum": 65536,
232+
"default": 22,
233+
"examples": ["22"],
234+
"order": 2
235+
},
236+
"tunnel_user": {
237+
"title": "SSH Login Username",
238+
"description": "OS-level username for logging into the jump server host",
239+
"type": "string",
240+
"order": 3
241+
},
242+
"tunnel_user_password": {
243+
"title": "Password",
244+
"description": "OS-level password for logging into the jump server host",
245+
"type": "string",
246+
"airbyte_secret": true,
247+
"order": 4
248+
}
249+
}
250+
}
251+
]
138252
}
139253
}
140254
}

airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongoDbDestinationUtils.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.airbyte.commons.util.MoreIterators;
1919
import io.airbyte.db.jdbc.JdbcUtils;
2020
import io.airbyte.db.mongodb.MongoDatabase;
21+
import io.airbyte.db.mongodb.MongoUtils;
2122
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
2223
import io.airbyte.integrations.BaseConnector;
2324
import io.airbyte.integrations.base.AirbyteMessageConsumer;
@@ -119,7 +120,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
119120
final Set<String> documentsHash = new HashSet<>();
120121
try (final MongoCursor<Document> cursor = collection.find().projection(excludeId()).iterator()) {
121122
while (cursor.hasNext()) {
122-
documentsHash.add(cursor.next().get(MongoDbDestinationUtils.AIRBYTE_DATA_HASH, String.class));
123+
documentsHash.add(cursor.next().get(MongoUtils.AIRBYTE_DATA_HASH, String.class));
123124
}
124125
}
125126

@@ -137,51 +138,51 @@ private MongoDatabase getDatabase(final JsonNode config) {
137138

138139
@VisibleForTesting
139140
String getConnectionString(final JsonNode config) {
140-
final var credentials = config.get(MongoDbDestinationUtils.AUTH_TYPE).get(MongoDbDestinationUtils.AUTHORIZATION).asText().equals(MongoDbDestinationUtils.LOGIN_AND_PASSWORD)
141-
? String.format("%s:%s@", config.get(MongoDbDestinationUtils.AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
142-
config.get(MongoDbDestinationUtils.AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText())
141+
final var credentials = config.get(MongoUtils.AUTH_TYPE).get(MongoUtils.AUTHORIZATION).asText().equals(MongoUtils.LOGIN_AND_PASSWORD)
142+
? String.format("%s:%s@", config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
143+
config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText())
143144
: StringUtils.EMPTY;
144145

145146
// backward compatibility check
146147
// the old mongo db spec only includes host, port, database, and auth_type
147148
// the new spec replaces host and port with the instance_type property
148-
if (config.has(MongoDbDestinationUtils.INSTANCE_TYPE)) {
149+
if (config.has(MongoUtils.INSTANCE_TYPE)) {
149150
return buildConnectionString(config, credentials);
150151
} else {
151-
return String.format(MongoDbDestinationUtils.MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(),
152+
return String.format(MongoUtils.MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(),
152153
config.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), false);
153154
}
154155
}
155156

156157
private String buildConnectionString(final JsonNode config, final String credentials) {
157158
final StringBuilder connectionStrBuilder = new StringBuilder();
158159

159-
final JsonNode instanceConfig = config.get(MongoDbDestinationUtils.INSTANCE_TYPE);
160-
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbDestinationUtils.INSTANCE).asText());
160+
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
161+
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
161162

162163
switch (instance) {
163164
case STANDALONE -> {
164165
// if there is no TLS present in spec, TLS should be enabled by default for strict encryption
165166
final var tls = !instanceConfig.has(JdbcUtils.TLS_KEY) || instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean();
166167
connectionStrBuilder.append(
167-
String.format(MongoDbDestinationUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
168+
String.format(MongoUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
168169
instanceConfig.get(JdbcUtils.PORT_KEY).asText(),
169170
config.get(JdbcUtils.DATABASE_KEY).asText(), tls));
170171
}
171172
case REPLICA -> {
172173
connectionStrBuilder.append(
173-
String.format(MongoDbDestinationUtils.MONGODB_REPLICA_URL,
174+
String.format(MongoUtils.MONGODB_REPLICA_URL,
174175
credentials,
175-
instanceConfig.get(MongoDbDestinationUtils.SERVER_ADDRESSES).asText(),
176+
instanceConfig.get(MongoUtils.SERVER_ADDRESSES).asText(),
176177
config.get(JdbcUtils.DATABASE_KEY).asText()));
177-
if (instanceConfig.has(MongoDbDestinationUtils.REPLICA_SET)) {
178-
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoDbDestinationUtils.REPLICA_SET).asText()));
178+
if (instanceConfig.has(MongoUtils.REPLICA_SET)) {
179+
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoUtils.REPLICA_SET).asText()));
179180
}
180181
}
181182
case ATLAS -> {
182183
connectionStrBuilder.append(
183-
String.format(MongoDbDestinationUtils.MONGODB_CLUSTER_URL, credentials,
184-
instanceConfig.get(MongoDbDestinationUtils.CLUSTER_URL).asText(),
184+
String.format(MongoUtils.MONGODB_CLUSTER_URL, credentials,
185+
instanceConfig.get(MongoUtils.CLUSTER_URL).asText(),
185186
config.get(JdbcUtils.DATABASE_KEY).asText()));
186187
}
187188
default -> throw new IllegalArgumentException("Unsupported instance type: " + instance);

airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.source.mongodb/MongodbSourceStrictEncrypt.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.fasterxml.jackson.databind.node.ObjectNode;
99
import io.airbyte.commons.json.Jsons;
10+
import io.airbyte.db.mongodb.MongoUtils;
1011
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
1112
import io.airbyte.integrations.base.IntegrationRunner;
1213
import io.airbyte.integrations.base.Source;
@@ -27,11 +28,11 @@ public MongodbSourceStrictEncrypt() {
2728

2829
@Override
2930
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
30-
final JsonNode instanceConfig = config.get(MongoDbSourceUtils.INSTANCE_TYPE);
31-
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbSourceUtils.INSTANCE).asText());
31+
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
32+
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
3233
// If the MongoDb source connector is not set up to use a TLS connection, then we should fail the
3334
// check.
34-
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
35+
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
3536
return new AirbyteConnectionStatus()
3637
.withStatus(Status.FAILED)
3738
.withMessage("TLS connection must be used to read from MongoDB.");

0 commit comments

Comments
 (0)