Redshift Destination: Apply buffering strategy #12601

Merged
@@ -218,7 +218,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.32
dockerImageTag: 0.3.33
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
54 changes: 31 additions & 23 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -3461,7 +3461,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.32"
- dockerImage: "airbyte/destination-redshift:0.3.33"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
@@ -3514,27 +3514,30 @@
default: "public"
title: "Default Schema"
s3_bucket_name:
title: "S3 Bucket Name"
title: "S3 Bucket Name (Optional)"
type: "string"
description: "The name of the staging S3 bucket to use if utilising a COPY\
\ strategy. COPY is recommended for production workloads for better speed\
\ and scalability. See <a href=\"https://docs.aws.amazon.com/redshift/latest/dg/c_loading-data-best-practices.html\"\
\ and scalability. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html\"\
>AWS docs</a> for more details."
examples:
- "airbyte.staging"
s3_bucket_path:
title: "S3 Bucket Path"
title: "S3 Bucket Path (Optional)"
type: "string"
description: "The directory under the S3 bucket where data will be written.\
\ If not provided, then defaults to the root directory."
\ If not provided, then defaults to the root directory. See <a href=\"\
https://docs.aws.amazon.com/prescriptive-guidance/latest/defining-bucket-names-data-lakes/faq.html#:~:text=be%20globally%20unique.-,For%20S3%20bucket%20paths,-%2C%20you%20can%20use\"\
>path's name recommendations</a> for more details."
examples:
- "data_sync/test"
s3_bucket_region:
title: "S3 Bucket Region"
title: "S3 Bucket Region (Optional)"
type: "string"
default: ""
description: "The region of the S3 staging bucket to use if utilising a\
\ copy strategy."
\ COPY strategy. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html#:~:text=In-,Region,-%2C%20choose%20the%20AWS\"\
>AWS docs</a> for details."
enum:
- ""
- "us-east-1"
@@ -3562,36 +3565,41 @@
- "me-south-1"
access_key_id:
type: "string"
description: "The Access Key Id granting allow one to access the above S3\
\ staging bucket. Airbyte requires Read and Write permissions to the given\
\ bucket."
title: "S3 Key Id"
description: "This ID grants access to the above S3 staging bucket. Airbyte\
\ requires Read and Write permissions to the given bucket. See <a href=\"\
https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\"\
>AWS docs</a> on how to generate an access key ID and secret access key."
title: "S3 Key Id (Optional)"
airbyte_secret: true
secret_access_key:
type: "string"
description: "The corresponding secret to the above access key id."
title: "S3 Access Key"
description: "The corresponding secret to the above access key id. See <a\
\ href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\"\
>AWS docs</a> on how to generate an access key ID and secret access key."
title: "S3 Access Key (Optional)"
airbyte_secret: true
part_size:
type: "integer"
minimum: 10
maximum: 100
examples:
- "10"
description: "Optional. Increase this if syncing tables larger than 100GB.\
\ Only relevant for COPY. Files are streamed to S3 in parts. This determines\
\ the size of each part, in MBs. As S3 has a limit of 10,000 parts per\
\ file, part size affects the table size. This is 10MB by default, resulting\
\ in a default limit of 100GB tables. Note, a larger part size will result\
description: "Increase this if syncing tables larger than 100GB. Only relevant\
\ for COPY. Files are streamed to S3 in parts. This determines the size\
\ of each part, in MBs. As S3 has a limit of 10,000 parts per file, part\
\ size affects the table size. This is 10MB by default, resulting in a\
\ default limit of 100GB tables. Note: a larger part size will result\
\ in larger memory requirements. A rule of thumb is to multiply the part\
\ size by 10 to get the memory requirement. Modify this with care."
title: "Stream Part Size"
\ size by 10 to get the memory requirement. Modify this with care. See\
\ <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=above%20key%20id.-,Part%20Size,-Affects%20the%20size\"\
,> docs</a> for details."
title: "Stream Part Size (Optional)"
purge_staging_data:
title: "Purge Staging Files and Tables"
title: "Purge Staging Files and Tables (Optional)"
type: "boolean"
description: "Whether to delete the staging files from S3 after completing\
\ the sync. See the docs for details. Only relevant for COPY. Defaults\
\ to true."
\ the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"\
> docs</a> for details."
default: true
supportsIncremental: true
supportsNormalization: true
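A quick worked example of the arithmetic behind the part_size field above, assuming only the rules stated in the spec (S3 allows at most 10,000 parts per file, and memory is roughly part size times 10). This helper is purely illustrative and not part of the connector:

```java
// Illustrative arithmetic for the part_size setting; not part of the connector code.
public final class PartSizeMath {

  // S3 allows at most 10,000 parts per multipart upload.
  private static final int MAX_PARTS_PER_FILE = 10_000;

  // Largest table (in MB) that fits in a single staged file at a given part size.
  static long maxTableSizeMb(final int partSizeMb) {
    return (long) partSizeMb * MAX_PARTS_PER_FILE;
  }

  // Rough memory requirement (in MB), using the spec's "multiply by 10" rule of thumb.
  static long approxMemoryMb(final int partSizeMb) {
    return partSizeMb * 10L;
  }

  public static void main(final String[] args) {
    // Default 10 MB parts: ~100 GB table limit and ~100 MB of memory.
    System.out.printf("10 MB parts -> %d MB max table, ~%d MB memory%n", maxTableSizeMb(10), approxMemoryMb(10));
    // 50 MB parts: ~500 GB table limit and ~500 MB of memory.
    System.out.printf("50 MB parts -> %d MB max table, ~%d MB memory%n", maxTableSizeMb(50), approxMemoryMb(50));
  }
}
```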
@@ -204,7 +204,6 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
throw new RuntimeException("Failed to upload data from stage " + stagingPath, e);
}
writeConfig.clearStagedFiles();

stagingOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case OVERWRITE -> queryList.add(stagingOperations.truncateTableQuery(database, schemaName, dstTableName));
@@ -213,7 +212,7 @@
}
queryList.add(stagingOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName));
}

stagingOperations.onDestinationCloseOperations(database, writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet()));
Contributor comment: Not necessary, but maybe move writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet()) into a separate method with a name that describes what it does; a sketch follows this file's diff.

Contributor comment: The description of this interface is located in the interface itself, please see @suhomud.

LOGGER.info("Executing finalization of tables.");
stagingOperations.executeTransaction(database, queryList);
LOGGER.info("Finalizing tables in destination completed.");
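Picking up the first review comment above, a minimal sketch of what the extracted helper could look like. The class and method names are assumptions for illustration; WriteConfig is the existing class already used in onCloseFunction:

```java
import io.airbyte.integrations.destination.jdbc.WriteConfig; // assumed location of the existing WriteConfig class
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper prompted by the review comment: give the schema-name
// extraction a descriptive name instead of inlining the stream expression.
final class WriteConfigSchemas {

  static Set<String> collectOutputSchemaNames(final List<WriteConfig> writeConfigs) {
    return writeConfigs.stream()
        .map(WriteConfig::getOutputSchemaName)
        .collect(Collectors.toSet());
  }
}

// The call in onCloseFunction would then read:
// stagingOperations.onDestinationCloseOperations(database,
//     WriteConfigSchemas.collectOutputSchemaNames(writeConfigs));
```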
@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.32
LABEL io.airbyte.version=0.3.33
LABEL io.airbyte.name=airbyte/destination-redshift

This file was deleted.

@@ -19,7 +19,7 @@
* {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to
* an S3 bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for
* production workloads, but does require users to set up an S3 bucket and pass in additional
* credentials. See {@link RedshiftCopyS3Destination} for more detail. This class inspect the given
* credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspect the given
* arguments to determine which strategy to use.
*/
public class RedshiftDestination extends SwitchingDestination<RedshiftDestination.DestinationType> {
@@ -42,7 +42,7 @@ public static DestinationType getTypeFromConfig(final JsonNode config) {
public static Map<DestinationType, Destination> getTypeToDestination() {
return Map.of(
DestinationType.INSERT_WITH_SUPER_TMP_TYPE, new RedshiftInsertDestination(RedshiftDataTmpTableMode.SUPER),
DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftCopyS3Destination(RedshiftDataTmpTableMode.SUPER));
Contributor comment: After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.

Contributor (author) reply: Deleted RedshiftCopyS3Destination.
DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftStagingS3Destination(RedshiftDataTmpTableMode.SUPER));
}

public static DestinationType determineUploadMode(final JsonNode config) {
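The body of determineUploadMode is not shown in this diff; given the Javadoc above (the class inspects the config to pick a strategy) and the now-optional S3 fields in the spec, a plausible sketch of that selection follows. The exact check on s3_bucket_name is an assumption, not the PR's actual implementation:

```java
// Plausible sketch of the strategy selection described in the Javadoc; the check
// on s3_bucket_name is an assumption and may differ from the real method body.
public static DestinationType determineUploadMode(final JsonNode config) {
  final JsonNode bucketNode = config.get("s3_bucket_name");
  if (bucketNode == null || bucketNode.isNull() || bucketNode.asText().isBlank()) {
    // No staging bucket configured: fall back to plain INSERT statements.
    return DestinationType.INSERT_WITH_SUPER_TMP_TYPE;
  }
  // A staging bucket was provided: stream to S3 and COPY into Redshift.
  return DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE;
}
```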
@@ -13,12 +13,13 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import java.util.Map;
import java.util.Optional;

public class RedshiftInsertDestination extends AbstractJdbcDestination {

private static final String DRIVER_CLASS = DatabaseDriver.REDSHIFT.getDriverClassName();
public static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
private static final String SCHEMA = "schema";
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift;

import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcDatabase;
import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftStagingS3Destination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);
private final RedshiftDataTmpTableMode redshiftDataTmpTableMode;

public RedshiftStagingS3Destination(RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations(redshiftDataTmpTableMode));
this.redshiftDataTmpTableMode = redshiftDataTmpTableMode;
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");

final NamingConventionTransformer nameTransformer = getNamingResolver();
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode);
try (final JdbcDatabase database = getDatabase(config)) {
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
AirbyteSentry.executeWithTracing("CreateAndDropTable",
() -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations));
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
}

}

@Override
protected JdbcDatabase getDatabase(final JsonNode config) {
return getJdbcDatabase(config);
}

@Override
protected NamingConventionTransformer getNamingResolver() {
return new RedshiftSQLNameTransformer();
}

@Override
protected Map<String, String> getDefaultConnectionProperties(JsonNode config) {
return SSL_JDBC_PARAMETERS;
}

// this is a no op since we override getDatabase.
@Override
public JsonNode toJdbcConfig(JsonNode config) {
return Jsons.emptyObject();
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(config),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
config,
catalog,
isPurgeStagingData(config));
}

private boolean isPurgeStagingData(final JsonNode config) {
return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
}

}
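To make the expected configuration concrete, here is a minimal, illustrative way to exercise the new class. The schema, S3 and purge_staging_data field names come from the spec diff above, while host, port, database and all credential values are assumed placeholders rather than anything defined in this PR:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;

// Illustrative only: builds a config with the fields declared in the spec above
// and runs the connection check added in this PR. All values are placeholders.
public class RedshiftStagingS3DestinationExample {

  public static void main(final String[] args) {
    final ObjectMapper mapper = new ObjectMapper();
    final ObjectNode config = mapper.createObjectNode()
        .put("host", "example-cluster.redshift.amazonaws.com") // assumed field name
        .put("port", 5439)                                      // assumed field name
        .put("database", "dev")                                 // assumed field name
        .put("username", "airbyte_user")
        .put("password", "change-me")
        .put("schema", "public")
        .put("s3_bucket_name", "airbyte.staging")
        .put("s3_bucket_path", "data_sync/test")
        .put("s3_bucket_region", "us-east-1")
        .put("access_key_id", "AKIA...")
        .put("secret_access_key", "...")
        .put("part_size", 10)
        .put("purge_staging_data", true);

    // check() validates the S3 staging bucket and the JDBC connection, as shown above.
    System.out.println(new RedshiftStagingS3Destination(RedshiftDataTmpTableMode.SUPER).check(config));
  }
}
```

In a real deployment Airbyte builds this config from the form generated by destination_specs.yaml, so the snippet is only meant to show which fields the new staging path reads.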
@@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift;
package io.airbyte.integrations.destination.redshift.copiers;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift;
package io.airbyte.integrations.destination.redshift.copiers;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;