Skip to content

Commit aa28d44

Browse files
authored
12708: Add an option to use encryption with staging in Redshift Destination (#13675)
* 12708: Add an option to use encryption with staging in Redshift Destination * 12708: docs/docker configs updated * 12708: merge with master * 12708: merge fix * 12708: code review implementation * 12708: fix for older configs * 12708: fix for older configs in check * 12708: merge from master (consolidation issue) * 12708: versions updated
1 parent dd2d5d0 commit aa28d44

File tree

7 files changed

+119
-6
lines changed

7 files changed

+119
-6
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@
225225
- name: Redshift
226226
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
227227
dockerRepository: airbyte/destination-redshift
228-
dockerImageTag: 0.3.40
228+
dockerImageTag: 0.3.41
229229
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
230230
icon: redshift.svg
231231
resourceRequirements:

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

+28-1
Original file line numberDiff line numberDiff line change
@@ -3622,7 +3622,7 @@
36223622
supported_destination_sync_modes:
36233623
- "overwrite"
36243624
- "append"
3625-
- dockerImage: "airbyte/destination-redshift:0.3.40"
3625+
- dockerImage: "airbyte/destination-redshift:0.3.41"
36263626
spec:
36273627
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
36283628
connectionSpecification:
@@ -3773,6 +3773,33 @@
37733773
\ the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"\
37743774
> docs</a> for details."
37753775
default: true
3776+
encryption:
3777+
title: "Encryption"
3778+
description: "How to encrypt the staging data"
3779+
oneOf:
3780+
- title: "No encryption"
3781+
description: "Staging data will be stored in plaintext."
3782+
type: "object"
3783+
required:
3784+
"encryption_type"
3785+
properties:
3786+
encryption_type:
3787+
type: "string"
3788+
const: "none"
3789+
- title: "AES-CBC envelope encryption",
3790+
description: "Staging data will be encrypted using AES-CBC envelope encryption."
3791+
type: "object"
3792+
required:
3793+
"encryption_type"
3794+
properties:
3795+
encryption_type:
3796+
type: "string"
3797+
const: "aes_cbc_envelope"
3798+
key_encrypting_key:
3799+
type: "string"
3800+
title: "Key"
3801+
description: "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync.",
3802+
airbyte_secret: true
37763803
supportsIncremental: true
37773804
supportsNormalization: true
37783805
supportsDBT: true

airbyte-integrations/connectors/destination-redshift/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.3.40
19+
LABEL io.airbyte.version=0.3.41
2020
LABEL io.airbyte.name=airbyte/destination-redshift

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@
2525
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
2626
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
2727
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
28+
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
29+
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
30+
import io.airbyte.integrations.destination.s3.EncryptionConfig;
31+
import io.airbyte.integrations.destination.s3.NoEncryption;
2832
import io.airbyte.integrations.destination.s3.S3Destination;
2933
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
3034
import io.airbyte.integrations.destination.s3.S3StorageOperations;
3135
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
3236
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
3337
import io.airbyte.protocol.models.AirbyteConnectionStatus;
38+
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
3439
import io.airbyte.protocol.models.AirbyteMessage;
3540
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
3641
import java.util.Map;
@@ -47,14 +52,26 @@ public RedshiftStagingS3Destination() {
4752
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
4853
}
4954

55+
private boolean isEphemeralKeysAndPurgingStagingData(JsonNode config, EncryptionConfig encryptionConfig) {
56+
return !isPurgeStagingData(config) && encryptionConfig instanceof AesCbcEnvelopeEncryption c && c.keyType() == KeyType.EPHEMERAL;
57+
}
58+
5059
@Override
5160
public AirbyteConnectionStatus check(final JsonNode config) {
5261
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
62+
final EncryptionConfig encryptionConfig = config.has("uploading_method") ?
63+
EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
64+
if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
65+
return new AirbyteConnectionStatus()
66+
.withStatus(Status.FAILED)
67+
.withMessage(
68+
"You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt.");
69+
}
5370
S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");
5471

5572
final NamingConventionTransformer nameTransformer = getNamingResolver();
5673
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
57-
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config);
74+
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, encryptionConfig);
5875
final DataSource dataSource = getDataSource(config);
5976
try {
6077
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
@@ -108,10 +125,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
108125
final ConfiguredAirbyteCatalog catalog,
109126
final Consumer<AirbyteMessage> outputRecordCollector) {
110127
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
128+
final EncryptionConfig encryptionConfig = config.has("uploading_method") ?
129+
EncryptionConfig.fromJson(config.get("uploading_method").get("encryption")) : new NoEncryption();
111130
return new StagingConsumerFactory().create(
112131
outputRecordCollector,
113132
getDatabase(getDataSource(config)),
114-
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config),
133+
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
115134
getNamingResolver(),
116135
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
117136
config,

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,15 @@
1313
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
1414
import io.airbyte.integrations.destination.redshift.manifest.Entry;
1515
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
16+
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
17+
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator;
18+
import io.airbyte.integrations.destination.s3.EncryptionConfig;
1619
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
1720
import io.airbyte.integrations.destination.s3.S3StorageOperations;
1821
import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig;
1922
import io.airbyte.integrations.destination.staging.StagingOperations;
23+
import java.util.Base64;
24+
import java.util.Base64.Encoder;
2025
import java.util.List;
2126
import java.util.Map;
2227
import java.util.Optional;
@@ -26,18 +31,27 @@
2631

2732
public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {
2833

34+
private static final Encoder BASE64_ENCODER = Base64.getEncoder();
2935
private final NamingConventionTransformer nameTransformer;
3036
private final S3StorageOperations s3StorageOperations;
3137
private final S3DestinationConfig s3Config;
3238
private final ObjectMapper objectMapper;
39+
private final byte[] keyEncryptingKey;
3340

3441
public RedshiftS3StagingSqlOperations(NamingConventionTransformer nameTransformer,
3542
AmazonS3 s3Client,
36-
S3DestinationConfig s3Config) {
43+
S3DestinationConfig s3Config,
44+
final EncryptionConfig encryptionConfig) {
3745
this.nameTransformer = nameTransformer;
3846
this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config);
3947
this.s3Config = s3Config;
4048
this.objectMapper = new ObjectMapper();
49+
if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) {
50+
this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key()));
51+
this.keyEncryptingKey = e.key();
52+
} else {
53+
this.keyEncryptingKey = null;
54+
}
4155
}
4256

4357
@Override
@@ -99,10 +113,18 @@ public void copyIntoTmpTableFromStage(JdbcDatabase database,
99113

100114
private void executeCopy(final String manifestPath, JdbcDatabase db, String schemaName, String tmpTableName) {
101115
final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig();
116+
final String encryptionClause;
117+
if (keyEncryptingKey == null) {
118+
encryptionClause = "";
119+
} else {
120+
encryptionClause = String.format(" encryption = (type = 'aws_cse' master_key = '%s')", BASE64_ENCODER.encodeToString(keyEncryptingKey));
121+
}
122+
102123
final var copyQuery = String.format(
103124
"""
104125
COPY %s.%s FROM '%s'
105126
CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
127+
%s
106128
CSV GZIP
107129
REGION '%s' TIMEFORMAT 'auto'
108130
STATUPDATE OFF
@@ -112,6 +134,7 @@ private void executeCopy(final String manifestPath, JdbcDatabase db, String sche
112134
getFullS3Path(s3Config.getBucketName(), manifestPath),
113135
credentialConfig.getAccessKeyId(),
114136
credentialConfig.getSecretAccessKey(),
137+
encryptionClause,
115138
s3Config.getBucketRegion());
116139

117140
Exceptions.toRuntime(() -> db.execute(copyQuery));

airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json

+43
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,49 @@
140140
"type": "boolean",
141141
"description": "Whether to delete the staging files from S3 after completing the sync. See <a href=\"https://docs.airbyte.com/integrations/destinations/redshift/#:~:text=the%20root%20directory.-,Purge%20Staging%20Data,-Whether%20to%20delete\"> docs</a> for details.",
142142
"default": true
143+
},
144+
"encryption": {
145+
"title": "Encryption",
146+
"type": "object",
147+
"description": "How to encrypt the staging data",
148+
"default": { "encryption_type": "none" },
149+
"order": 7,
150+
"oneOf": [
151+
{
152+
"title": "No encryption",
153+
"description": "Staging data will be stored in plaintext.",
154+
"type": "object",
155+
"required": ["encryption_type"],
156+
"properties": {
157+
"encryption_type": {
158+
"type": "string",
159+
"const": "none",
160+
"enum": ["none"],
161+
"default": "none"
162+
}
163+
}
164+
},
165+
{
166+
"title": "AES-CBC envelope encryption",
167+
"description": "Staging data will be encrypted using AES-CBC envelope encryption.",
168+
"type": "object",
169+
"required": ["encryption_type"],
170+
"properties": {
171+
"encryption_type": {
172+
"type": "string",
173+
"const": "aes_cbc_envelope",
174+
"enum": ["aes_cbc_envelope"],
175+
"default": "aes_cbc_envelope"
176+
},
177+
"key_encrypting_key": {
178+
"type": "string",
179+
"title": "Key",
180+
"description": "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync.",
181+
"airbyte_secret": true
182+
}
183+
}
184+
}
185+
]
143186
}
144187
}
145188
}

docs/integrations/destinations/redshift.md

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
138138

139139
| Version | Date | Pull Request | Subject |
140140
|:--------|:------------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
141+
| 0.3.41 | 2022-06-21 | [\#13675(https://github.com/airbytehq/airbyte/pull/13675) | Add an option to use encryption with staging in Redshift Destination |
141142
| 0.3.40 | 2022-06-17 | [\#13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
142143
| 0.3.39 | 2022-06-02 | [13415](https://github.com/airbytehq/airbyte/pull/13415) | Add dropdown to select Uploading Method. <br /> **PLEASE NOTICE**: After this update your **uploading method** will be set to **Standard**, you will need to reconfigure the method to use **S3 Staging** again. |
143144
| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |

0 commit comments

Comments
 (0)