Skip to content

Redshift Destination Apply buffering strategy #12601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

VitaliiMaltsev
Copy link
Contributor

@VitaliiMaltsev VitaliiMaltsev commented May 5, 2022

What

Follow buffering strategy (compressed serialized records) for Destination Redshift

How

This PR switches the Redshift destination and stop using the RedshiftCopyS3Destination, but use a new RedshiftS3StagingDestination

So this PR simply requires its own set of StagingOperations using S3 buckets (external staging method)

Recommended reading order

  1. RedshiftStagingS3Destination.java
  2. RedshiftS3StagingSqlOperations.java

🚨 User Impact 🚨

Potentially a bit slower sync but more consistent memory and thread consumption thanks to on-disk buffering before staging. (More resilient to OOM and network failures)

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/SUMMARY.md
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed

Tests

Unit

Put your unit tests output here.

Integration

Put your integration tests output here.

Acceptance

Put your acceptance tests output here.

@github-actions github-actions bot added the area/connectors Connector related issues label May 5, 2022
…buffering

# Conflicts:
#	airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java
@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label May 5, 2022
@VitaliiMaltsev
Copy link
Contributor Author

VitaliiMaltsev commented May 5, 2022

/test connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/2275689331
✅ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/2275689331
Python tests coverage:

Name                                                                                                                            Stmts   Miss  Cover
---------------------------------------------------------------------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                                                                                          2      0   100%
normalization/transform_catalog/reserved_keywords.py                                                                               13      0   100%
normalization/transform_catalog/__init__.py                                                                                         2      0   100%
normalization/destination_type.py                                                                                                  13      0   100%
normalization/__init__.py                                                                                                           4      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py     144      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py               1      0   100%
/actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py                      2      0   100%
normalization/transform_catalog/destination_name_transformer.py                                                                   155      8    95%
normalization/transform_config/transform.py                                                                                       159     31    81%
normalization/transform_catalog/table_name_registry.py                                                                            174     34    80%
normalization/transform_catalog/utils.py                                                                                           34      7    79%
normalization/transform_catalog/dbt_macro.py                                                                                       22      7    68%
normalization/transform_catalog/catalog_processor.py                                                                              147     80    46%
normalization/transform_catalog/transform.py                                                                                       61     38    38%
normalization/transform_catalog/stream_processor.py                                                                               534    345    35%
---------------------------------------------------------------------------------------------------------------------------------------------------
TOTAL                                                                                                                            1467    550    63%

@CLAassistant
Copy link

CLAassistant commented May 5, 2022

CLA assistant check
All committers have signed the CLA.

isPurgeStagingData(config));
}

private static boolean isPurgeStagingData(final JsonNode config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this as static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed static

…ift-apply-buffering' into vmaltsev/11426-destination-redshift-apply-buffering
Copy link
Contributor

@alexandertsukanov alexandertsukanov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No more comments. LGTM.

@@ -213,7 +212,7 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
}
queryList.add(stagingOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName));
}

stagingOperations.onDestinationCloseOperations(database, writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary but maybe shift this part writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet()) in a separate method with name which describe what we do here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description of this interface is located in the interface by itsself, please see
@suhomud

@@ -42,7 +42,7 @@ public static DestinationType getTypeFromConfig(final JsonNode config) {
public static Map<DestinationType, Destination> getTypeToDestination() {
return Map.of(
DestinationType.INSERT_WITH_SUPER_TMP_TYPE, new RedshiftInsertDestination(RedshiftDataTmpTableMode.SUPER),
DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftCopyS3Destination(RedshiftDataTmpTableMode.SUPER));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this change RedshiftCopyS3Destination is not going to be used anywhere and should be deleted.

deleted RedshiftCopyS3Destination

Copy link
Contributor

@grishick grishick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another minor comment: description of RedshiftDestination class needs to be updated. Currently, it refers to RedshiftCopyS3Destination, which is essentially getting deprecated with this change.

@grishick
Copy link
Contributor

grishick commented May 9, 2022

@VitaliiMaltsev , could you pease sign the CLA (see comment from CLA above)

…buffering

# Conflicts:
#	airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java
@VitaliiMaltsev
Copy link
Contributor Author

VitaliiMaltsev commented May 10, 2022

/test connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/2299673546
✅ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/2299673546
Python tests coverage:

Name                                                              Stmts   Miss  Cover
-------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                            2      0   100%
normalization/transform_catalog/reserved_keywords.py                 13      0   100%
normalization/transform_catalog/__init__.py                           2      0   100%
normalization/destination_type.py                                    13      0   100%
normalization/__init__.py                                             4      0   100%
normalization/transform_catalog/destination_name_transformer.py     155      8    95%
normalization/transform_config/transform.py                         159     31    81%
normalization/transform_catalog/table_name_registry.py              174     34    80%
normalization/transform_catalog/utils.py                             34      7    79%
normalization/transform_catalog/dbt_macro.py                         22      7    68%
normalization/transform_catalog/catalog_processor.py                147     80    46%
normalization/transform_catalog/transform.py                         61     38    38%
normalization/transform_catalog/stream_processor.py                 534    345    35%
-------------------------------------------------------------------------------------
TOTAL                                                              1320    550    58%

@VitaliiMaltsev
Copy link
Contributor Author

another minor comment: description of RedshiftDestination class needs to be updated. Currently, it refers to RedshiftCopyS3Destination, which is essentially getting deprecated with this change.

updated

@VitaliiMaltsev
Copy link
Contributor Author

@VitaliiMaltsev , could you pease sign the CLA (see comment from CLA above)

done

@VitaliiMaltsev VitaliiMaltsev requested a review from grishick May 10, 2022 10:12
@grishick
Copy link
Contributor

LGTM, feel free to publish the new version (🙏 that publish command works from the first attempt)

@VitaliiMaltsev
Copy link
Contributor Author

VitaliiMaltsev commented May 12, 2022

/publish connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/2312285369
🚀 Successfully published connectors/destination-redshift
🚀 Auto-bumped version for connectors/destination-redshift
✅ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/2312285369

@octavia-squidington-iii octavia-squidington-iii temporarily deployed to more-secrets May 12, 2022 09:21 Inactive
@octavia-squidington-iii octavia-squidington-iii temporarily deployed to more-secrets May 12, 2022 09:21 Inactive
@VitaliiMaltsev VitaliiMaltsev merged commit 4b5bd2b into master May 12, 2022
@VitaliiMaltsev VitaliiMaltsev deleted the vmaltsev/11426-destination-redshift-apply-buffering branch May 12, 2022 09:30
suhomud pushed a commit that referenced this pull request May 23, 2022
* Redshift Destination: Apply buffering strategy

* add manifest uploading

* refactoring

* fixed checkstyle

* updated CHANGELOG

* removed redundant static

* airbyte-12265: Added  stagingOperations.onDestinationCloseOperations() to StagingConsumerFactory.java.

* airbyte-12265: Created operations and copiers java packages.

* safe delete of RedshiftCopyS3Destination.java

* rename tests

* bump version

* auto-bump connector version

Co-authored-by: Oleksandr Tsukanov <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Apply buffering changes to Redshift Destination
6 participants