-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Redshift Destination Apply buffering strategy #12601
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Redshift Destination Apply buffering strategy #12601
Conversation
…buffering # Conflicts: # airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java
/test connector=connectors/destination-redshift
|
isPurgeStagingData(config)); | ||
} | ||
|
||
private static boolean isPurgeStagingData(final JsonNode config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this as static
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed static
… to StagingConsumerFactory.java.
…ift-apply-buffering' into vmaltsev/11426-destination-redshift-apply-buffering
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No more comments. LGTM.
@@ -213,7 +212,7 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database, | |||
} | |||
queryList.add(stagingOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName)); | |||
} | |||
|
|||
stagingOperations.onDestinationCloseOperations(database, writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary but maybe shift this part writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet())
in a separate method with name which describe what we do here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The description of this interface is located in the interface by itsself, please see
@suhomud
@@ -42,7 +42,7 @@ public static DestinationType getTypeFromConfig(final JsonNode config) { | |||
public static Map<DestinationType, Destination> getTypeToDestination() { | |||
return Map.of( | |||
DestinationType.INSERT_WITH_SUPER_TMP_TYPE, new RedshiftInsertDestination(RedshiftDataTmpTableMode.SUPER), | |||
DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftCopyS3Destination(RedshiftDataTmpTableMode.SUPER)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.
deleted RedshiftCopyS3Destination
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another minor comment: description of RedshiftDestination class needs to be updated. Currently, it refers to RedshiftCopyS3Destination, which is essentially getting deprecated with this change.
@VitaliiMaltsev , could you pease sign the CLA (see comment from CLA above) |
…buffering # Conflicts: # airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java
/test connector=connectors/destination-redshift
|
updated |
done |
LGTM, feel free to publish the new version (🙏 that publish command works from the first attempt) |
/publish connector=connectors/destination-redshift
|
* Redshift Destination: Apply buffering strategy * add manifest uploading * refactoring * fixed checkstyle * updated CHANGELOG * removed redundant static * airbyte-12265: Added stagingOperations.onDestinationCloseOperations() to StagingConsumerFactory.java. * airbyte-12265: Created operations and copiers java packages. * safe delete of RedshiftCopyS3Destination.java * rename tests * bump version * auto-bump connector version Co-authored-by: Oleksandr Tsukanov <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
What
Follow buffering strategy (compressed serialized records) for Destination Redshift
How
This PR switches the Redshift destination and stop using the RedshiftCopyS3Destination, but use a new RedshiftS3StagingDestination
So this PR simply requires its own set of StagingOperations using S3 buckets (external staging method)
Recommended reading order
RedshiftStagingS3Destination.java
RedshiftS3StagingSqlOperations.java
🚨 User Impact 🚨
Potentially a bit slower sync but more consistent memory and thread consumption thanks to on-disk buffering before staging. (More resilient to OOM and network failures)
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/SUMMARY.md
docs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changesTests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.