Skip to content

Adds TLS check to mongodb destination and migrates util constants #18892

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@

import static java.util.Arrays.asList;
import static org.bson.BsonType.ARRAY;
import static org.bson.BsonType.DATE_TIME;
import static org.bson.BsonType.DECIMAL128;
import static org.bson.BsonType.DOCUMENT;
import static org.bson.BsonType.DOUBLE;
import static org.bson.BsonType.INT32;
import static org.bson.BsonType.INT64;
import static org.bson.BsonType.OBJECT_ID;
import static org.bson.BsonType.STRING;
import static org.bson.BsonType.TIMESTAMP;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -20,6 +28,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.TreeNode;
Expand All @@ -28,6 +37,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.bson.BsonBinary;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
Expand All @@ -53,6 +63,29 @@ public class MongoUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class);

// Shared constants
public static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=admin&ssl=%s";
public static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true";
public static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=admin&directConnection=false&ssl=true";
public static final String USER = "user";
public static final String INSTANCE_TYPE = "instance_type";
public static final String INSTANCE = "instance";
public static final String CLUSTER_URL = "cluster_url";
public static final String SERVER_ADDRESSES = "server_addresses";
public static final String REPLICA_SET = "replica_set";

// MongodbDestination specific constants
public static final String AUTH_TYPE = "auth_type";
public static final String AUTHORIZATION = "authorization";
public static final String LOGIN_AND_PASSWORD = "login/password";
public static final String AIRBYTE_DATA_HASH = "_airbyte_data_hash";

// MongodbSource specific constants
public static final String AUTH_SOURCE = "auth_source";
public static final String PRIMARY_KEY = "_id";
public static final Set<BsonType> ALLOWED_CURSOR_TYPES = Set.of(DOUBLE, STRING, DOCUMENT, OBJECT_ID, DATE_TIME,
INT32, TIMESTAMP, INT64, DECIMAL128);

private static final String MISSING_TYPE = "missing";
private static final String NULL_TYPE = "null";
public static final String AIRBYTE_SUFFIX = "_aibyte_transform";
Expand Down Expand Up @@ -136,6 +169,14 @@ private static ObjectNode readDocument(final BsonReader reader, final ObjectNode
return jsonNodes;
}

/**
* Determines whether TLS/SSL should be enabled for a standalone instance of MongoDB.
*/
public static boolean tlsEnabledForStandaloneInstance(final JsonNode config, final JsonNode instanceConfig) {
return config.has(JdbcUtils.TLS_KEY) ? config.get(JdbcUtils.TLS_KEY).asBoolean()
: (instanceConfig.has(JdbcUtils.TLS_KEY) ? instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean() : true);
}

public static void transformToStringIfMarked(final ObjectNode jsonNodes, final List<String> columnNames, final String fieldName) {
if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
final JsonNode data = jsonNodes.get(fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

package io.airbyte.integrations.destination.mongodb;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,6 +26,17 @@ public MongodbDestinationStrictEncrypt() {
super(MongodbDestination.sshWrappedDestination());
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());
// If the MongoDb destination connector is not set up to use a TLS connection, then check should fail
if (instance.equals(MongoInstanceType.STANDALONE) && !MongoUtils.tlsEnabledForStandaloneInstance(config, instanceConfig)) {
throw new ConfigErrorException("TLS connection must be used to read from MongoDB.");
}
return super.check(config);
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@
package io.airbyte.integrations.destination.mongodb;

import static com.mongodb.client.model.Projections.excludeId;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.io.IOException;
Expand All @@ -21,6 +27,7 @@
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class MongodbDestinationStrictEncryptAcceptanceTest extends DestinationAcceptanceTest {

Expand Down Expand Up @@ -102,9 +109,27 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return result;
}

@Test
void testCheck() throws Exception {
final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("instance", MongoInstanceType.STANDALONE.getType())
.put("tls", false)
.build());

final JsonNode invalidStandaloneConfig = getConfig();

((ObjectNode) invalidStandaloneConfig).put(MongoUtils.INSTANCE_TYPE, instanceConfig);

final Throwable throwable = catchThrowable(() -> new MongodbDestinationStrictEncrypt().check(invalidStandaloneConfig));
assertThat(throwable).isInstanceOf(ConfigErrorException.class);
assertThat(((ConfigErrorException) throwable)
.getDisplayMessage()
.contains("TLS connection must be used to read from MongoDB."));
}

@Override
protected void setup(final TestDestinationEnv testEnv) {
var credentials = String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
final var credentials = String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
config.get(AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText());
final String connectionString = String.format("mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true",
credentials,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,120 @@
}
}
]
},
"tunnel_method": {
"type": "object",
"title": "SSH Tunnel Method",
"description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.",
"oneOf": [
{
"title": "No Tunnel",
"required": ["tunnel_method"],
"properties": {
"tunnel_method": {
"description": "No ssh tunnel needed to connect to database",
"type": "string",
"const": "NO_TUNNEL",
"order": 0
}
}
},
{
"title": "SSH Key Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"ssh_key"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and ssh key",
"type": "string",
"const": "SSH_KEY_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host.",
"type": "string",
"order": 3
},
"ssh_key": {
"title": "SSH Private Key",
"description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )",
"type": "string",
"airbyte_secret": true,
"multiline": true,
"order": 4
}
}
},
{
"title": "Password Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"tunnel_user_password"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and password authentication",
"type": "string",
"const": "SSH_PASSWORD_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host",
"type": "string",
"order": 3
},
"tunnel_user_password": {
"title": "Password",
"description": "OS-level password for logging into the jump server host",
"type": "string",
"airbyte_secret": true,
"order": 4
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
Expand Down Expand Up @@ -47,19 +48,6 @@ public class MongodbDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(MongodbDestination.class);

private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=admin&ssl=%s";
private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&tls=true";
private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=admin&directConnection=false&ssl=true";
private static final String INSTANCE_TYPE = "instance_type";
private static final String INSTANCE = "instance";
private static final String CLUSTER_URL = "cluster_url";
private static final String SERVER_ADDRESSES = "server_addresses";
private static final String REPLICA_SET = "replica_set";
private static final String AUTH_TYPE = "auth_type";
private static final String AUTHORIZATION = "authorization";
private static final String LOGIN_AND_PASSWORD = "login/password";
private static final String AIRBYTE_DATA_HASH = "_airbyte_data_hash";

private final MongodbNameTransformer namingResolver;

public static Destination sshWrappedDestination() {
Expand Down Expand Up @@ -132,7 +120,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final Set<String> documentsHash = new HashSet<>();
try (final MongoCursor<Document> cursor = collection.find().projection(excludeId()).iterator()) {
while (cursor.hasNext()) {
documentsHash.add(cursor.next().get(AIRBYTE_DATA_HASH, String.class));
documentsHash.add(cursor.next().get(MongoUtils.AIRBYTE_DATA_HASH, String.class));
}
}

Expand All @@ -150,48 +138,52 @@ private MongoDatabase getDatabase(final JsonNode config) {

@VisibleForTesting
String getConnectionString(final JsonNode config) {
final var credentials = config.get(AUTH_TYPE).get(AUTHORIZATION).asText().equals(LOGIN_AND_PASSWORD)
? String.format("%s:%s@", config.get(AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
config.get(AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText())
final var credentials = config.get(MongoUtils.AUTH_TYPE).get(MongoUtils.AUTHORIZATION).asText().equals(MongoUtils.LOGIN_AND_PASSWORD)
? String.format("%s:%s@", config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.USERNAME_KEY).asText(),
config.get(MongoUtils.AUTH_TYPE).get(JdbcUtils.PASSWORD_KEY).asText())
: StringUtils.EMPTY;

// backward compatibility check
// the old mongo db spec only includes host, port, database, and auth_type
// the new spec replaces host and port with the instance_type property
if (config.has(INSTANCE_TYPE)) {
if (config.has(MongoUtils.INSTANCE_TYPE)) {
return buildConnectionString(config, credentials);
} else {
return String.format(MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(),
return String.format(MongoUtils.MONGODB_SERVER_URL, credentials, config.get(JdbcUtils.HOST_KEY).asText(),
config.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), false);
}
}

private String buildConnectionString(final JsonNode config, final String credentials) {
final StringBuilder connectionStrBuilder = new StringBuilder();

final JsonNode instanceConfig = config.get(INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(INSTANCE).asText());
final JsonNode instanceConfig = config.get(MongoUtils.INSTANCE_TYPE);
final MongoInstanceType instance = MongoInstanceType.fromValue(instanceConfig.get(MongoUtils.INSTANCE).asText());

switch (instance) {
case STANDALONE -> {
// if there is no TLS present in spec, TLS should be enabled by default for strict encryption
final var tls = !instanceConfig.has(JdbcUtils.TLS_KEY) || instanceConfig.get(JdbcUtils.TLS_KEY).asBoolean();
connectionStrBuilder.append(
String.format(MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
String.format(MongoUtils.MONGODB_SERVER_URL, credentials, instanceConfig.get(JdbcUtils.HOST_KEY).asText(),
instanceConfig.get(JdbcUtils.PORT_KEY).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(), tls));
}
case REPLICA -> {
connectionStrBuilder.append(
String.format(MONGODB_REPLICA_URL, credentials, instanceConfig.get(SERVER_ADDRESSES).asText(),
String.format(MongoUtils.MONGODB_REPLICA_URL,
credentials,
instanceConfig.get(MongoUtils.SERVER_ADDRESSES).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText()));
if (instanceConfig.has(REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(REPLICA_SET).asText()));
if (instanceConfig.has(MongoUtils.REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(MongoUtils.REPLICA_SET).asText()));
}
}
case ATLAS -> {
connectionStrBuilder.append(
String.format(MONGODB_CLUSTER_URL, credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(JdbcUtils.DATABASE_KEY).asText()));
String.format(MongoUtils.MONGODB_CLUSTER_URL, credentials,
instanceConfig.get(MongoUtils.CLUSTER_URL).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText()));
}
default -> throw new IllegalArgumentException("Unsupported instance type: " + instance);
}
Expand Down
Loading