-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Redshift Destination Apply buffering strategy #12601
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
VitaliiMaltsev
merged 16 commits into
master
from
vmaltsev/11426-destination-redshift-apply-buffering
May 12, 2022
Merged
Changes from 6 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
2d805f0
Redshift Destination: Apply buffering strategy
VitaliiMaltsev dcdf673
add manifest uploading
VitaliiMaltsev d058c9c
refactoring
VitaliiMaltsev 931d891
Merge branch 'master' into vmaltsev/11426-destination-redshift-apply-…
VitaliiMaltsev 756d43b
fixed checkstyle
VitaliiMaltsev 3fc748c
updated CHANGELOG
VitaliiMaltsev 722e173
Merge branch 'master' into vmaltsev/11426-destination-redshift-apply-…
VitaliiMaltsev 56b3910
removed redundant static
VitaliiMaltsev d9af45e
airbyte-12265: Added stagingOperations.onDestinationCloseOperations(…
alexandertsukanov 5e3f279
Merge remote-tracking branch 'origin/vmaltsev/11426-destination-redsh…
alexandertsukanov 877917f
airbyte-12265: Created operations and copiers java packages.
alexandertsukanov 162d983
Merge branch 'master' into vmaltsev/11426-destination-redshift-apply-…
VitaliiMaltsev c0f1aba
safe delete of RedshiftCopyS3Destination.java
VitaliiMaltsev 57eab3a
rename tests
VitaliiMaltsev e657f9d
bump version
VitaliiMaltsev 1f661a2
auto-bump connector version
octavia-squidington-iii File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
158 changes: 158 additions & 0 deletions
158
...ain/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingSqlOperations.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.redshift; | ||
|
||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import io.airbyte.commons.lang.Exceptions; | ||
import io.airbyte.db.jdbc.JdbcDatabase; | ||
import io.airbyte.integrations.base.sentry.AirbyteSentry; | ||
import io.airbyte.integrations.destination.NamingConventionTransformer; | ||
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; | ||
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode; | ||
import io.airbyte.integrations.destination.redshift.manifest.Entry; | ||
import io.airbyte.integrations.destination.redshift.manifest.Manifest; | ||
import io.airbyte.integrations.destination.s3.S3DestinationConfig; | ||
import io.airbyte.integrations.destination.s3.S3StorageOperations; | ||
import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; | ||
import io.airbyte.integrations.destination.staging.StagingOperations; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import java.util.stream.Collectors; | ||
import org.joda.time.DateTime; | ||
|
||
public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations { | ||
|
||
private final NamingConventionTransformer nameTransformer; | ||
private final S3StorageOperations s3StorageOperations; | ||
private final S3DestinationConfig s3Config; | ||
private final ObjectMapper objectMapper; | ||
|
||
public RedshiftS3StagingSqlOperations(NamingConventionTransformer nameTransformer, | ||
AmazonS3 s3Client, | ||
S3DestinationConfig s3Config, | ||
RedshiftDataTmpTableMode redshiftDataTmpTableMode) { | ||
super(redshiftDataTmpTableMode); | ||
this.nameTransformer = nameTransformer; | ||
this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config); | ||
this.s3Config = s3Config; | ||
this.objectMapper = new ObjectMapper(); | ||
} | ||
|
||
@Override | ||
public String getStageName(String namespace, String streamName) { | ||
return nameTransformer.applyDefaultCase(String.join("_", | ||
nameTransformer.convertStreamName(namespace), | ||
nameTransformer.convertStreamName(streamName))); | ||
} | ||
|
||
@Override | ||
public String getStagingPath(UUID connectionId, String namespace, String streamName, DateTime writeDatetime) { | ||
return nameTransformer.applyDefaultCase(String.format("%s/%s_%02d_%02d_%02d_%s/", | ||
getStageName(namespace, streamName), | ||
writeDatetime.year().get(), | ||
writeDatetime.monthOfYear().get(), | ||
writeDatetime.dayOfMonth().get(), | ||
writeDatetime.hourOfDay().get(), | ||
connectionId)); | ||
} | ||
|
||
@Override | ||
public void createStageIfNotExists(JdbcDatabase database, String stageName) throws Exception { | ||
AirbyteSentry.executeWithTracing("CreateStageIfNotExists", | ||
() -> s3StorageOperations.createBucketObjectIfNotExists(stageName), | ||
Map.of("stage", stageName)); | ||
} | ||
|
||
@Override | ||
public String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stageName, String stagingPath) | ||
throws Exception { | ||
return s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath); | ||
} | ||
|
||
private String putManifest(final String manifestContents, String stagingPath) { | ||
String manifestFilePath = stagingPath + String.format("%s.manifest", UUID.randomUUID()); | ||
AirbyteSentry.executeWithTracing("CreateStageIfNotExists", | ||
() -> s3StorageOperations.uploadManifest(s3Config.getBucketName(), manifestFilePath, manifestContents), | ||
Map.of("stagingPath", stagingPath, "manifestPath", manifestFilePath)); | ||
return manifestFilePath; | ||
} | ||
|
||
@Override | ||
public void copyIntoTmpTableFromStage(JdbcDatabase database, | ||
String stageName, | ||
String stagingPath, | ||
List<String> stagedFiles, | ||
String dstTableName, | ||
String schemaName) | ||
throws Exception { | ||
LOGGER.info("Starting copy to tmp table from stage: {} in destination from stage: {}, schema: {}, .", dstTableName, stagingPath, schemaName); | ||
final var possibleManifest = Optional.ofNullable(createManifest(stagedFiles, stagingPath)); | ||
AirbyteSentry.executeWithTracing("CopyIntoTableFromStage", | ||
() -> Exceptions.toRuntime(() -> possibleManifest.stream() | ||
.map(manifestContent -> putManifest(manifestContent, stagingPath)) | ||
.forEach(manifestPath -> executeCopy(manifestPath, database, schemaName, dstTableName))), | ||
Map.of("schema", schemaName, "path", stagingPath, "table", dstTableName)); | ||
LOGGER.info("Copy to tmp table {}.{} in destination complete.", schemaName, dstTableName); | ||
} | ||
|
||
private void executeCopy(final String manifestPath, JdbcDatabase db, String schemaName, String tmpTableName) { | ||
final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig(); | ||
final var copyQuery = String.format( | ||
""" | ||
COPY %s.%s FROM '%s' | ||
CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' | ||
CSV GZIP | ||
REGION '%s' TIMEFORMAT 'auto' | ||
STATUPDATE OFF | ||
MANIFEST;""", | ||
schemaName, | ||
tmpTableName, | ||
getFullS3Path(s3Config.getBucketName(), manifestPath), | ||
credentialConfig.getAccessKeyId(), | ||
credentialConfig.getSecretAccessKey(), | ||
s3Config.getBucketRegion()); | ||
|
||
Exceptions.toRuntime(() -> db.execute(copyQuery)); | ||
} | ||
|
||
private String createManifest(List<String> stagedFiles, String stagingPath) { | ||
if (stagedFiles.isEmpty()) { | ||
return null; | ||
} | ||
|
||
final var s3FileEntries = stagedFiles.stream() | ||
.map(file -> new Entry(getManifestPath(s3Config.getBucketName(), file, stagingPath))) | ||
.collect(Collectors.toList()); | ||
final var manifest = new Manifest(s3FileEntries); | ||
|
||
return Exceptions.toRuntime(() -> objectMapper.writeValueAsString(manifest)); | ||
} | ||
|
||
private static String getFullS3Path(final String s3BucketName, final String s3StagingFile) { | ||
return String.join("/", "s3:/", s3BucketName, s3StagingFile); | ||
} | ||
|
||
private static String getManifestPath(final String s3BucketName, final String s3StagingFile, final String stagingPath) { | ||
return "s3://" + s3BucketName + "/" + stagingPath + s3StagingFile; | ||
} | ||
|
||
@Override | ||
public void cleanUpStage(JdbcDatabase database, String stageName, List<String> stagedFiles) throws Exception { | ||
AirbyteSentry.executeWithTracing("CleanStage", | ||
() -> s3StorageOperations.cleanUpBucketObject(stageName, stagedFiles), | ||
Map.of("stage", stageName)); | ||
} | ||
|
||
@Override | ||
public void dropStageIfExists(JdbcDatabase database, String stageName) throws Exception { | ||
AirbyteSentry.executeWithTracing("DropStageIfExists", | ||
() -> s3StorageOperations.dropBucketObject(stageName), | ||
Map.of("stage", stageName)); | ||
} | ||
|
||
} |
107 changes: 107 additions & 0 deletions
107
.../main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.redshift; | ||
|
||
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS; | ||
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcDatabase; | ||
import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.db.jdbc.JdbcDatabase; | ||
import io.airbyte.integrations.base.AirbyteMessageConsumer; | ||
import io.airbyte.integrations.base.Destination; | ||
import io.airbyte.integrations.base.sentry.AirbyteSentry; | ||
import io.airbyte.integrations.destination.NamingConventionTransformer; | ||
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; | ||
import io.airbyte.integrations.destination.record_buffer.FileBuffer; | ||
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode; | ||
import io.airbyte.integrations.destination.s3.S3Destination; | ||
import io.airbyte.integrations.destination.s3.S3DestinationConfig; | ||
import io.airbyte.integrations.destination.s3.S3StorageOperations; | ||
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer; | ||
import io.airbyte.integrations.destination.staging.StagingConsumerFactory; | ||
import io.airbyte.protocol.models.AirbyteConnectionStatus; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import java.util.Map; | ||
import java.util.function.Consumer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class RedshiftStagingS3Destination extends AbstractJdbcDestination implements Destination { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class); | ||
private final RedshiftDataTmpTableMode redshiftDataTmpTableMode; | ||
|
||
public RedshiftStagingS3Destination(RedshiftDataTmpTableMode redshiftDataTmpTableMode) { | ||
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations(redshiftDataTmpTableMode)); | ||
this.redshiftDataTmpTableMode = redshiftDataTmpTableMode; | ||
} | ||
|
||
@Override | ||
public AirbyteConnectionStatus check(final JsonNode config) { | ||
final S3DestinationConfig s3Config = getS3DestinationConfig(config); | ||
S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, ""); | ||
|
||
final NamingConventionTransformer nameTransformer = getNamingResolver(); | ||
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations = | ||
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode); | ||
try (final JdbcDatabase database = getDatabase(config)) { | ||
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText()); | ||
AirbyteSentry.executeWithTracing("CreateAndDropTable", | ||
() -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations)); | ||
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); | ||
} catch (final Exception e) { | ||
LOGGER.error("Exception while checking connection: ", e); | ||
return new AirbyteConnectionStatus() | ||
.withStatus(AirbyteConnectionStatus.Status.FAILED) | ||
.withMessage("Could not connect with provided configuration. \n" + e.getMessage()); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
protected JdbcDatabase getDatabase(final JsonNode config) { | ||
return getJdbcDatabase(config); | ||
} | ||
|
||
@Override | ||
protected NamingConventionTransformer getNamingResolver() { | ||
return new RedshiftSQLNameTransformer(); | ||
} | ||
|
||
@Override | ||
protected Map<String, String> getDefaultConnectionProperties(JsonNode config) { | ||
return SSL_JDBC_PARAMETERS; | ||
} | ||
|
||
// this is a no op since we override getDatabase. | ||
@Override | ||
public JsonNode toJdbcConfig(JsonNode config) { | ||
return Jsons.emptyObject(); | ||
} | ||
|
||
@Override | ||
public AirbyteMessageConsumer getConsumer(final JsonNode config, | ||
final ConfiguredAirbyteCatalog catalog, | ||
final Consumer<AirbyteMessage> outputRecordCollector) { | ||
final S3DestinationConfig s3Config = getS3DestinationConfig(config); | ||
return new StagingConsumerFactory().create( | ||
outputRecordCollector, | ||
getDatabase(config), | ||
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode), | ||
getNamingResolver(), | ||
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)), | ||
config, | ||
catalog, | ||
isPurgeStagingData(config)); | ||
} | ||
|
||
private static boolean isPurgeStagingData(final JsonNode config) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed static |
||
return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean(); | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleted RedshiftCopyS3Destination