🎉 Destination Redshift: Add "Loading Method" option to Redshift Destination spec and UI #13415

Merged: 9 commits, Jun 10, 2022
Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.37
LABEL io.airbyte.version=0.3.38
LABEL io.airbyte.name=airbyte/destination-redshift
RedshiftDestination.java
@@ -4,6 +4,9 @@

package io.airbyte.integrations.destination.redshift;

import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;
import static io.airbyte.integrations.destination.redshift.validator.RedshiftValidatorUtil.validateIfAllRequiredS3fieldsArePresent;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
@@ -13,52 +16,44 @@
import org.slf4j.LoggerFactory;

/**
* The Redshift Destination offers two replication strategies. The first inserts via a typical SQL
* Insert statement. Although less efficient, this requires less user set up. See
* {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to
* an S3 bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for
* production workloads, but does require users to set up an S3 bucket and pass in additional
* credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspect the
* given arguments to determine which strategy to use.
* The Redshift Destination offers two replication strategies. The first inserts via a typical SQL Insert statement. Although less efficient, this requires less user setup. See {@link
* RedshiftInsertDestination} for more detail. The second inserts via streaming the data to an S3 bucket and COPYing the data into Redshift. This is more efficient, and recommended for production
* workloads, but does require users to set up an S3 bucket and pass in additional credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspects the given arguments to
* determine which strategy to use.
*/
public class RedshiftDestination extends SwitchingDestination<RedshiftDestination.DestinationType> {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class);
private static final String METHOD = "method";

private static final Map<DestinationType, Destination> destinationMap = Map.of(
DestinationType.STANDARD, new RedshiftInsertDestination(),
DestinationType.COPY_S3, new RedshiftStagingS3Destination()
);

enum DestinationType {
STANDARD,
COPY_S3
}

public RedshiftDestination() {
super(DestinationType.class, RedshiftDestination::getTypeFromConfig, getTypeToDestination());
super(DestinationType.class, RedshiftDestination::getTypeFromConfig, destinationMap);
}

public static DestinationType getTypeFromConfig(final JsonNode config) {
private static DestinationType getTypeFromConfig(final JsonNode config) {
return determineUploadMode(config);
}

public static Map<DestinationType, Destination> getTypeToDestination() {
return Map.of(
DestinationType.STANDARD, new RedshiftInsertDestination(),
DestinationType.COPY_S3, new RedshiftStagingS3Destination());
}

public static DestinationType determineUploadMode(final JsonNode config) {
final var bucketNode = config.get("s3_bucket_name");
final var regionNode = config.get("s3_bucket_region");
final var accessKeyIdNode = config.get("access_key_id");
final var secretAccessKeyNode = config.get("secret_access_key");

if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode)
&& isNullOrEmpty(secretAccessKeyNode)) {
final JsonNode uploadingMethod = config.get(UPLOADING_METHOD);
if (uploadingMethod.has(METHOD) && "standard".equalsIgnoreCase(uploadingMethod.get(METHOD).asText())) {
LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. " +
"Please use the Amazon S3 upload mode if you are syncing a large amount of data.");
return DestinationType.STANDARD;
}

if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode)
&& isNullOrEmpty(secretAccessKeyNode)) {
if (validateIfAllRequiredS3fieldsArePresent(uploadingMethod)) {
throw new RuntimeException("Error: Partially missing S3 Configuration.");
}
return DestinationType.COPY_S3;
@@ -70,9 +65,4 @@ public static void main(final String[] args) throws Exception {
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", RedshiftDestination.class);
}

private static boolean isNullOrEmpty(JsonNode jsonNode) {
return jsonNode == null || jsonNode.asText().equals("");
}

}
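
As a quick illustration of the new routing (not part of this PR), the sketch below builds the two uploading_method shapes defined in the spec and shows which DestinationType each is expected to resolve to. It assumes the io.airbyte.commons.json.Jsons helper already used by the acceptance tests in this diff; the sketch class itself is hypothetical.

```java
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.util.Map;

// Illustrative sketch only (not part of this PR): shows which branch
// determineUploadMode(...) is expected to take for each uploading_method shape.
public class UploadModeRoutingSketch {

  public static void main(final String[] args) {
    // "Standard" mode: plain SQL INSERTs, no S3 fields required.
    final JsonNode standardConfig = Jsons.jsonNode(Map.of(
        "uploading_method", Map.of("method", "Standard")));

    // "S3 Staging" mode: the S3 fields are now nested under uploading_method.
    final JsonNode stagingConfig = Jsons.jsonNode(Map.of(
        "uploading_method", Map.of(
            "method", "S3 Staging",
            "s3_bucket_name", "airbyte.staging",
            "s3_bucket_region", "us-east-1",
            "access_key_id", "<key id>",
            "secret_access_key", "<secret>")));

    // Expected routing per the logic above.
    System.out.println(RedshiftDestination.determineUploadMode(standardConfig)); // STANDARD
    System.out.println(RedshiftDestination.determineUploadMode(stagingConfig));  // COPY_S3
  }
}
```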
RedshiftStagingS3Destination.java
@@ -9,6 +9,7 @@
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.USERNAME;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcConfig;
import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;
import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;

import com.fasterxml.jackson.databind.JsonNode;
@@ -48,7 +49,7 @@ public RedshiftStagingS3Destination() {

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
final S3DestinationConfig s3Config = getS3DestinationConfig(config.get(UPLOADING_METHOD));
S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");

final NamingConventionTransformer nameTransformer = getNamingResolver();
@@ -104,9 +105,9 @@ public JsonNode toJdbcConfig(final JsonNode config) {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config.get(UPLOADING_METHOD));
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(getDataSource(config)),
RedshiftDestinationConstants.java (new file)
@@ -0,0 +1,12 @@
package io.airbyte.integrations.destination.redshift.constants;

/**
* Constant holder for Redshift Destination
*/
public class RedshiftDestinationConstants {

private RedshiftDestinationConstants() {
}

public static final String UPLOADING_METHOD = "uploading_method";
}
RedshiftValidatorUtil.java (new file)
@@ -0,0 +1,23 @@
package io.airbyte.integrations.destination.redshift.validator;

import com.fasterxml.jackson.databind.JsonNode;

/**
* Validator for Destination Redshift Schema
*/
public class RedshiftValidatorUtil {

private RedshiftValidatorUtil() {
}

public static boolean validateIfAllRequiredS3fieldsArePresent(final JsonNode uploadMode) {
return isNullOrEmpty(uploadMode.get("s3_bucket_name"))
&& isNullOrEmpty(uploadMode.get("s3_bucket_region"))
&& isNullOrEmpty(uploadMode.get("access_key_id"))
&& isNullOrEmpty(uploadMode.get("secret_access_key"));
}

private static boolean isNullOrEmpty(final JsonNode jsonNode) {
return null == jsonNode || "".equals(jsonNode.asText());
}
}
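
For clarity on the validator's contract (again, an illustrative sketch rather than code from this PR): despite its name, validateIfAllRequiredS3fieldsArePresent returns true only when every one of the four S3 fields is null or blank, and false as soon as any single field is set. The sketch class and sample nodes below are made up for illustration, reusing the Jsons helper from the acceptance tests.

```java
package io.airbyte.integrations.destination.redshift.validator;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.util.Map;

// Illustrative sketch only (not part of this PR): demonstrates what the validator
// reports for a few uploading_method shapes.
public class RedshiftValidatorUtilSketch {

  public static void main(final String[] args) {
    // No S3 fields at all -> true (treated upstream as a missing S3 configuration).
    final JsonNode noS3Fields = Jsons.jsonNode(Map.of("method", "S3 Staging"));

    // One S3 field present, the rest absent -> false (short-circuits on the first non-empty field).
    final JsonNode partialS3Fields = Jsons.jsonNode(Map.of(
        "method", "S3 Staging",
        "s3_bucket_name", "airbyte.staging"));

    System.out.println(RedshiftValidatorUtil.validateIfAllRequiredS3fieldsArePresent(noS3Fields));      // true
    System.out.println(RedshiftValidatorUtil.validateIfAllRequiredS3fieldsArePresent(partialS3Fields)); // false
  }
}
```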
spec.json
@@ -48,76 +48,106 @@
"default": "public",
"title": "Default Schema"
},
"s3_bucket_name": {
"title": "S3 Bucket Name (Optional)",
"type": "string",
"description": "The name of the staging S3 bucket to use if utilising a COPY strategy. COPY is recommended for production workloads for better speed and scalability. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html\">AWS docs</a> for more details.",
"examples": ["airbyte.staging"]
},
"s3_bucket_path": {
"title": "S3 Bucket Path (Optional)",
"type": "string",
"description": "The directory under the S3 bucket where data will be written. If not provided, then defaults to the root directory. See <a href=\"https://docs.aws.amazon.com/prescriptive-guidance/latest/defining-bucket-names-data-lakes/faq.html#:~:text=be%20globally%20unique.-,For%20S3%20bucket%20paths,-%2C%20you%20can%20use\">path's name recommendations</a> for more details.",
"examples": ["data_sync/test"]
},
"s3_bucket_region": {
"title": "S3 Bucket Region (Optional)",
"type": "string",
"default": "",
"description": "The region of the S3 staging bucket to use if utilising a COPY strategy. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html#:~:text=In-,Region,-%2C%20choose%20the%20AWS\">AWS docs</a> for details.",
"enum": [
"",
"us-east-1",
"us-east-2",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-southeast-1",
"ap-southeast-2",
"ca-central-1",
"cn-north-1",
"cn-northwest-1",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"me-south-1"
"uploading_method": {

Review comment (Contributor):
I'm a bit concerned with broken backward compatibility. @alexandertsukanov is there a way to pick S3 staging by default and probably read existing config instead of making users update connector settings?
cc @grishick

Reply (Contributor Author):
@alexandr-shegeda this was fixed and covered with the unit tests. Great comment. Thanks.

"title": "Uploading Method",
"type": "object",
"description": "The method how the data will be uploaded to the database.",
"oneOf": [
{
"title": "Standard",
"additionalProperties": false,
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "Standard"
}
}
},
{
"title": "S3 Staging",
"additionalProperties": false,
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "S3 Staging"
},
"s3_bucket_name": {
"title": "S3 Bucket Name (Optional)",
"type": "string",
"description": "The name of the staging S3 bucket to use if utilising a COPY strategy. COPY is recommended for production workloads for better speed and scalability. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html\">AWS docs</a> for more details.",
"examples": ["airbyte.staging"]
},
"s3_bucket_path": {
"title": "S3 Bucket Path (Optional)",
"type": "string",
"description": "The directory under the S3 bucket where data will be written. If not provided, then defaults to the root directory. See <a href=\"https://docs.aws.amazon.com/prescriptive-guidance/latest/defining-bucket-names-data-lakes/faq.html#:~:text=be%20globally%20unique.-,For%20S3%20bucket%20paths,-%2C%20you%20can%20use\">path's name recommendations</a> for more details.",
"examples": ["data_sync/test"]
},
"s3_bucket_region": {
"title": "S3 Bucket Region (Optional)",
"type": "string",
"default": "",
"description": "The region of the S3 staging bucket to use if utilising a COPY strategy. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html#:~:text=In-,Region,-%2C%20choose%20the%20AWS\">AWS docs</a> for details.",
"enum": [
"",
"us-east-1",
"us-east-2",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-southeast-1",
"ap-southeast-2",
"ca-central-1",
"cn-north-1",
"cn-northwest-1",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"me-south-1"
]
},
"access_key_id": {
"type": "string",
"description": "This ID grants access to the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket. See <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">AWS docs</a> on how to generate an access key ID and secret access key.",
"title": "S3 Key Id (Optional)",
"airbyte_secret": true
},
"secret_access_key": {
"type": "string",
"description": "The corresponding secret to the above access key id. See <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">AWS docs</a> on how to generate an access key ID and secret access key.",
"title": "S3 Access Key (Optional)",
"airbyte_secret": true
},
"part_size": {
"type": "integer",
"minimum": 10,
"maximum": 100,
"examples": ["10"],
"description": "Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note: a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=above%20key%20id.-,Part%20Size,-Affects%20the%20size\",> docs</a> for details.",
"title": "Stream Part Size (Optional)"
},
"purge_staging_data": {
"title": "Purge Staging Files and Tables (Optional)",
"type": "boolean",
"description": "Whether to delete the staging files from S3 after completing the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"> docs</a> for details.",
"default": true
}
}
}
]
},
"access_key_id": {
"type": "string",
"description": "This ID grants access to the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket. See <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">AWS docs</a> on how to generate an access key ID and secret access key.",
"title": "S3 Key Id (Optional)",
"airbyte_secret": true
},
"secret_access_key": {
"type": "string",
"description": "The corresponding secret to the above access key id. See <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">AWS docs</a> on how to generate an access key ID and secret access key.",
"title": "S3 Access Key (Optional)",
"airbyte_secret": true
},
"part_size": {
"type": "integer",
"minimum": 10,
"maximum": 100,
"examples": ["10"],
"description": "Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note: a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=above%20key%20id.-,Part%20Size,-Affects%20the%20size\",> docs</a> for details.",
"title": "Stream Part Size (Optional)"
},
"purge_staging_data": {
"title": "Purge Staging Files and Tables (Optional)",
"type": "boolean",
"description": "Whether to delete the staging files from S3 after completing the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"> docs</a> for details.",
"default": true
}
}
}
}
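
To make the spec change concrete, the hedged sketch below contrasts the pre-0.3.38 flat layout (S3 fields at the top level, as in the removed schema block above) with the new layout where the same fields are nested under uploading_method. How existing flat configs are migrated is not shown here; the review thread above notes that backward compatibility was handled and covered by unit tests. The field values and the sketch class are illustrative only.

```java
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.util.Map;

// Illustrative sketch only (not part of this PR): the same S3 staging settings in the
// pre-0.3.38 flat layout and in the new nested uploading_method layout. Connection
// fields (host, port, database, ...) are omitted for brevity.
public class UploadingMethodConfigShapes {

  // Legacy layout (<= 0.3.37): S3 fields sit at the top level of the connector config.
  static final JsonNode LEGACY_FLAT_CONFIG = Jsons.jsonNode(Map.of(
      "s3_bucket_name", "airbyte.staging",
      "s3_bucket_region", "us-east-1",
      "access_key_id", "<key id>",
      "secret_access_key", "<secret>"));

  // New layout (0.3.38): the same fields live under uploading_method, selected by "method".
  static final JsonNode NESTED_CONFIG = Jsons.jsonNode(Map.of(
      "uploading_method", Map.of(
          "method", "S3 Staging",
          "s3_bucket_name", "airbyte.staging",
          "s3_bucket_region", "us-east-1",
          "access_key_id", "<key id>",
          "secret_access_key", "<secret>")));

  public static void main(final String[] args) {
    System.out.println(LEGACY_FLAT_CONFIG.toPrettyString());
    System.out.println(NESTED_CONFIG.toPrettyString());
  }
}
```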

RedshiftInsertDestinationAcceptanceTest.java
@@ -25,6 +25,8 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.JsonSchemaType;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
@@ -63,17 +65,8 @@ class RedshiftInsertDestinationAcceptanceTest extends RedshiftStagingS3Destinati
private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

public JsonNode getStaticConfig() {
return removeStagingConfigurationFromRedshift(Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))));
}

public static JsonNode removeStagingConfigurationFromRedshift(final JsonNode config) {
final var original = (ObjectNode) Jsons.clone(config);
original.remove("s3_bucket_name");
original.remove("s3_bucket_region");
original.remove("access_key_id");
original.remove("secret_access_key");
return original;
public JsonNode getStaticConfig() throws IOException {
return Jsons.deserialize(Files.readString(Path.of("secrets/config.json")));
}

void setup() {
RedshiftS3StagingInsertDestinationAcceptanceTest.java
@@ -59,7 +59,7 @@ public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftSt
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

public JsonNode getStaticConfig() {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json")));
}

void setup() {
@@ -17,6 +17,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
@@ -52,8 +53,8 @@ protected JsonNode getConfig() {
return config;
}

public JsonNode getStaticConfig() {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
public JsonNode getStaticConfig() throws IOException {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json")));
}

@Override