Skip to content

🐛Destination-dynamodb: enforce ssl connection #18672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-dynamodb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-dynamodb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

integrationTestJavaImplementation project(':airbyte-commons-worker')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-dynamodb')
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.dynamodb;

import com.amazonaws.Protocol;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
Expand All @@ -12,12 +13,15 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamodbDestination extends BaseConnector implements Destination {

private static final String NON_SECURE_URL_ERR_MSG = "Server Endpoint requires HTTPS";
private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestination.class);

public static void main(final String[] args) throws Exception {
Expand All @@ -27,7 +31,17 @@ public static void main(final String[] args) throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
DynamodbChecker.attemptDynamodbWriteAndDelete(DynamodbDestinationConfig.getDynamodbDestinationConfig(config));
final DynamodbDestinationConfig dynamodbDestinationConfig =
DynamodbDestinationConfig.getDynamodbDestinationConfig(config);

// enforce ssl connection
if (isNotSsl(dynamodbDestinationConfig.getEndpoint())) {
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(NON_SECURE_URL_ERR_MSG);
}

DynamodbChecker.attemptDynamodbWriteAndDelete(dynamodbDestinationConfig);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception attempting to access the DynamoDB table: ", e);
Expand All @@ -38,6 +52,11 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

private boolean isNotSsl(String endpoint) throws MalformedURLException {
return !endpoint.isBlank() &&
new URL(endpoint).getProtocol().equals(Protocol.HTTP.toString());
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.dynamodb;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
Expand All @@ -14,20 +16,25 @@
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.util.*;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamodbDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbDestinationAcceptanceTest.class);
private static final String NON_SECURE_URL_ERR_MSG = "Server Endpoint requires HTTPS";

protected final String secretFilePath = "secrets/config.json";
protected JsonNode configJson;
Expand Down Expand Up @@ -159,4 +166,21 @@ protected void tearDown(final TestDestinationEnv testEnv) {
}
}

@Test
public void testCheckConnectionInvalidHttpProtocol() throws Exception {
final StandardCheckConnectionOutput checkResult = runCheck(getUnsecureConfig());
assertEquals(Status.FAILED, checkResult.getStatus());
assertEquals(NON_SECURE_URL_ERR_MSG, checkResult.getMessage());
}

protected JsonNode getUnsecureConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("dynamodb_endpoint", "http://testurl.com:9000")
.put("dynamodb_table_name_prefix", "integration-test")
.put("dynamodb_region", "us-east-2")
.put("access_key_id", "dummy_access_key_id")
.put("secret_access_key", "dummy_secret_access_key")
.build());
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ This connector by default uses 10 capacity units for both Read and Write in Dyna

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.6 | 2022-11-01 | [\#18672](https://github.com/airbytehq/airbyte/pull/18672) | Enforce to use ssl connection |
| 0.1.5 | 2022-08-05 | [\#15350](https://github.com/airbytehq/airbyte/pull/15350) | Added per-stream handling |
| 0.1.4 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors |
| 0.1.3 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
Expand Down