Redshift Destination: Apply buffering strategy #12601

Merged
Changes from 6 commits
@@ -42,7 +42,7 @@ public static DestinationType getTypeFromConfig(final JsonNode config) {
public static Map<DestinationType, Destination> getTypeToDestination() {
return Map.of(
DestinationType.INSERT_WITH_SUPER_TMP_TYPE, new RedshiftInsertDestination(RedshiftDataTmpTableMode.SUPER),
-        DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftCopyS3Destination(RedshiftDataTmpTableMode.SUPER));
Contributor: After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.

Contributor Author: deleted RedshiftCopyS3Destination

+        DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftStagingS3Destination(RedshiftDataTmpTableMode.SUPER));
}

public static DestinationType determineUploadMode(final JsonNode config) {
@@ -16,7 +16,7 @@

public class RedshiftInsertDestination extends AbstractJdbcDestination {

-  private static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
+  public static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
private static final String SCHEMA = "schema";
@@ -0,0 +1,158 @@ (new file: RedshiftS3StagingSqlOperations.java)
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.redshift.manifest.Entry;
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig;
import io.airbyte.integrations.destination.staging.StagingOperations;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.joda.time.DateTime;

public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {

private final NamingConventionTransformer nameTransformer;
private final S3StorageOperations s3StorageOperations;
private final S3DestinationConfig s3Config;
private final ObjectMapper objectMapper;

public RedshiftS3StagingSqlOperations(NamingConventionTransformer nameTransformer,
AmazonS3 s3Client,
S3DestinationConfig s3Config,
RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
super(redshiftDataTmpTableMode);
this.nameTransformer = nameTransformer;
this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config);
this.s3Config = s3Config;
this.objectMapper = new ObjectMapper();
}

@Override
public String getStageName(String namespace, String streamName) {
return nameTransformer.applyDefaultCase(String.join("_",
nameTransformer.convertStreamName(namespace),
nameTransformer.convertStreamName(streamName)));
}

@Override
public String getStagingPath(UUID connectionId, String namespace, String streamName, DateTime writeDatetime) {
return nameTransformer.applyDefaultCase(String.format("%s/%s_%02d_%02d_%02d_%s/",
getStageName(namespace, streamName),
writeDatetime.year().get(),
writeDatetime.monthOfYear().get(),
writeDatetime.dayOfMonth().get(),
writeDatetime.hourOfDay().get(),
connectionId));
}
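// For illustration (not part of this PR): with namespace "public", stream "users", a write
// time of 2022-05-04T10:00 and some connection id, getStageName yields "public_users" and
// getStagingPath yields a prefix like "public_users/2022_05_04_10_<connectionId>/".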

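// Note: a "stage" here is not a warehouse-side object; it is simply a prefix in the
// configured S3 bucket, so creating it means ensuring the corresponding bucket object exists.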
@Override
public void createStageIfNotExists(JdbcDatabase database, String stageName) throws Exception {
AirbyteSentry.executeWithTracing("CreateStageIfNotExists",
() -> s3StorageOperations.createBucketObjectIfNotExists(stageName),
Map.of("stage", stageName));
}

@Override
public String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stageName, String stagingPath)
throws Exception {
return s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath);
}

private String putManifest(final String manifestContents, String stagingPath) {
String manifestFilePath = stagingPath + String.format("%s.manifest", UUID.randomUUID());
AirbyteSentry.executeWithTracing("PutManifest",
() -> s3StorageOperations.uploadManifest(s3Config.getBucketName(), manifestFilePath, manifestContents),
Map.of("stagingPath", stagingPath, "manifestPath", manifestFilePath));
return manifestFilePath;
}

@Override
public void copyIntoTmpTableFromStage(JdbcDatabase database,
String stageName,
String stagingPath,
List<String> stagedFiles,
String dstTableName,
String schemaName)
throws Exception {
LOGGER.info("Starting copy to tmp table: {} in destination from stage: {}, schema: {}.", dstTableName, stagingPath, schemaName);
final var possibleManifest = Optional.ofNullable(createManifest(stagedFiles, stagingPath));
AirbyteSentry.executeWithTracing("CopyIntoTableFromStage",
() -> Exceptions.toRuntime(() -> possibleManifest.stream()
.map(manifestContent -> putManifest(manifestContent, stagingPath))
.forEach(manifestPath -> executeCopy(manifestPath, database, schemaName, dstTableName))),
Map.of("schema", schemaName, "path", stagingPath, "table", dstTableName));
LOGGER.info("Copy to tmp table {}.{} in destination complete.", schemaName, dstTableName);
}

private void executeCopy(final String manifestPath, JdbcDatabase db, String schemaName, String tmpTableName) {
final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig();
final var copyQuery = String.format(
"""
COPY %s.%s FROM '%s'
CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
CSV GZIP
REGION '%s' TIMEFORMAT 'auto'
STATUPDATE OFF
MANIFEST;""",
schemaName,
tmpTableName,
getFullS3Path(s3Config.getBucketName(), manifestPath),
credentialConfig.getAccessKeyId(),
credentialConfig.getSecretAccessKey(),
s3Config.getBucketRegion());

Exceptions.toRuntime(() -> db.execute(copyQuery));
}
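// For illustration (all values hypothetical): with schema "public", tmp table
// "_airbyte_tmp_users", bucket "my-bucket" and region "us-east-1", the rendered
// statement would look like:
//   COPY public._airbyte_tmp_users FROM 's3://my-bucket/public_users/2022_05_04_10_<id>/<id>.manifest'
//   CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'
//   CSV GZIP
//   REGION 'us-east-1' TIMEFORMAT 'auto'
//   STATUPDATE OFF
//   MANIFEST;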

private String createManifest(List<String> stagedFiles, String stagingPath) {
if (stagedFiles.isEmpty()) {
return null;
}

final var s3FileEntries = stagedFiles.stream()
.map(file -> new Entry(getManifestPath(s3Config.getBucketName(), file, stagingPath)))
.collect(Collectors.toList());
final var manifest = new Manifest(s3FileEntries);

return Exceptions.toRuntime(() -> objectMapper.writeValueAsString(manifest));
}
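// The serialized manifest should follow Redshift's COPY manifest format, e.g. (illustrative,
// assuming Entry maps each staged file's S3 URL to the "url" field):
//   {"entries":[{"url":"s3://my-bucket/public_users/2022_05_04_10_<id>/file0.csv.gz"}, ...]}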

private static String getFullS3Path(final String s3BucketName, final String s3StagingFile) {
return String.join("/", "s3:/", s3BucketName, s3StagingFile);
}

private static String getManifestPath(final String s3BucketName, final String s3StagingFile, final String stagingPath) {
return "s3://" + s3BucketName + "/" + stagingPath + s3StagingFile;
}

@Override
public void cleanUpStage(JdbcDatabase database, String stageName, List<String> stagedFiles) throws Exception {
AirbyteSentry.executeWithTracing("CleanStage",
() -> s3StorageOperations.cleanUpBucketObject(stageName, stagedFiles),
Map.of("stage", stageName));
}

@Override
public void dropStageIfExists(JdbcDatabase database, String stageName) throws Exception {
AirbyteSentry.executeWithTracing("DropStageIfExists",
() -> s3StorageOperations.dropBucketObject(stageName),
Map.of("stage", stageName));
}

}
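A minimal sketch (not part of this PR; names and literals are illustrative) of the staging
lifecycle these operations implement, roughly the sequence StagingConsumerFactory is expected
to drive for each stream:

    void stageAndLoad(final JdbcDatabase database,
                      final RedshiftS3StagingSqlOperations ops,
                      final SerializableBuffer buffer) throws Exception {
      final UUID connectionId = UUID.randomUUID();
      final String stage = ops.getStageName("public", "users");
      final String path = ops.getStagingPath(connectionId, "public", "users", DateTime.now());
      ops.createStageIfNotExists(database, stage);                         // ensure the S3 prefix exists
      final String file = ops.uploadRecordsToStage(database, buffer, "public", stage, path);
      ops.copyIntoTmpTableFromStage(database, stage, path, List.of(file),  // manifest + COPY
          "_airbyte_tmp_users", "public");
      ops.cleanUpStage(database, stage, List.of(file));                    // delete staged files
    }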
@@ -0,0 +1,107 @@ (new file: RedshiftStagingS3Destination.java)
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift;

import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcDatabase;
import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftStagingS3Destination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);
private final RedshiftDataTmpTableMode redshiftDataTmpTableMode;

public RedshiftStagingS3Destination(RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations(redshiftDataTmpTableMode));
this.redshiftDataTmpTableMode = redshiftDataTmpTableMode;
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");

final NamingConventionTransformer nameTransformer = getNamingResolver();
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode);
try (final JdbcDatabase database = getDatabase(config)) {
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
AirbyteSentry.executeWithTracing("CreateAndDropTable",
() -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations));
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
}

}
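// check() exercises both halves of the staging pipeline: S3 write/delete permissions first,
// then JDBC connectivity via a create-and-drop round trip in the configured schema.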

@Override
protected JdbcDatabase getDatabase(final JsonNode config) {
return getJdbcDatabase(config);
}

@Override
protected NamingConventionTransformer getNamingResolver() {
return new RedshiftSQLNameTransformer();
}

@Override
protected Map<String, String> getDefaultConnectionProperties(JsonNode config) {
return SSL_JDBC_PARAMETERS;
}

// this is a no op since we override getDatabase.
@Override
public JsonNode toJdbcConfig(JsonNode config) {
return Jsons.emptyObject();
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(config),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
config,
catalog,
isPurgeStagingData(config));
}
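// The buffering strategy this PR applies: records accumulate on local disk as gzipped CSV
// files (FileBuffer with CSV_GZ_SUFFIX) and are uploaded to the S3 stage when a buffer
// flushes, so uploads happen in large batches rather than per record.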

private static boolean isPurgeStagingData(final JsonNode config) {
Contributor: Why do we need this as static?

Contributor Author: removed static
return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
}

}
@@ -290,7 +290,11 @@ public boolean isValidData(final JsonNode jsonNode) {
protected Map<String, String> getMetadataMapping() {
return ImmutableMap.of(
AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY, "x-amz-key",
-        AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "x-amz-iv"
-    );
+        AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "x-amz-iv");
}

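// Note: the bucketName parameter is currently unused; the upload always targets
// s3Config.getBucketName(), which is what the sole caller in this PR passes anyway.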
public void uploadManifest(String bucketName, String manifestFilePath, String manifestContents) {
s3Client.putObject(s3Config.getBucketName(), manifestFilePath, manifestContents);
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
@@ -138,6 +138,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----- | :------ |
+ | 0.3.33 | 2022-05-04 | [12601](https://github.com/airbytehq/airbyte/pull/12601) | Apply buffering strategy for S3 staging |
| 0.3.32 | 2022-04-20 | [12085](https://github.com/airbytehq/airbyte/pull/12085) | Fixed bug with switching between INSERT and COPY config |
| 0.3.31 | 2022-04-19 | [\#12064](https://github.com/airbytehq/airbyte/pull/12064) | Added option to support SUPER datatype in _airbyte_raw_** table |
| 0.3.29 | 2022-04-05 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Fixed bug with dashes in schema name | |