diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index fa2bd97120c9c..9e379a73d4d03 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -206,7 +206,7 @@ - name: MongoDB destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c dockerRepository: airbyte/destination-mongodb - dockerImageTag: 0.1.8 + dockerImageTag: 0.1.9 documentationUrl: https://docs.airbyte.com/integrations/destinations/mongodb icon: mongodb.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 965bc16c61c3f..6ed75d66248e7 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3338,7 +3338,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-mongodb:0.1.8" +- dockerImage: "airbyte/destination-mongodb:0.1.9" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/mongodb" connectionSpecification: diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index b0ccd752facf8..6f0b8cf5a9c5b 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -6,7 +6,15 @@ import static java.util.Arrays.asList; import static org.bson.BsonType.ARRAY; +import static org.bson.BsonType.DATE_TIME; +import static org.bson.BsonType.DECIMAL128; import static org.bson.BsonType.DOCUMENT; +import static org.bson.BsonType.DOUBLE; +import static org.bson.BsonType.INT32; +import static org.bson.BsonType.INT64; +import static org.bson.BsonType.OBJECT_ID; +import static org.bson.BsonType.STRING; +import static org.bson.BsonType.TIMESTAMP; import static org.bson.codecs.configuration.CodecRegistries.fromProviders; import com.fasterxml.jackson.databind.JsonNode; @@ -20,6 +28,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.DataTypeUtils; +import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.TreeNode; @@ -28,6 +37,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import org.bson.BsonBinary; import org.bson.BsonDateTime; import org.bson.BsonDocument; @@ -53,6 +63,29 @@ public class MongoUtils { private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class); + // Shared constants + public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=admin&ssl=%s"; + public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true"; + public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=admin&directConnection=false&ssl=true"; + public static final String USER = "user"; + public static final String INSTANCE_TYPE = "instance_type"; + public static final String INSTANCE = "instance"; + public static final String CLUSTER_URL = "cluster_url"; + public static final String SERVER_ADDRESSES = "server_addresses"; + public static final String REPLICA_SET = "replica_set"; + + // MongodbDestination specific constants + public static final String AUTH_TYPE = "auth_type"; + public static final String AUTHORIZATION = "authorization"; + public static final String LOGIN_AND_PASSWORD = "login/password"; + public static final String AIRBYTE_DATA_HASH = "_airbyte_data_hash"; + + // MongodbSource specific constants + public static final String AUTH_SOURCE = "auth_source"; + public static final String PRIMARY_KEY = "_id"; + public static final Set ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME, + INT32, TIMESTAMP, INT64, DECIMAL128); + private static final String MISSING_TYPE = "missing"; private static final String NULL_TYPE = "null"; public static final String AIRBYTE_SUFFIX = "_aibyte_transform"; @@ -136,6 +169,14 @@ private static ObjectNode readDocument(final BsonReader reader, final ObjectNode return jsonNodes; } + /** + * Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB. + */ + public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) { + return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean() + : (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true); + } + public static void transformToStringIfMarked(final ObjectNode jsonNodes, final List columnNames, final String fieldName) { if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) { final JsonNode data = jsonNodes.get(fieldName); diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile index 6b4ecd23bc7b7..bb4f3c0cea1f1 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.8 +LABEL io.airbyte.version=0.1.9 LABEL io.airbyte.name=airbyte/destination-mongodb-strict-encrypt diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java index 2cebbbd508bd6..7f9abae396348 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java @@ -4,11 +4,16 @@ package io.airbyte.integrations.destination.mongodb; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.mongodb.MongoUtils; +import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.ConnectorSpecification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,6 +26,17 @@ public MongodbDestinationStrictEncrypt() { super(MongodbDestination.sshWrappedDestination()); } + @Override + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE); + final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText()); + // If the MongoDb destination connector is not set up to use a TLS connection, then check should fail + if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) { + throw new ConfigErrorException("TLS connection must be used to read from MongoDB."); + } + return super.check(config); + } + @Override public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception { final ConnectorSpecification spec = Jsons.clone(originalSpec); diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationStrictEncryptAcceptanceTest.java index 1ce8d1507a4cb..323ea7c2516ff 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationStrictEncryptAcceptanceTest.java @@ -5,13 +5,19 @@ package io.airbyte.integrations.destination.mongodb; import static com.mongodb.client.model.Projections.excludeId; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; +import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.mongodb.client.MongoCursor; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.mongodb.MongoDatabase; +import io.airbyte.db.mongodb.MongoUtils; import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.io.IOException; @@ -21,6 +27,7 @@ import java.util.List; import org.bson.Document; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; public class MongodbDestinationStrictEncryptAcceptanceTest extends DestinationAcceptanceTest { @@ -102,9 +109,27 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return result; } + @Test + void testCheck() throws Exception { + final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("instance", MongoInstanceType.STANDALONE.getType()) + .put("tls", false) + .build()); + + final JsonNode invalidStandaloneConfig = getConfig(); + + ((ObjectNode) invalidStandaloneConfig).put(MongoUtils.INSTANCE_TYPE, instanceConfig); + + final Throwable throwable = catchThrowable(() -> new MongodbDestinationStrictEncrypt().check(invalidStandaloneConfig)); + assertThat(throwable).isInstanceOf(ConfigErrorException.class); + assertThat(((ConfigErrorException) throwable) + .getDisplayMessage() + .contains("TLS connection must be used to read from MongoDB.")); + } + @Override protected void setup(final TestDestinationEnv testEnv) { - var credentials = String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(), + final var credentials = String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(), config.get(AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText()); final String connectionString = String.format("mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true", credentials, diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test/resources/expected_spec.json index 10f94c577c35c..825559049659f 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/test/resources/expected_spec.json @@ -135,6 +135,120 @@ } } ] + }, + "tunnel_method": { + "type": "object", + "title": "SSH Tunnel Method", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", + "oneOf": [ + { + "title": "No Tunnel", + "required": ["tunnel_method"], + "properties": { + "tunnel_method": { + "description": "No ssh tunnel needed to connect to database", + "type": "string", + "const": "NO_TUNNEL", + "order": 0 + } + } + }, + { + "title": "SSH Key Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_port", + "tunnel_user", + "ssh_key" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and ssh key", + "type": "string", + "const": "SSH_KEY_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_user": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host.", + "type": "string", + "order": 3 + }, + "ssh_key": { + "title": "SSH Private Key", + "description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )", + "type": "string", + "airbyte_secret": true, + "multiline": true, + "order": 4 + } + } + }, + { + "title": "Password Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_port", + "tunnel_user", + "tunnel_user_password" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and password authentication", + "type": "string", + "const": "SSH_PASSWORD_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_user": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host", + "type": "string", + "order": 3 + }, + "tunnel_user_password": { + "title": "Password", + "description": "OS-level password for logging into the jump server host", + "type": "string", + "airbyte_secret": true, + "order": 4 + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/destination-mongodb/Dockerfile b/airbyte-integrations/connectors/destination-mongodb/Dockerfile index cff1b88e848c3..e9dca239d3679 100644 --- a/airbyte-integrations/connectors/destination-mongodb/Dockerfile +++ b/airbyte-integrations/connectors/destination-mongodb/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.8 +LABEL io.airbyte.version=0.1.9 LABEL io.airbyte.name=airbyte/destination-mongodb diff --git a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java index 0a46d2700d901..9f2442ae86704 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java @@ -18,6 +18,7 @@ import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.mongodb.MongoDatabase; +import io.airbyte.db.mongodb.MongoUtils; import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; @@ -47,19 +48,6 @@ public class MongodbDestination extends BaseConnector implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(MongodbDestination.class); - private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=admin&ssl=%s"; - private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true"; - private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=admin&directConnection=false&ssl=true"; - private static final String INSTANCE_TYPE = "instance_type"; - private static final String INSTANCE = "instance"; - private static final String CLUSTER_URL = "cluster_url"; - private static final String SERVER_ADDRESSES = "server_addresses"; - private static final String REPLICA_SET = "replica_set"; - private static final String AUTH_TYPE = "auth_type"; - private static final String AUTHORIZATION = "authorization"; - private static final String LOGIN_AND_PASSWORD = "login/password"; - private static final String AIRBYTE_DATA_HASH = "_airbyte_data_hash"; - private final MongodbNameTransformer namingResolver; public static Destination sshWrappedDestination() { @@ -132,7 +120,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final Set documentsHash = new HashSet<>(); try (final MongoCursor cursor = collection.find().projection(excludeId()).iterator()) { while (cursor.hasNext()) { - documentsHash.add(cursor.next().get(AIRBYTE_DATA_HASH, String.class)); + documentsHash.add(cursor.next().get(MongoUtils.AIRBYTE_DATA_HASH, String.class)); } } @@ -150,18 +138,18 @@ private MongoDatabase getDatabase(final JsonNode config) { @VisibleForTesting String getConnectionString(final JsonNode config) { - final var credentials = config.get(AUTH_TYPE).get(AUTHORIZATION).asText().equals(LOGIN_AND_PASSWORD) - ? String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(), - config.get(AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText()) + final var credentials = config.get(MongoUtils.AUTH_TYPE).get(MongoUtils.AUTHORIZATION).asText().equals(MongoUtils.LOGIN_AND_PASSWORD) + ? String.format("%s:%s@", config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(), + config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText()) : StringUtils.EMPTY; // backward compatibility check // the old mongo db spec only includes host, port, database, and auth_type // the new spec replaces host and port with the instance_type property - if (config.has(INSTANCE_TYPE)) { + if (config.has(MongoUtils.INSTANCE_TYPE)) { return buildConnectionString(config, credentials); } else { - return String.format(MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(), + return String.format(MongoUtils.MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(), config.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), false); } } @@ -169,29 +157,33 @@ String getConnectionString(final JsonNode config) { private String buildConnectionString(final JsonNode config, final String credentials) { final StringBuilder connectionStrBuilder = new StringBuilder(); - final JsonNode instanceConfig = config.get(INSTANCE_TYPE); - final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(INSTANCE).asText()); + final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE); + final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText()); switch (instance) { case STANDALONE -> { // if there is no TLS present in spec, TLS should be enabled by default for strict encryption final var tls = !instanceConfig.has(JdbcUtils.TLS_KEY) || instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean(); connectionStrBuilder.append( - String.format(MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(), + String.format(MongoUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(), instanceConfig.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), tls)); } case REPLICA -> { connectionStrBuilder.append( - String.format(MONGODB_REPLICA_URL, credentials, instanceConfig.get(SERVER_ADDRESSES).asText(), + String.format(MongoUtils.MONGODB_REPLICA_URL, + credentials, + instanceConfig.get(MongoUtils.SERVER_ADDRESSES).asText(), config.get(JdbcUtils.DATABASE_KEY).asText())); - if (instanceConfig.has(REPLICA_SET)) { - connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(REPLICA_SET).asText())); + if (instanceConfig.has(MongoUtils.REPLICA_SET)) { + connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoUtils.REPLICA_SET).asText())); } } case ATLAS -> { connectionStrBuilder.append( - String.format(MONGODB_CLUSTER_URL, credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText())); + String.format(MongoUtils.MONGODB_CLUSTER_URL, credentials, + instanceConfig.get(MongoUtils.CLUSTER_URL).asText(), + config.get(JdbcUtils.DATABASE_KEY).asText())); } default -> throw new IllegalArgumentException("Unsupported instance type: " + instance); } diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.source.mongodb/MongodbSourceStrictEncrypt.java b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.source.mongodb/MongodbSourceStrictEncrypt.java index 4d460be244830..5fb9f8a430741 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.source.mongodb/MongodbSourceStrictEncrypt.java +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.source.mongodb/MongodbSourceStrictEncrypt.java @@ -6,13 +6,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.mongodb.MongoUtils; import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.spec_modification.SpecModifyingSource; import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.ConnectorSpecification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,14 +28,12 @@ public MongodbSourceStrictEncrypt() { @Override public AirbyteConnectionStatus check(final JsonNode config) throws Exception { - final JsonNode instanceConfig = config.get(MongoDbSourceUtils.INSTANCE_TYPE); - final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbSourceUtils.INSTANCE).asText()); + final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE); + final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText()); // If the MongoDb source connector is not set up to use a TLS connection, then we should fail the // check. - if (instance.equals(MongoInstanceType.STANDALONE) && !MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) { - return new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage("TLS connection must be used to read from MongoDB."); + if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) { + throw new ConfigErrorException("TLS connection must be used to read from MongoDB."); } return super.check(config); diff --git a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java index 3a0516c45c5fc..462d700065728 100644 --- a/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.source.mongodb; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; @@ -11,6 +13,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.mongodb.client.MongoCollection; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.db.jdbc.JdbcUtils; @@ -18,8 +21,6 @@ import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; -import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -160,13 +161,11 @@ void testCheck() throws Exception { ((ObjectNode) invalidStandaloneConfig).put(INSTANCE_TYPE, instanceConfig); - final AirbyteConnectionStatus actual = new MongodbSourceStrictEncrypt().check(invalidStandaloneConfig); - final AirbyteConnectionStatus expected = - new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage("TLS connection must be used to read from MongoDB."); - - assertEquals(expected, actual); + final Throwable throwable = catchThrowable(() -> new MongodbSourceStrictEncrypt().check(invalidStandaloneConfig)); + assertThat(throwable).isInstanceOf(ConfigErrorException.class); + assertThat(((ConfigErrorException) throwable) + .getDisplayMessage() + .contains("TLS connection must be used to read from MongoDB.")); } } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index 7ebfb9d36ef37..7491d47146be7 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -56,8 +56,8 @@ public static void main(final String[] args) throws Exception { @Override public JsonNode toDatabaseConfig(final JsonNode config) { - final var credentials = config.has(MongoDbSourceUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY) - ? String.format("%s:%s@", config.get(MongoDbSourceUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText()) + final var credentials = config.has(MongoUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY) + ? String.format("%s:%s@", config.get(MongoUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText()) : StringUtils.EMPTY; return Jsons.jsonNode(ImmutableMap.builder() @@ -111,7 +111,7 @@ protected List>> discoverInternal(final MongoDat .nameSpace(database.getName()) .name(collectionName) .fields(fields) - .primaryKeys(List.of(MongoDbSourceUtils.PRIMARY_KEY)) + .primaryKeys(List.of(MongoUtils.PRIMARY_KEY)) .build(); tableInfos.add(tableInfo); @@ -192,7 +192,7 @@ public boolean isCursorType(final BsonType bsonType) { // when we have no cursor field here, at least id could be used as cursor here. // This logic will be used feather when we will implement part which will show only list of possible // cursor fields on UI - return MongoDbSourceUtils.ALLOWED_CURSOR_TYPES.contains(bsonType); + return MongoUtils.ALLOWED_CURSOR_TYPES.contains(bsonType); } private AutoCloseableIterator queryTable(final MongoDatabase database, @@ -212,30 +212,30 @@ private AutoCloseableIterator queryTable(final MongoDatabase database, private String buildConnectionString(final JsonNode config, final String credentials) { final StringBuilder connectionStrBuilder = new StringBuilder(); - final JsonNode instanceConfig = config.get(MongoDbSourceUtils.INSTANCE_TYPE); - final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoDbSourceUtils.INSTANCE).asText()); + final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE); + final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText()); switch (instance) { case STANDALONE -> { connectionStrBuilder.append( - String.format(MongoDbSourceUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(), + String.format(MongoUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(), instanceConfig.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), - config.get(MongoDbSourceUtils.AUTH_SOURCE).asText(), MongoDbSourceUtils.tlsEnabledForStandaloneInstance(config, instanceConfig))); + config.get(MongoUtils.AUTH_SOURCE).asText(), MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig))); } case REPLICA -> { connectionStrBuilder.append( - String.format(MongoDbSourceUtils.MONGODB_REPLICA_URL, credentials, instanceConfig.get(MongoDbSourceUtils.SERVER_ADDRESSES).asText(), + String.format(MongoUtils.MONGODB_REPLICA_URL, credentials, instanceConfig.get(MongoUtils.SERVER_ADDRESSES).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), - config.get(MongoDbSourceUtils.AUTH_SOURCE).asText())); - if (instanceConfig.has(MongoDbSourceUtils.REPLICA_SET)) { - connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoDbSourceUtils.REPLICA_SET).asText())); + config.get(MongoUtils.AUTH_SOURCE).asText())); + if (instanceConfig.has(MongoUtils.REPLICA_SET)) { + connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoUtils.REPLICA_SET).asText())); } } case ATLAS -> { connectionStrBuilder.append( - String.format(MongoDbSourceUtils.MONGODB_CLUSTER_URL, credentials, - instanceConfig.get(MongoDbSourceUtils.CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), - config.get(MongoDbSourceUtils.AUTH_SOURCE).asText())); + String.format(MongoUtils.MONGODB_CLUSTER_URL, credentials, + instanceConfig.get(MongoUtils.CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), + config.get(MongoUtils.AUTH_SOURCE).asText())); } default -> throw new IllegalArgumentException("Unsupported instance type: " + instance); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSourceUtils.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSourceUtils.java deleted file mode 100644 index 6939ddf9b0ba0..0000000000000 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSourceUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mongodb; - -import static org.bson.BsonType.DATE_TIME; -import static org.bson.BsonType.DECIMAL128; -import static org.bson.BsonType.DOCUMENT; -import static org.bson.BsonType.DOUBLE; -import static org.bson.BsonType.INT32; -import static org.bson.BsonType.INT64; -import static org.bson.BsonType.OBJECT_ID; -import static org.bson.BsonType.STRING; -import static org.bson.BsonType.TIMESTAMP; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.db.jdbc.JdbcUtils; -import java.util.Set; -import org.bson.BsonType; - -public final class MongoDbSourceUtils { - - private MongoDbSourceUtils() {} - - public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=%s&ssl=%s"; - public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?authSource=%s&retryWrites=true&w=majority&tls=true"; - public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=%s&directConnection=false&ssl=true"; - public static final String USER = "user"; - public static final String INSTANCE_TYPE = "instance_type"; - public static final String INSTANCE = "instance"; - public static final String CLUSTER_URL = "cluster_url"; - public static final String SERVER_ADDRESSES = "server_addresses"; - public static final String REPLICA_SET = "replica_set"; - public static final String AUTH_SOURCE = "auth_source"; - public static final String PRIMARY_KEY = "_id"; - public static final Set ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME, - INT32, TIMESTAMP, INT64, DECIMAL128); - - /** - * Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB. - */ - public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) { - return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean() - : (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true); - } - -} diff --git a/airbyte-webapp/src/core/domain/connector/constants.ts b/airbyte-webapp/src/core/domain/connector/constants.ts index 774b35b9581e1..331bd58f93f96 100644 --- a/airbyte-webapp/src/core/domain/connector/constants.ts +++ b/airbyte-webapp/src/core/domain/connector/constants.ts @@ -22,7 +22,6 @@ export const getExcludedConnectorIds = (workspaceId: string) => "8ccd8909-4e99-4141-b48d-4984b70b2d89", // hide DynamoDB Destination https://github.com/airbytehq/airbyte-cloud/issues/2608 "9f760101-60ae-462f-9ee6-b7a9dafd454d", // hide Kafka Destination https://github.com/airbytehq/airbyte-cloud/issues/2610 "294a4790-429b-40ae-9516-49826b9702e1", // hide MariaDB Destination https://github.com/airbytehq/airbyte-cloud/issues/2611 - "8b746512-8c2e-6ac1-4adc-b59faafd473c", // hide MongoDB Destination https://github.com/airbytehq/airbyte-cloud/issues/2612 "f3802bc4-5406-4752-9e8d-01e504ca8194", // hide MQTT Destination https://github.com/airbytehq/airbyte-cloud/issues/2613 "2340cbba-358e-11ec-8d3d-0242ac130203", // hide Pular Destination https://github.com/airbytehq/airbyte-cloud/issues/2614 "2c9d93a7-9a17-4789-9de9-f46f0097eb70", // hide Rockset Destination https://github.com/airbytehq/airbyte-cloud/issues/2615 diff --git a/docs/integrations/destinations/mongodb.md b/docs/integrations/destinations/mongodb.md index 6a357aa6215bb..ea44a2f1f5f70 100644 --- a/docs/integrations/destinations/mongodb.md +++ b/docs/integrations/destinations/mongodb.md @@ -117,6 +117,7 @@ Collection names should begin with an underscore or a letter character, and cann | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------| +| 0.1.9 | 2022-11-08 | [18892](https://github.com/airbytehq/airbyte/pull/18892) | Adds check for TLS flag | | 0.1.8 | 2022-10-26 | [18280](https://github.com/airbytehq/airbyte/pull/18280) | Adds SSH tunneling | | 0.1.7 | 2022-09-02 | [16025](https://github.com/airbytehq/airbyte/pull/16025) | Remove additionalProperties:false from spec | | 0.1.6 | 2022-08-02 | [15211](https://github.com/airbytehq/airbyte/pull/15211) | Fix standard mode |