
Commit 4b5bd2b

VitaliiMaltsev, alexandertsukanov, and octavia-squidington-iii authored
Redshift Destination Apply buffering strategy (#12601)
* Redshift Destination: Apply buffering strategy
* add manifest uploading
* refactoring
* fixed checkstyle
* updated CHANGELOG
* removed redundant static
* airbyte-12265: Added stagingOperations.onDestinationCloseOperations() to StagingConsumerFactory.java.
* airbyte-12265: Created operations and copiers java packages.
* safe delete of RedshiftCopyS3Destination.java
* rename tests
* bump version
* auto-bump connector version

Co-authored-by: Oleksandr Tsukanov <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 5ff96ab commit 4b5bd2b

18 files changed: +323 additions, −130 deletions


airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
@@ -218,7 +218,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.32
+  dockerImageTag: 0.3.33
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
   resourceRequirements:

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 31 additions & 23 deletions
@@ -3461,7 +3461,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.32"
+- dockerImage: "airbyte/destination-redshift:0.3.33"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
     connectionSpecification:
@@ -3514,27 +3514,30 @@
           default: "public"
           title: "Default Schema"
         s3_bucket_name:
-          title: "S3 Bucket Name"
+          title: "S3 Bucket Name (Optional)"
           type: "string"
           description: "The name of the staging S3 bucket to use if utilising a COPY\
             \ strategy. COPY is recommended for production workloads for better speed\
-            \ and scalability. See <a href=\"https://docs.aws.amazon.com/redshift/latest/dg/c_loading-data-best-practices.html\"\
+            \ and scalability. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html\"\
             >AWS docs</a> for more details."
           examples:
           - "airbyte.staging"
         s3_bucket_path:
-          title: "S3 Bucket Path"
+          title: "S3 Bucket Path (Optional)"
           type: "string"
           description: "The directory under the S3 bucket where data will be written.\
-            \ If not provided, then defaults to the root directory."
+            \ If not provided, then defaults to the root directory. See <a href=\"\
+            https://docs.aws.amazon.com/prescriptive-guidance/latest/defining-bucket-names-data-lakes/faq.html#:~:text=be%20globally%20unique.-,For%20S3%20bucket%20paths,-%2C%20you%20can%20use\"\
+            >path's name recommendations</a> for more details."
           examples:
           - "data_sync/test"
         s3_bucket_region:
-          title: "S3 Bucket Region"
+          title: "S3 Bucket Region (Optional)"
           type: "string"
           default: ""
           description: "The region of the S3 staging bucket to use if utilising a\
-            \ copy strategy."
+            \ COPY strategy. See <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html#:~:text=In-,Region,-%2C%20choose%20the%20AWS\"\
+            >AWS docs</a> for details."
           enum:
           - ""
           - "us-east-1"
@@ -3562,36 +3565,41 @@
           - "me-south-1"
         access_key_id:
           type: "string"
-          description: "The Access Key Id granting allow one to access the above S3\
-            \ staging bucket. Airbyte requires Read and Write permissions to the given\
-            \ bucket."
-          title: "S3 Key Id"
+          description: "This ID grants access to the above S3 staging bucket. Airbyte\
+            \ requires Read and Write permissions to the given bucket. See <a href=\"\
+            https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\"\
+            >AWS docs</a> on how to generate an access key ID and secret access key."
+          title: "S3 Key Id (Optional)"
           airbyte_secret: true
         secret_access_key:
           type: "string"
-          description: "The corresponding secret to the above access key id."
-          title: "S3 Access Key"
+          description: "The corresponding secret to the above access key id. See <a\
+            \ href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\"\
+            >AWS docs</a> on how to generate an access key ID and secret access key."
+          title: "S3 Access Key (Optional)"
           airbyte_secret: true
         part_size:
           type: "integer"
           minimum: 10
           maximum: 100
           examples:
          - "10"
-          description: "Optional. Increase this if syncing tables larger than 100GB.\
-            \ Only relevant for COPY. Files are streamed to S3 in parts. This determines\
-            \ the size of each part, in MBs. As S3 has a limit of 10,000 parts per\
-            \ file, part size affects the table size. This is 10MB by default, resulting\
-            \ in a default limit of 100GB tables. Note, a larger part size will result\
+          description: "Increase this if syncing tables larger than 100GB. Only relevant\
+            \ for COPY. Files are streamed to S3 in parts. This determines the size\
+            \ of each part, in MBs. As S3 has a limit of 10,000 parts per file, part\
+            \ size affects the table size. This is 10MB by default, resulting in a\
+            \ default limit of 100GB tables. Note: a larger part size will result\
            \ in larger memory requirements. A rule of thumb is to multiply the part\
-            \ size by 10 to get the memory requirement. Modify this with care."
-          title: "Stream Part Size"
+            \ size by 10 to get the memory requirement. Modify this with care. See\
+            \ <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=above%20key%20id.-,Part%20Size,-Affects%20the%20size\"\
+            > docs</a> for details."
+          title: "Stream Part Size (Optional)"
         purge_staging_data:
-          title: "Purge Staging Files and Tables"
+          title: "Purge Staging Files and Tables (Optional)"
           type: "boolean"
           description: "Whether to delete the staging files from S3 after completing\
-            \ the sync. See the docs for details. Only relevant for COPY. Defaults\
-            \ to true."
+            \ the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"\
+            > docs</a> for details."
           default: true
     supportsIncremental: true
     supportsNormalization: true
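Editor's note: the part_size description above carries a rule of thumb (memory requirement ≈ part size × 10) and a ceiling imposed by S3's 10,000-parts-per-file limit. A minimal sketch of that arithmetic, purely illustrative; the class and method names are the editor's, not the connector's:

import java.util.stream.IntStream;

// Illustrates the spec's guidance: 10MB parts -> ~100MB memory, ~100GB max table.
public final class PartSizeEstimate {

  private static final int S3_MAX_PARTS_PER_FILE = 10_000;

  // Rough memory requirement in MB for a given stream part size in MB (rule of thumb: x10).
  static long estimatedMemoryMb(final int partSizeMb) {
    return partSizeMb * 10L;
  }

  // Largest table (in GB) that fits within S3's part-count limit at this part size.
  static long maxTableSizeGb(final int partSizeMb) {
    return (long) partSizeMb * S3_MAX_PARTS_PER_FILE / 1024;
  }

  public static void main(final String[] args) {
    // The spec allows part sizes between 10 and 100 MB.
    IntStream.of(10, 50, 100).forEach(partSizeMb ->
        System.out.printf("partSize=%dMB: memory≈%dMB, maxTable≈%dGB%n",
            partSizeMb, estimatedMemoryMb(partSizeMb), maxTableSizeGb(partSizeMb)));
  }
}

At the default of 10MB parts this yields roughly 100MB of memory and a ~97GB (≈100GB) table ceiling, matching the description's stated defaults.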

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java

Lines changed: 1 addition & 2 deletions
@@ -204,7 +204,6 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
           throw new RuntimeException("Failed to upload data from stage " + stagingPath, e);
         }
         writeConfig.clearStagedFiles();
-
         stagingOperations.createTableIfNotExists(database, schemaName, dstTableName);
         switch (writeConfig.getSyncMode()) {
           case OVERWRITE -> queryList.add(stagingOperations.truncateTableQuery(database, schemaName, dstTableName));
@@ -213,7 +212,7 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
         }
         queryList.add(stagingOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName));
       }
-
+      stagingOperations.onDestinationCloseOperations(database, writeConfigs.stream().map(WriteConfig::getOutputSchemaName).collect(Collectors.toSet()));
       LOGGER.info("Executing finalization of tables.");
       stagingOperations.executeTransaction(database, queryList);
       LOGGER.info("Finalizing tables in destination completed.");

airbyte-integrations/connectors/destination-redshift/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.3.32
+LABEL io.airbyte.version=0.3.33
 LABEL io.airbyte.name=airbyte/destination-redshift

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java

Lines changed: 0 additions & 88 deletions
This file was deleted.

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@
  * {@link RedshiftInsertDestination} for more detail. The second inserts via streaming the data to
  * an S3 bucket, and COPY-ing the data into Redshift. This is more efficient, and recommended for
  * production workloads, but does require users to set up an S3 bucket and pass in additional
- * credentials. See {@link RedshiftCopyS3Destination} for more detail. This class inspects the given
+ * credentials. See {@link RedshiftStagingS3Destination} for more detail. This class inspects the given
  * arguments to determine which strategy to use.
  */
 public class RedshiftDestination extends SwitchingDestination<RedshiftDestination.DestinationType> {
@@ -42,7 +42,7 @@ public static DestinationType getTypeFromConfig(final JsonNode config) {
   public static Map<DestinationType, Destination> getTypeToDestination() {
     return Map.of(
         DestinationType.INSERT_WITH_SUPER_TMP_TYPE, new RedshiftInsertDestination(RedshiftDataTmpTableMode.SUPER),
-        DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftCopyS3Destination(RedshiftDataTmpTableMode.SUPER));
+        DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftStagingS3Destination(RedshiftDataTmpTableMode.SUPER));
   }

   public static DestinationType determineUploadMode(final JsonNode config) {
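The javadoc says this class inspects the config to pick a strategy, but determineUploadMode's body sits outside the diff. A minimal sketch of plausible switching logic, assuming the decision hinges on whether an S3 staging bucket is configured; the field check and the sketch class name are the editor's assumptions, not the connector's actual code:

import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical stand-in for RedshiftDestination.determineUploadMode.
final class UploadModeSketch {

  enum DestinationType { INSERT_WITH_SUPER_TMP_TYPE, COPY_S3_WITH_SUPER_TMP_TYPE }

  static DestinationType determineUploadMode(final JsonNode config) {
    // No staging bucket configured -> fall back to plain JDBC INSERTs.
    final JsonNode bucket = config.get("s3_bucket_name");
    return (bucket == null || bucket.asText().isBlank())
        ? DestinationType.INSERT_WITH_SUPER_TMP_TYPE
        : DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE;
  }
}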

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java

Lines changed: 2 additions & 1 deletion
@@ -13,12 +13,13 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
 import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
+import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
 import java.util.Map;
 import java.util.Optional;

 public class RedshiftInsertDestination extends AbstractJdbcDestination {

-  private static final String DRIVER_CLASS = DatabaseDriver.REDSHIFT.getDriverClassName();
+  public static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
   private static final String USERNAME = "username";
   private static final String PASSWORD = "password";
   private static final String SCHEMA = "schema";
airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java

Lines changed: 109 additions & 0 deletions

@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.redshift;
+
+import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
+import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcDatabase;
+import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.base.Destination;
+import io.airbyte.integrations.base.sentry.AirbyteSentry;
+import io.airbyte.integrations.destination.NamingConventionTransformer;
+import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
+import io.airbyte.integrations.destination.record_buffer.FileBuffer;
+import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
+import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
+import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
+import io.airbyte.integrations.destination.s3.S3Destination;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.S3StorageOperations;
+import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
+import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
+import io.airbyte.protocol.models.AirbyteConnectionStatus;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RedshiftStagingS3Destination extends AbstractJdbcDestination implements Destination {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);
+  private final RedshiftDataTmpTableMode redshiftDataTmpTableMode;
+
+  public RedshiftStagingS3Destination(RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
+    super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations(redshiftDataTmpTableMode));
+    this.redshiftDataTmpTableMode = redshiftDataTmpTableMode;
+  }
+
+  @Override
+  public AirbyteConnectionStatus check(final JsonNode config) {
+    final S3DestinationConfig s3Config = getS3DestinationConfig(config);
+    S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");
+
+    final NamingConventionTransformer nameTransformer = getNamingResolver();
+    final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
+        new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode);
+    try (final JdbcDatabase database = getDatabase(config)) {
+      final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
+      AirbyteSentry.executeWithTracing("CreateAndDropTable",
+          () -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations));
+      return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
+    } catch (final Exception e) {
+      LOGGER.error("Exception while checking connection: ", e);
+      return new AirbyteConnectionStatus()
+          .withStatus(AirbyteConnectionStatus.Status.FAILED)
+          .withMessage("Could not connect with provided configuration. \n" + e.getMessage());
+    }
+
+  }
+
+  @Override
+  protected JdbcDatabase getDatabase(final JsonNode config) {
+    return getJdbcDatabase(config);
+  }
+
+  @Override
+  protected NamingConventionTransformer getNamingResolver() {
+    return new RedshiftSQLNameTransformer();
+  }
+
+  @Override
+  protected Map<String, String> getDefaultConnectionProperties(JsonNode config) {
+    return SSL_JDBC_PARAMETERS;
+  }
+
+  // this is a no-op since we override getDatabase.
+  @Override
+  public JsonNode toJdbcConfig(JsonNode config) {
+    return Jsons.emptyObject();
+  }
+
+  @Override
+  public AirbyteMessageConsumer getConsumer(final JsonNode config,
+                                            final ConfiguredAirbyteCatalog catalog,
+                                            final Consumer<AirbyteMessage> outputRecordCollector) {
+    final S3DestinationConfig s3Config = getS3DestinationConfig(config);
+    return new StagingConsumerFactory().create(
+        outputRecordCollector,
+        getDatabase(config),
+        new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode),
+        getNamingResolver(),
+        CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
+        config,
+        catalog,
+        isPurgeStagingData(config));
+  }
+
+  private boolean isPurgeStagingData(final JsonNode config) {
+    return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
+  }
+
+}
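Note that getConsumer treats purge_staging_data as true when the key is absent from the config. A small sketch of that default-to-true behavior, using the same Jackson and Jsons helpers as the class above; the demo class name is illustrative, not part of the commit:

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;

final class PurgeFlagDemo {

  // Mirrors the private isPurgeStagingData helper: a missing flag means "purge".
  static boolean isPurgeStagingData(final JsonNode config) {
    return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
  }

  public static void main(final String[] args) {
    System.out.println(isPurgeStagingData(Jsons.deserialize("{}")));                             // true
    System.out.println(isPurgeStagingData(Jsons.deserialize("{\"purge_staging_data\":false}"))); // false
  }
}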
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
33
*/
44

5-
package io.airbyte.integrations.destination.redshift;
5+
package io.airbyte.integrations.destination.redshift.copiers;
66

77
import com.amazonaws.services.s3.AmazonS3;
88
import com.fasterxml.jackson.databind.ObjectMapper;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
33
*/
44

5-
package io.airbyte.integrations.destination.redshift;
5+
package io.airbyte.integrations.destination.redshift.copiers;
66

77
import com.amazonaws.services.s3.AmazonS3;
88
import io.airbyte.db.jdbc.JdbcDatabase;
