diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 0775676acb8ed..15a41ecda5e9f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -226,7 +226,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.24 + dockerImageTag: 0.4.25 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 7eb63ea62bae7..90ae7951fcc28 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -4076,7 +4076,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.24" +- dockerImage: "airbyte/destination-snowflake:0.4.25" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake" connectionSpecification: @@ -4143,6 +4143,7 @@ order: 5 credentials: title: "Authorization Method" + description: "" type: "object" oneOf: - type: "object" @@ -4212,6 +4213,8 @@ - "method" properties: method: + title: "" + description: "" type: "string" enum: - "Standard" @@ -4225,6 +4228,8 @@ - "method" properties: method: + title: "" + description: "" type: "string" enum: - "Internal Staging" @@ -4241,6 +4246,8 @@ - "secret_access_key" properties: method: + title: "" + description: "" type: "string" enum: - "S3 Staging" @@ -4324,6 +4331,46 @@ \ to true." default: true order: 6 + encryption: + title: "Encryption" + type: "object" + description: "How to encrypt the staging data" + default: + encryption_type: "none" + order: 7 + oneOf: + - title: "No encryption" + description: "Staging data will be stored in plaintext." + type: "object" + required: + - "encryption_type" + properties: + encryption_type: + type: "string" + const: "none" + enum: + - "none" + default: "none" + - title: "AES-CBC envelope encryption" + description: "Staging data will be encrypted using AES-CBC envelope\ + \ encryption." + type: "object" + required: + - "encryption_type" + properties: + encryption_type: + type: "string" + const: "aes_cbc_envelope" + enum: + - "aes_cbc_envelope" + default: "aes_cbc_envelope" + key_encrypting_key: + type: "string" + title: "Key" + description: "The key, base64-encoded. Must be either 128, 192,\ + \ or 256 bits. Leave blank to have Airbyte generate an ephemeral\ + \ key for each sync." 
+ airbyte_secret: true - title: "GCS Staging" additionalProperties: false description: "Writes large batches of records to a file, uploads the file\ @@ -4336,6 +4383,8 @@ - "credentials_json" properties: method: + title: "" + description: "" type: "string" enum: - "GCS Staging" @@ -4382,6 +4431,8 @@ - "azure_blob_storage_sas_token" properties: method: + title: "" + description: "" type: "string" enum: - "Azure Blob Staging" diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java index 172428aa8b00c..ec3b9022b2193 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java @@ -9,7 +9,9 @@ import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3StorageOperations; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,4 +45,8 @@ protected void cleanUpObjects(final String bucket, final List keysTo } } + @Override + protected Map getMetadataMapping() { + return new HashMap<>(); + } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index dc82677001e83..1da2beb67669e 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -59,7 +59,8 @@ public AirbyteMessageConsumer create(final Consumer outputRecord final NamingConventionTransformer namingResolver, final CheckedBiFunction onCreateBuffer, final JsonNode config, - final ConfiguredAirbyteCatalog catalog) { + final ConfiguredAirbyteCatalog catalog, + final boolean purgeStagingData) { final List writeConfigs = createWriteConfigs(namingResolver, config, catalog); return new BufferedStreamConsumer( outputRecordCollector, @@ -68,7 +69,7 @@ public AirbyteMessageConsumer create(final Consumer outputRecord onCreateBuffer, catalog, flushBufferFunction(database, stagingOperations, writeConfigs, catalog)), - onCloseFunction(database, stagingOperations, writeConfigs), + onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData), catalog, stagingOperations::isValidData); } @@ -177,7 +178,8 @@ private CheckedBiConsumer writeConfigs) { + final List writeConfigs, + final boolean purgeStagingData) { return (hasFailed) -> { if (!hasFailed) { final List queryList = new ArrayList<>(); @@ -224,10 +226,12 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database, tmpTableName); stagingOperations.dropTableIfExists(database, schemaName, tmpTableName); - final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName()); - LOGGER.info("Cleaning stage in destination started for stream {}. 
schema {}, stage: {}", writeConfig.getStreamName(), schemaName, - stageName); - stagingOperations.dropStageIfExists(database, stageName); + if (purgeStagingData) { + final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName()); + LOGGER.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName, + stageName); + stagingOperations.dropStageIfExists(database, stageName); + } } LOGGER.info("Cleaning up destination completed."); }; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryption.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryption.java new file mode 100644 index 0000000000000..22ce17ba5147a --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryption.java @@ -0,0 +1,66 @@ +package io.airbyte.integrations.destination.s3; + +import com.fasterxml.jackson.databind.JsonNode; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import javax.annotation.Nonnull; +import javax.crypto.KeyGenerator; +import org.apache.commons.lang3.StringUtils; + +/** + * @param key The key to use for encryption. + * @param keyType Where the key came from. + */ +public record AesCbcEnvelopeEncryption(@Nonnull byte[] key, @Nonnull KeyType keyType) implements EncryptionConfig { + + public enum KeyType { + EPHEMERAL, + USER_PROVIDED + } + + public static AesCbcEnvelopeEncryption fromJson(final JsonNode encryptionNode) { + if (!encryptionNode.has("key_encrypting_key")) { + return encryptionWithRandomKey(); + } + final String kek = encryptionNode.get("key_encrypting_key").asText(); + if (StringUtils.isEmpty(kek)) { + return encryptionWithRandomKey(); + } else { + return new AesCbcEnvelopeEncryption(BASE64_DECODER.decode(kek), KeyType.USER_PROVIDED); + } + } + + private static AesCbcEnvelopeEncryption encryptionWithRandomKey() { + try { + final KeyGenerator kekGenerator = KeyGenerator.getInstance(AesCbcEnvelopeEncryptionBlobDecorator.KEY_ENCRYPTING_ALGO); + kekGenerator.init(AesCbcEnvelopeEncryptionBlobDecorator.AES_KEY_SIZE_BITS); + return new AesCbcEnvelopeEncryption(kekGenerator.generateKey().getEncoded(), KeyType.EPHEMERAL); + } catch (final NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final AesCbcEnvelopeEncryption that = (AesCbcEnvelopeEncryption) o; + + if (!Arrays.equals(key, that.key)) { + return false; + } + return keyType == that.keyType; + } + + @Override + public int hashCode() { + int result = Arrays.hashCode(key); + result = 31 * result + keyType.hashCode(); + return result; + } +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryptionBlobDecorator.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryptionBlobDecorator.java new file mode 100644 index 0000000000000..e105168e7802c --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryptionBlobDecorator.java @@ -0,0 +1,132 @@ +package io.airbyte.integrations.destination.s3; 
+ +import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.OutputStream; +import java.security.InvalidAlgorithmParameterException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.Base64; +import java.util.Base64.Encoder; +import java.util.Map; +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.CipherOutputStream; +import javax.crypto.IllegalBlockSizeException; +import javax.crypto.KeyGenerator; +import javax.crypto.NoSuchPaddingException; +import javax.crypto.SecretKey; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +/** + * This class implements the envelope encryption that Redshift and Snowflake use when loading encrypted files from S3 (or other blob stores): + *
+ * <ul>
+ *   <li>A content-encrypting-key (CEK) is used to encrypt the actual data (i.e. the CSV file)</li>
+ *   <li>A key-encrypting-key (KEK) is used to encrypt the CEK</li>
+ *   <li>The encrypted CEK is stored in the S3 object metadata, along with the plaintext initialization vector</li>
+ *   <li>The COPY command includes the KEK (in plaintext). Redshift/Snowflake will use it to decrypt the CEK, which it then uses to decrypt the CSV file.</li>
+ * </ul>
+ * <p>
+ * A new CEK is generated for each S3 object, but each sync uses a single KEK. The KEK may be either user-provided (if the user wants to keep the data
+ * for further use), or generated per-sync (if they simply want to add additional security around their COPY operation).
+ * <p>
+ * Redshift does not support loading directly from GCS or Azure Blob Storage.
+ * <p>
+ * Snowflake only supports client-side encryption in S3 and Azure Storage; it does not support this feature in GCS (https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html). + * Azure Storage uses a similar envelope encryption technique to S3 (https://docs.microsoft.com/en-us/azure/storage/common/storage-client-side-encryption?tabs=dotnet#encryption-via-the-envelope-technique). + */ +public class AesCbcEnvelopeEncryptionBlobDecorator implements BlobDecorator { + + public static final String ENCRYPTED_CONTENT_ENCRYPTING_KEY = "aes_cbc_envelope_encryption-content-encrypting-key"; + public static final String INITIALIZATION_VECTOR = "aes_cbc_envelope_encryption-initialization-vector"; + + public static final int AES_KEY_SIZE_BITS = 256; + private static final int AES_CBC_INITIALIZATION_VECTOR_SIZE_BYTES = 16; + private static final Encoder BASE64_ENCODER = Base64.getEncoder(); + private static final SecureRandom SECURE_RANDOM = new SecureRandom(); + + public static final String KEY_ENCRYPTING_ALGO = "AES"; + + // There's no specific KeyGenerator for AES/CBC/PKCS5Padding, so we just use a normal AES KeyGenerator + private static final String CONTENT_ENCRYPTING_KEY_ALGO = "AES"; + // Redshift's UNLOAD command uses this cipher mode, so we'll use it here as well. + // TODO If we eventually want to expose client-side encryption in destination-s3, we should probably also implement + // AES-GCM, since it's mostly superior to CBC mode. (if we do that: make sure that the output is compatible with + // aws-java-sdk's AmazonS3EncryptionV2Client, which requires a slightly different set of metadata) + private static final String CONTENT_ENCRYPTING_CIPHER_ALGO = "AES/CBC/PKCS5Padding"; + + // The real "secret key". Should be handled with great care. + private final SecretKey keyEncryptingKey; + // A random key generated for each file. Also should be handled with care. + private final SecretKey contentEncryptingKey; + // Arbitrary bytes required by the CBC algorithm. Not a sensitive value. + // The only requirement is that we never reuse an (IV, CEK) pair. + private final byte[] initializationVector; + + public AesCbcEnvelopeEncryptionBlobDecorator(final SecretKey keyEncryptingKey) { + this(keyEncryptingKey, randomContentEncryptingKey(), randomInitializationVector()); + } + + public AesCbcEnvelopeEncryptionBlobDecorator(final byte[] keyEncryptingKey) { + this(new SecretKeySpec(keyEncryptingKey, KEY_ENCRYPTING_ALGO)); + } + + @VisibleForTesting + AesCbcEnvelopeEncryptionBlobDecorator(final SecretKey keyEncryptingKey, final SecretKey contentEncryptingKey, final byte[] initializationVector) { + this.keyEncryptingKey = keyEncryptingKey; + this.contentEncryptingKey = contentEncryptingKey; + + this.initializationVector = initializationVector; + } + + @SuppressFBWarnings( + value = {"PADORA", "CIPINT"}, + justification = "We're using this cipher for compatibility with Redshift/Snowflake." 
+ ) + @Override + public OutputStream wrap(final OutputStream stream) { + try { + final Cipher dataCipher = Cipher.getInstance(CONTENT_ENCRYPTING_CIPHER_ALGO); + dataCipher.init(Cipher.ENCRYPT_MODE, contentEncryptingKey, new IvParameterSpec(initializationVector)); + return new CipherOutputStream(stream, dataCipher); + } catch (final InvalidAlgorithmParameterException | NoSuchPaddingException | NoSuchAlgorithmException | InvalidKeyException e) { + throw new RuntimeException(e); + } + } + + @SuppressFBWarnings( + value = {"CIPINT", "SECECB"}, + justification = "We're using this cipher for compatibility with Redshift/Snowflake." + ) + @Override + public void updateMetadata(final Map metadata, final Map metadataKeyMapping) { + try { + final Cipher keyCipher = Cipher.getInstance(KEY_ENCRYPTING_ALGO); + keyCipher.init(Cipher.ENCRYPT_MODE, keyEncryptingKey); + final byte[] encryptedCekBytes = keyCipher.doFinal(contentEncryptingKey.getEncoded()); + + BlobDecorator.insertMetadata(metadata, metadataKeyMapping, ENCRYPTED_CONTENT_ENCRYPTING_KEY, BASE64_ENCODER.encodeToString(encryptedCekBytes)); + BlobDecorator.insertMetadata(metadata, metadataKeyMapping, INITIALIZATION_VECTOR, BASE64_ENCODER.encodeToString(initializationVector)); + } catch (final NoSuchPaddingException | NoSuchAlgorithmException | InvalidKeyException | IllegalBlockSizeException | BadPaddingException e) { + throw new RuntimeException(e); + } + } + + private static SecretKey randomContentEncryptingKey() { + try { + final KeyGenerator cekGenerator = KeyGenerator.getInstance(CONTENT_ENCRYPTING_KEY_ALGO); + cekGenerator.init(AES_KEY_SIZE_BITS); + return cekGenerator.generateKey(); + } catch (final NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + + private static byte[] randomInitializationVector() { + final byte[] initializationVector = new byte[AES_CBC_INITIALIZATION_VECTOR_SIZE_BYTES]; + SECURE_RANDOM.nextBytes(initializationVector); + return initializationVector; + } +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobDecorator.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobDecorator.java new file mode 100644 index 0000000000000..ad5b800d1e828 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobDecorator.java @@ -0,0 +1,40 @@ +package io.airbyte.integrations.destination.s3; + +import com.google.common.annotations.VisibleForTesting; +import java.io.OutputStream; +import java.util.Map; + +/** + * Represents the ability to modify how a blob is stored, by modifying the data being written and/or the blob's metadata. + */ +public interface BlobDecorator { + + OutputStream wrap(OutputStream stream); + + /** + * Modifies the blob's metadata. + *

+ * In the most common case, BlobDecorator implementations will insert new entries into the metadata map. These entries may be vendor-specific. The + * metadataKeyMapping parameter defines a mapping from the "canonical" keys to the vendor-specific keys. See {@link + * S3StorageOperations#getMetadataMapping()} for an example. + *

+ * If a key is not defined in metadataKeyMapping, it will not be inserted into the metadata. + * + * @param metadata The blob's metadata + * @param metadataKeyMapping The mapping from canonical to vendor-specific key names + */ + void updateMetadata(Map metadata, Map metadataKeyMapping); + + /** + * A convenience method for subclasses. Handles inserting new metadata entries according to the metadataKeyMapping. + */ + @VisibleForTesting + static void insertMetadata(final Map metadata, + final Map metadataKeyMapping, + final String key, + final String value) { + if (metadataKeyMapping.containsKey(key)) { + metadata.put(metadataKeyMapping.get(key), value); + } + } +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobStorageOperations.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobStorageOperations.java index 91e2e9cae5722..a173a2a9e7999 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobStorageOperations.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobStorageOperations.java @@ -6,34 +6,49 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.joda.time.DateTime; -public interface BlobStorageOperations { +public abstract class BlobStorageOperations { - String getBucketObjectPath(String namespace, String streamName, DateTime writeDatetime, String customFormat); + protected final List blobDecorators; + + protected BlobStorageOperations() { + this.blobDecorators = new ArrayList<>(); + } + + public abstract String getBucketObjectPath(String namespace, String streamName, DateTime writeDatetime, String customFormat); /** * Create a storage object where to store data in the destination for a @param objectPath */ - void createBucketObjectIfNotExists(String objectPath) throws Exception; + public abstract void createBucketObjectIfNotExists(String objectPath) throws Exception; /** * Upload the data files into the storage area. * * @return the name of the file that was uploaded. 
*/ - String uploadRecordsToBucket(SerializableBuffer recordsData, String namespace, String streamName, String objectPath) throws Exception; + public abstract String uploadRecordsToBucket(SerializableBuffer recordsData, String namespace, String streamName, String objectPath) + throws Exception; /** * Remove files that were just stored in the bucket */ - void cleanUpBucketObject(String objectPath, List stagedFiles) throws Exception; + public abstract void cleanUpBucketObject(String objectPath, List stagedFiles) throws Exception; + + public abstract void cleanUpBucketObject(String namespace, String streamName, String objectPath, String pathFormat); + + public abstract void dropBucketObject(String objectPath); - void cleanUpBucketObject(String namespace, String StreamName, String objectPath, String pathFormat); + public abstract boolean isValidData(JsonNode jsonNode); - void dropBucketObject(String objectPath); + protected abstract Map getMetadataMapping(); - boolean isValidData(JsonNode jsonNode); + public void addBlobDecorator(final BlobDecorator blobDecorator) { + blobDecorators.add(blobDecorator); + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/EncryptionConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/EncryptionConfig.java new file mode 100644 index 0000000000000..ed7875378461c --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/EncryptionConfig.java @@ -0,0 +1,23 @@ +package io.airbyte.integrations.destination.s3; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Base64; +import java.util.Base64.Decoder; + +public sealed interface EncryptionConfig permits AesCbcEnvelopeEncryption, NoEncryption { + Decoder BASE64_DECODER = Base64.getDecoder(); + + static EncryptionConfig fromJson(final JsonNode encryptionNode) { + // For backwards-compatibility. Preexisting configs which don't contain the "encryption" key will pass a null JsonNode into this method. 
+ if (encryptionNode == null) { + return new NoEncryption(); + } + + final String encryptionType = encryptionNode.get("encryption_type").asText(); + return switch (encryptionType) { + case "none" -> new NoEncryption(); + case "aes_cbc_envelope" -> AesCbcEnvelopeEncryption.fromJson(encryptionNode); + default -> throw new IllegalArgumentException("Invalid encryption type: " + encryptionType); + }; + } +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/NoEncryption.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/NoEncryption.java new file mode 100644 index 0000000000000..67bff69a61d9c --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/NoEncryption.java @@ -0,0 +1,5 @@ +package io.airbyte.integrations.destination.s3; + +public final class NoEncryption implements EncryptionConfig { + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index d6da8b29cc967..7690e2bd8f8ef 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -6,7 +6,6 @@ import static org.apache.logging.log4j.util.Strings.isNotBlank; -import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.DeleteObjectsRequest; @@ -15,14 +14,18 @@ import com.amazonaws.services.s3.model.ObjectListing; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import io.airbyte.commons.string.Strings; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.regex.Pattern; import org.apache.commons.io.FilenameUtils; @@ -30,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3StorageOperations implements BlobStorageOperations { +public class S3StorageOperations extends BlobStorageOperations { private static final Logger LOGGER = LoggerFactory.getLogger(S3StorageOperations.class); @@ -108,7 +111,7 @@ public String uploadRecordsToBucket(final SerializableBuffer recordsData, final String objectPath) { final List exceptionsThrown = new ArrayList<>(); while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT) { - if (exceptionsThrown.size() > 0) { + if (!exceptionsThrown.isEmpty()) { LOGGER.info("Retrying to upload records into storage {} ({}/{}})", objectPath, exceptionsThrown.size(), UPLOAD_RETRY_LIMIT); // Force a reconnection before retrying in case error was due to network issues... 
s3Client = s3Config.resetS3Client(); @@ -133,13 +136,25 @@ private String loadDataIntoBucket(final String objectPath, final SerializableBuf final long partSize = s3Config.getFormatConfig() != null ? s3Config.getFormatConfig().getPartSize() : DEFAULT_PART_SIZE; final String bucket = s3Config.getBucketName(); final String fullObjectKey = objectPath + getPartId(objectPath) + getExtension(recordsData.getFilename()); + + final Map metadata = new HashMap<>(); + for (final BlobDecorator blobDecorator : blobDecorators) { + blobDecorator.updateMetadata(metadata, getMetadataMapping()); + } final StreamTransferManager uploadManager = StreamTransferManagerHelper - .getDefault(bucket, fullObjectKey, s3Client, partSize) + .getDefault(bucket, fullObjectKey, s3Client, partSize, metadata) .checkIntegrity(true) .numUploadThreads(DEFAULT_UPLOAD_THREADS) .queueCapacity(DEFAULT_QUEUE_CAPACITY); boolean succeeded = false; - try (final MultiPartOutputStream outputStream = uploadManager.getMultiPartOutputStreams().get(0); + + // Wrap output stream in decorators + OutputStream rawOutputStream = uploadManager.getMultiPartOutputStreams().get(0); + for (final BlobDecorator blobDecorator : blobDecorators) { + rawOutputStream = blobDecorator.wrap(rawOutputStream); + } + + try (final OutputStream outputStream = rawOutputStream; final InputStream dataStream = recordsData.getInputStream()) { dataStream.transferTo(outputStream); succeeded = true; @@ -271,4 +286,11 @@ public boolean isValidData(final JsonNode jsonNode) { return true; } + @Override + protected Map getMetadataMapping() { + return ImmutableMap.of( + AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY, "x-amz-key", + AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "x-amz-iv" + ); + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/CompressionTypeHelper.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/CompressionTypeHelper.java index 8b0d3288a0242..6f7df8dd53803 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/CompressionTypeHelper.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/CompressionTypeHelper.java @@ -11,6 +11,9 @@ public class CompressionTypeHelper { + private CompressionTypeHelper() { + } + /** * Sample expected input: { "compression_type": "No Compression" } */ diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/StreamTransferManagerHelper.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/StreamTransferManagerHelper.java index f32360c2e04e7..06b983132f175 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/StreamTransferManagerHelper.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/StreamTransferManagerHelper.java @@ -6,6 +6,7 @@ import alex.mojaki.s3upload.StreamTransferManager; import com.amazonaws.services.s3.AmazonS3; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,17 +26,21 @@ public class StreamTransferManagerHelper { public static final int MAX_ALLOWED_PART_SIZE_MB = 525; public static final int DEFAULT_NUM_STREAMS = 1; - public static 
StreamTransferManager getDefault(final String bucketName, final String objectKey, final AmazonS3 s3Client, final Long partSize) { + public static StreamTransferManager getDefault(final String bucketName, + final String objectKey, + final AmazonS3 s3Client, + final Long partSize, + final Map userMetadata) { if (partSize == null) { LOGGER.warn(String.format("Part size for StreamTransferManager is not set explicitly. Will use the default one = %sMB. " + "Please note server allows up to 10,000 parts to be uploaded for a single object, i.e. 50GB for stream. " + "Feel free to increase partSize arg, but make sure you have enough memory resources allocated", DEFAULT_PART_SIZE_MB)); - return getDefault(bucketName, objectKey, s3Client); + return getDefault(bucketName, objectKey, s3Client, userMetadata); } if (partSize < DEFAULT_PART_SIZE_MB) { LOGGER.warn(String.format("By the server limitation part size can't be less than %sMB which is already set by default. " + "Will use the default value", DEFAULT_PART_SIZE_MB)); - return getDefault(bucketName, objectKey, s3Client); + return getDefault(bucketName, objectKey, s3Client, userMetadata); } if (partSize > MAX_ALLOWED_PART_SIZE_MB) { LOGGER.warn( @@ -45,7 +50,7 @@ public static StreamTransferManager getDefault(final String bucketName, final St + " will be thrown. The total object size can be at most 5 TB, so there is no reason to set this higher" + " than 525MB. If you're using more streams, you may want a higher value in case some streams get more data than others. " + "So will use max allowed value =" + MAX_ALLOWED_PART_SIZE_MB); - return new StreamTransferManager(bucketName, objectKey, s3Client) + return new StreamTransferManagerWithMetadata(bucketName, objectKey, s3Client, userMetadata) .numStreams(DEFAULT_NUM_STREAMS) .queueCapacity(DEFAULT_QUEUE_CAPACITY) .numUploadThreads(DEFAULT_UPLOAD_THREADS) @@ -53,20 +58,27 @@ public static StreamTransferManager getDefault(final String bucketName, final St } LOGGER.info(String.format("PartSize arg is set to %s MB", partSize)); - return new StreamTransferManager(bucketName, objectKey, s3Client) + return new StreamTransferManagerWithMetadata(bucketName, objectKey, s3Client, userMetadata) .numStreams(DEFAULT_NUM_STREAMS) .queueCapacity(DEFAULT_QUEUE_CAPACITY) .numUploadThreads(DEFAULT_UPLOAD_THREADS) .partSize(partSize); } - private static StreamTransferManager getDefault(final String bucketName, final String objectKey, final AmazonS3 s3Client) { + public static StreamTransferManager getDefault(final String bucketName, final String objectKey, final AmazonS3 s3Client, final Long partSize) { + return getDefault(bucketName, objectKey, s3Client, partSize, null); + } + + private static StreamTransferManager getDefault(final String bucketName, + final String objectKey, + final AmazonS3 s3Client, + final Map userMetadata) { // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not // have support for streaming multipart uploads. The alternative is first writing the entire // output to disk before loading into S3. This is not feasible with large input. // Data is chunked into parts during the upload. A part is sent off to a queue to be uploaded // once it has reached it's configured part size. 
- return new StreamTransferManager(bucketName, objectKey, s3Client) + return new StreamTransferManagerWithMetadata(bucketName, objectKey, s3Client, userMetadata) .numStreams(DEFAULT_NUM_STREAMS) .queueCapacity(DEFAULT_QUEUE_CAPACITY) .numUploadThreads(DEFAULT_UPLOAD_THREADS) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/StreamTransferManagerWithMetadata.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/StreamTransferManagerWithMetadata.java new file mode 100644 index 0000000000000..b29603c2883ce --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/StreamTransferManagerWithMetadata.java @@ -0,0 +1,37 @@ +package io.airbyte.integrations.destination.s3.util; + +import alex.mojaki.s3upload.StreamTransferManager; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import java.util.Map; + +/** + * A custom stream transfer manager which overwrites the metadata on the InitiateMultipartUploadRequest. + *

+ * This is, apparently, the correct way to implement this functionality. https://github.com/alexmojaki/s3-stream-upload/issues/3 + */ +public class StreamTransferManagerWithMetadata extends StreamTransferManager { + + private final Map userMetadata; + + public StreamTransferManagerWithMetadata(final String bucketName, + final String putKey, + final AmazonS3 s3Client, + final Map userMetadata) { + super(bucketName, putKey, s3Client); + this.userMetadata = userMetadata; + } + + @Override + public void customiseInitiateRequest(final InitiateMultipartUploadRequest request) { + if (userMetadata != null) { + ObjectMetadata objectMetadata = request.getObjectMetadata(); + if (objectMetadata == null) { + objectMetadata = new ObjectMetadata(); + } + objectMetadata.setUserMetadata(userMetadata); + request.setObjectMetadata(objectMetadata); + } + } +} diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryptionBlobDecoratorTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryptionBlobDecoratorTest.java new file mode 100644 index 0000000000000..e5c912706dce5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/AesCbcEnvelopeEncryptionBlobDecoratorTest.java @@ -0,0 +1,87 @@ +package io.airbyte.integrations.destination.s3; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Base64.Decoder; +import java.util.HashMap; +import java.util.Map; +import javax.crypto.spec.SecretKeySpec; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class AesCbcEnvelopeEncryptionBlobDecoratorTest { + + private static final Decoder BASE64_DECODER = Base64.getDecoder(); + // A random base64-encoded 256-bit AES key + public static final String KEY_ENCRYPTING_KEY = "oFf0LY0Zae9ksNZsPSJG8ZLGRRBUUhitaPKWRPPKTvM="; + // Another base64-encoded random 256-bit AES key + public static final String CONTENT_ENCRYPTING_KEY = "9ZAVuZE8L4hJCFQS49OMNeFRGTCBUHAFOgkW3iZkOq8="; + // A random base64-encoded 16-byte array + public static final String INITIALIZATION_VECTOR = "04YDvMCXpvTb2ilggLbDJQ=="; + // A small CSV file, which looks similar to what destination-s3 might upload + public static final String PLAINTEXT = """ + adc66b6e-6051-42db-b683-d978a51c3c02,"{""campaign.resource_name"":""cus""}",2022-04-04 22:32:50.046 + 0e253b28-bec6-4a90-8622-629d3e542982,"{""campaign.resource_name"":""cus""}",2022-04-04 22:32:50.047 + """; + // The encryption of the plaintext, using the CEK and IV defined above (base64-encoded). Equivalent to: + // base64Encode(encrypt("AES-CBC", PLAINTEXT, CONTENT_ENCRYPTING_KEY, INITIALIZATION_VECTOR) + public static final String CIPHERTEXT = "IRfz0FN05Y9yyne+0V+G14xYjA4B0+ter7qniDheIu9UM3Fdmu/mqjyFvYFIRTroP5kNJ1SH3FaArE5aHkrWMPwSkczkhArajfYX+UEfGH68YyWOSnpdxuviTTgK3Ee3OVTz3ZlziOB8jCMjupJ9pqkLnxg7Ghe3BQ1puOHGFDMmIgiP4Zfz0fkdlUyZOvsJ7xpncD24G6IIJNwOyo4CedULgueHdybmxr4oddhAja8QxJxZzlfZl4suJ+KWvt78MSdkRlp+Ip99U8n0O7BLJA=="; + // The encryption of the CEK, using the KEK defined above (base64-encoded). 
Equivalent to: + // base64Encode(encrypt("AES-ECB", CONTENT_ENCRYPTING_KEY, KEY_ENCRYPTING_KEY) + public static final String ENCRYPTED_CEK = "Ck5u5cKqcY+bcFBrpsPHHUNw5Qx8nYDJ2Vqt6XG6kwxjVAJQKKljPv9NDsG6Ncoc"; + + private AesCbcEnvelopeEncryptionBlobDecorator decorator; + + @BeforeEach + public void setup() { + decorator = new AesCbcEnvelopeEncryptionBlobDecorator( + new SecretKeySpec(BASE64_DECODER.decode(KEY_ENCRYPTING_KEY), "AES"), + new SecretKeySpec(BASE64_DECODER.decode(CONTENT_ENCRYPTING_KEY), "AES"), + BASE64_DECODER.decode(INITIALIZATION_VECTOR) + ); + } + + @Test + public void testEncryption() throws IOException { + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + try (final OutputStream wrapped = decorator.wrap(stream)) { + IOUtils.write( + PLAINTEXT, + wrapped, + StandardCharsets.UTF_8 + ); + } + + Assertions.assertArrayEquals( + BASE64_DECODER.decode(CIPHERTEXT), + stream.toByteArray() + ); + } + + @Test + public void testMetadataInsertion() { + final Map metadata = new HashMap<>(); + + decorator.updateMetadata( + metadata, + Map.of( + AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY, "the_cek", + AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "the_iv" + ) + ); + + Assertions.assertEquals( + Map.of( + "the_cek", ENCRYPTED_CEK, + "the_iv", INITIALIZATION_VECTOR + ), + metadata + ); + } +} diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/BlobDecoratorTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/BlobDecoratorTest.java new file mode 100644 index 0000000000000..fa702025c212f --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/BlobDecoratorTest.java @@ -0,0 +1,58 @@ +package io.airbyte.integrations.destination.s3; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +public class BlobDecoratorTest { + + @Test + public void testOverwriteMetadata() { + final Map metadata = new HashMap<>(); + metadata.put("amz-foo", "oldValue"); + + BlobDecorator.insertMetadata( + metadata, + Map.of("foo", "amz-foo"), + "foo", "newValue" + ); + + assertEquals(Map.of("amz-foo", "newValue"), metadata); + } + + @Test + public void testNewMetadata() { + final Map metadata = new HashMap<>(); + metadata.put("amz-foo", "oldValue"); + + BlobDecorator.insertMetadata( + metadata, + Map.of("bar", "amz-bar"), + "bar", "newValue" + ); + + assertEquals( + Map.of( + "amz-foo", "oldValue", + "amz-bar", "newValue" + ), + metadata + ); + } + + @Test + public void testSkipMetadata() { + final Map metadata = new HashMap<>(); + metadata.put("amz-foo", "oldValue"); + + BlobDecorator.insertMetadata( + metadata, + Map.of("foo", "amz-foo"), + "bar", "newValue" + ); + + assertEquals(Map.of("amz-foo", "oldValue"), metadata); + } +} diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 2475447387bd1..a187a9d25888d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.24 +LABEL io.airbyte.version=0.4.25 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java deleted file mode 100644 index f3e32d999d739..0000000000000 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.snowflake; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.destination.jdbc.SqlOperations; -import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory; -import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; -import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig; -import io.airbyte.integrations.destination.s3.S3Destination; -import io.airbyte.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.integrations.destination.s3.S3StorageOperations; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.util.function.Consumer; - -public class SnowflakeCopyS3Destination extends CopyDestination { - - @Override - public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { - return CopyConsumerFactory.create( - outputRecordCollector, - getDatabase(config), - getSqlOperations(), - getNameTransformer(), - S3CopyConfig.getS3CopyConfig(config.get("loading_method")), - catalog, - new SnowflakeS3StreamCopierFactory(), - getConfiguredSchema(config)); - } - - @Override - public void checkPersistence(final JsonNode config) { - final S3DestinationConfig s3Config = getS3DestinationConfig(config); - S3Destination.attemptS3WriteAndDelete(new S3StorageOperations(getNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, ""); - } - - @Override - public ExtendedNameTransformer getNameTransformer() { - return new SnowflakeSQLNameTransformer(); - } - - @Override - public JdbcDatabase getDatabase(final JsonNode config) { - return SnowflakeDatabase.getDatabase(config); - } - - @Override - public SqlOperations getSqlOperations() { - return new SnowflakeSqlOperations(); - } - - private String getConfiguredSchema(final JsonNode config) { - return config.get("schema").asText(); - } - - private S3DestinationConfig getS3DestinationConfig(final JsonNode config) { - final JsonNode loadingMethod = config.get("loading_method"); - return S3DestinationConfig.getS3DestinationConfig(loadingMethod); - } - -} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index b10ef3d900329..fb6b16c610162 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -96,7 +96,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, getNamingResolver(), CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)), config, - catalog); + catalog, + true); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java index 4d889d6de6eb7..a46363aa10946 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java @@ -13,10 +13,14 @@ import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.integrations.destination.record_buffer.FileBuffer; +import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption; +import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType; +import io.airbyte.integrations.destination.s3.EncryptionConfig; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer; import io.airbyte.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.Collections; @@ -41,15 +45,22 @@ public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTrans @Override public AirbyteConnectionStatus check(final JsonNode config) { final S3DestinationConfig s3Config = getS3DestinationConfig(config); + final EncryptionConfig encryptionConfig = EncryptionConfig.fromJson(config.get("loading_method").get("encryption")); + if (!isPurgeStagingData(config) && encryptionConfig instanceof AesCbcEnvelopeEncryption c && c.keyType() == KeyType.EPHEMERAL) { + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage( + "You cannot use ephemeral keys and disable purging your staging data. 
This would produce S3 objects that you cannot decrypt."); + } final NamingConventionTransformer nameTransformer = getNamingResolver(); - final SnowflakeS3StagingSqlOperations SnowflakeS3StagingSqlOperations = - new SnowflakeS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config); + final SnowflakeS3StagingSqlOperations snowflakeS3StagingSqlOperations = + new SnowflakeS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, encryptionConfig); try (final JdbcDatabase database = getDatabase(config)) { final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText()); AirbyteSentry.executeWithTracing("CreateAndDropTable", - () -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, SnowflakeS3StagingSqlOperations)); + () -> attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations)); AirbyteSentry.executeWithTracing("CreateAndDropStage", - () -> attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, SnowflakeS3StagingSqlOperations)); + () -> attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations)); return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); } catch (final Exception e) { LOGGER.error("Exception while checking connection: ", e); @@ -93,14 +104,16 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { final S3DestinationConfig s3Config = getS3DestinationConfig(config); + final EncryptionConfig encryptionConfig = EncryptionConfig.fromJson(config.get("loading_method").get("encryption")); return new StagingConsumerFactory().create( outputRecordCollector, getDatabase(config), - new SnowflakeS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config), + new SnowflakeS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig), getNamingResolver(), CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)), config, - catalog); + catalog, + isPurgeStagingData(config)); } private S3DestinationConfig getS3DestinationConfig(final JsonNode config) { @@ -108,4 +121,12 @@ private S3DestinationConfig getS3DestinationConfig(final JsonNode config) { return S3DestinationConfig.getS3DestinationConfig(loadingMethod); } + private static boolean isPurgeStagingData(final JsonNode config) { + final JsonNode loadingMethod = config.get("loading_method"); + if (!loadingMethod.has("purge_staging_data")) { + return true; + } else { + return loadingMethod.get("purge_staging_data").asBoolean(); + } + } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java index fd6c7201ce6c0..f95493365ed14 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java @@ -10,10 +10,15 @@ import io.airbyte.integrations.base.sentry.AirbyteSentry; import 
io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption; +import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator; +import io.airbyte.integrations.destination.s3.EncryptionConfig; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3StorageOperations; import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import io.airbyte.integrations.destination.staging.StagingOperations; +import java.util.Base64; +import java.util.Base64.Encoder; import java.util.List; import java.util.Map; import java.util.UUID; @@ -24,6 +29,7 @@ public class SnowflakeS3StagingSqlOperations extends SnowflakeSqlOperations implements StagingOperations { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class); + private static final Encoder BASE64_ENCODER = Base64.getEncoder(); private static final String COPY_QUERY = "COPY INTO %s.%s FROM '%s' " + "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') " + "file_format = (type = csv compression = auto field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"; @@ -31,13 +37,21 @@ public class SnowflakeS3StagingSqlOperations extends SnowflakeSqlOperations impl private final NamingConventionTransformer nameTransformer; private final S3StorageOperations s3StorageOperations; private final S3DestinationConfig s3Config; + private final byte[] keyEncryptingKey; public SnowflakeS3StagingSqlOperations(final NamingConventionTransformer nameTransformer, final AmazonS3 s3Client, - final S3DestinationConfig s3Config) { + final S3DestinationConfig s3Config, + final EncryptionConfig encryptionConfig) { this.nameTransformer = nameTransformer; this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config); this.s3Config = s3Config; + if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) { + this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key())); + this.keyEncryptingKey = e.key(); + } else { + this.keyEncryptingKey = null; + } } @Override @@ -87,26 +101,32 @@ public void copyIntoTmpTableFromStage(final JdbcDatabase database, LOGGER.info("Starting copy to tmp table from stage: {} in destination from stage: {}, schema: {}, .", dstTableName, stagingPath, schemaName); // Print actual SQL query if user needs to manually force reload from staging AirbyteSentry.executeWithTracing("CopyIntoTableFromStage", - () -> Exceptions.toRuntime(() -> database.execute(getCopyQuery(stageName, stagingPath, stagedFiles, dstTableName, schemaName))), + () -> Exceptions.toRuntime(() -> database.execute(getCopyQuery(stagingPath, stagedFiles, dstTableName, schemaName))), Map.of("schema", schemaName, "path", stagingPath, "table", dstTableName)); LOGGER.info("Copy to tmp table {}.{} in destination complete.", schemaName, dstTableName); } - protected String getCopyQuery(final String stageName, - final String stagingPath, + protected String getCopyQuery(final String stagingPath, final List stagedFiles, final String dstTableName, final String schemaName) { final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig(); - return String.format(COPY_QUERY + generateFilesList(stagedFiles) + ";", + final String encryptionClause; + if (keyEncryptingKey == null) { + encryptionClause = 
""; + } else { + encryptionClause = String.format(" encryption = (type = 'aws_cse' master_key = '%s')", BASE64_ENCODER.encodeToString(keyEncryptingKey)); + } + return String.format(COPY_QUERY + generateFilesList(stagedFiles) + encryptionClause + ";", schemaName, dstTableName, - generateBucketPath(stageName, stagingPath), + generateBucketPath(stagingPath), credentialConfig.getAccessKeyId(), - credentialConfig.getSecretAccessKey()); + credentialConfig.getSecretAccessKey() + ); } - private String generateBucketPath(final String stageName, final String stagingPath) { + private String generateBucketPath(final String stagingPath) { return "s3://" + s3Config.getBucketName() + "/" + stagingPath; } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index 4e011978862d5..f02f6172349ef 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json @@ -58,6 +58,7 @@ }, "credentials": { "title": "Authorization Method", + "description": "", "type": "object", "oneOf": [ { @@ -136,6 +137,8 @@ "required": ["method"], "properties": { "method": { + "title": "", + "description": "", "type": "string", "enum": ["Standard"], "default": "Standard" @@ -149,6 +152,8 @@ "required": ["method"], "properties": { "method": { + "title": "", + "description": "", "type": "string", "enum": ["Internal Staging"], "default": "Internal Staging" @@ -167,6 +172,8 @@ ], "properties": { "method": { + "title": "", + "description": "", "type": "string", "enum": ["S3 Staging"], "default": "S3 Staging", @@ -240,6 +247,49 @@ "description": "Whether to delete the staging files from S3 after completing the sync. See the docs for details. Only relevant for COPY. Defaults to true.", "default": true, "order": 6 + }, + "encryption": { + "title": "Encryption", + "type": "object", + "description": "How to encrypt the staging data", + "default": { "encryption_type": "none" }, + "order": 7, + "oneOf": [ + { + "title": "No encryption", + "description": "Staging data will be stored in plaintext.", + "type": "object", + "required": ["encryption_type"], + "properties": { + "encryption_type": { + "type": "string", + "const": "none", + "enum": ["none"], + "default": "none" + } + } + }, + { + "title": "AES-CBC envelope encryption", + "description": "Staging data will be encrypted using AES-CBC envelope encryption.", + "type": "object", + "required": ["encryption_type"], + "properties": { + "encryption_type": { + "type": "string", + "const": "aes_cbc_envelope", + "enum": ["aes_cbc_envelope"], + "default": "aes_cbc_envelope" + }, + "key_encrypting_key": { + "type": "string", + "title": "Key", + "description": "The key, base64-encoded. Must be either 128, 192, or 256 bits. 
Leave blank to have Airbyte generate an ephemeral key for each sync.", + "airbyte_secret": true + } + } + } + ] } } }, @@ -255,6 +305,8 @@ ], "properties": { "method": { + "title": "", + "description": "", "type": "string", "enum": ["GCS Staging"], "default": "GCS Staging", @@ -296,6 +348,8 @@ ], "properties": { "method": { + "title": "", + "description": "", "type": "string", "enum": ["Azure Blob Staging"], "default": "Azure Blob Staging", diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyEncryptedDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyEncryptedDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..a68bb3a6c3d99 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyEncryptedDestinationAcceptanceTest.java @@ -0,0 +1,18 @@ +package io.airbyte.integrations.destination.snowflake; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.nio.file.Path; + +public class SnowflakeS3CopyEncryptedDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest { + + @Override + public JsonNode getStaticConfig() { + final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_s3_encrypted_config.json"))); + Preconditions.checkArgument(SnowflakeDestinationResolver.isS3Copy(copyConfig)); + Preconditions.checkArgument(!SnowflakeDestinationResolver.isGcsCopy(copyConfig)); + return copyConfig; + } +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index c3281972752f3..cd7f376b84dd3 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -108,7 +108,8 @@ public void testCleanupStageOnFailure() throws Exception { new SnowflakeSQLNameTransformer(), CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(".csv")), config, - getCatalog()); + getCatalog(), + true); doThrow(SQLException.class).when(sqlOperations).copyIntoTmpTableFromStage(any(), anyString(), anyString(), anyList(), anyString(), anyString()); airbyteMessageConsumer.start(); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java index 33941e3f51d52..7d1b10ab430a2 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java @@ -9,6 +9,7 @@ import static org.mockito.Mockito.when; import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.integrations.destination.s3.NoEncryption; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import java.util.List; @@ -17,7 +18,6 @@ class SnowflakeS3StagingSqlOperationsTest { private static final String SCHEMA_NAME = "schemaName"; - private static final String STAGE_NAME = "stageName"; private static final String STAGE_PATH = "stagePath/2022/"; private static final String TABLE_NAME = "tableName"; private static final String BUCKET_NAME = "bucket_name"; @@ -27,7 +27,7 @@ class SnowflakeS3StagingSqlOperationsTest { private final S3AccessKeyCredentialConfig credentialConfig = mock(S3AccessKeyCredentialConfig.class); private final SnowflakeS3StagingSqlOperations snowflakeStagingSqlOperations = - new SnowflakeS3StagingSqlOperations(new SnowflakeSQLNameTransformer(), s3Client, s3Config); + new SnowflakeS3StagingSqlOperations(new SnowflakeSQLNameTransformer(), s3Client, s3Config, new NoEncryption()); @Test void copyIntoTmpTableFromStage() { @@ -39,7 +39,7 @@ void copyIntoTmpTableFromStage() { when(credentialConfig.getAccessKeyId()).thenReturn("aws_access_key_id"); when(credentialConfig.getSecretAccessKey()).thenReturn("aws_secret_access_key"); final String actualCopyQuery = - snowflakeStagingSqlOperations.getCopyQuery(STAGE_NAME, STAGE_PATH, List.of("filename1", "filename2"), TABLE_NAME, SCHEMA_NAME); + snowflakeStagingSqlOperations.getCopyQuery(STAGE_PATH, List.of("filename1", "filename2"), TABLE_NAME, SCHEMA_NAME); assertEquals(expectedQuery, actualCopyQuery); } diff --git a/build.gradle b/build.gradle index 597450343b7f7..f67a0d945fffc 100644 --- a/build.gradle +++ b/build.gradle @@ -325,6 +325,7 @@ subprojects { // adds owasp plugin spotbugsPlugins 'com.h3xstream.findsecbugs:findsecbugs-plugin:1.11.0' + implementation 'com.github.spotbugs:spotbugs-annotations:4.6.0' } tasks.withType(Tar) { diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 0a77e37a64a69..e2a404ad55fa7 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -1,6 +1,6 @@ # Snowflake -Setting up the Snowflake destination connector involves setting up Snowflake entities (warehouse, database, schema, user, and role) in the Snowflake console, then setting up the data loading method (internal stage, AWS S3, GCS bucket, or Azure Blob Storage), and then configuring the Snowflake destination connector using the Airbyte UI. +Setting up the Snowflake destination connector involves setting up Snowflake entities (warehouse, database, schema, user, and role) in the Snowflake console, then setting up the data loading method (internal stage, AWS S3, GCS bucket, or Azure Blob Storage), and then configuring the Snowflake destination connector using the Airbyte UI. This page describes the step-by-step process of setting up the Snowflake destination connector. @@ -15,11 +15,11 @@ To set up the Snowflake destination connector, you first need to create Airbyte- You can use the following script in a new [Snowflake worksheet](https://docs.snowflake.com/en/user-guide/ui-worksheet.html) to create the entities: -1. 
[Log into your Snowflake account](https://www.snowflake.com/login/). +1. [Log into your Snowflake account](https://www.snowflake.com/login/). 2. Edit the following script to change the password to a more secure password and to change the names of other resources if you so desire. **Note:** Make sure you follow the [Snowflake identifier requirements](https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html) while renaming the resources. - + -- set variables (these need to be uppercase) set airbyte_role = 'AIRBYTE_ROLE'; set airbyte_username = 'AIRBYTE_USER'; @@ -88,7 +88,7 @@ You can use the following script in a new [Snowflake worksheet](https://docs.sno to role identifier($airbyte_role); commit; - + 3. Run the script using the [Worksheet page](https://docs.snowflake.com/en/user-guide/ui-worksheet.html) or [Snowlight](https://docs.snowflake.com/en/user-guide/ui-snowsight-gs.html). Make sure to select the **All Queries** checkbox. @@ -103,7 +103,7 @@ You can also store data externally using an [Amazon S3 bucket](https://docs.aws. ### Using an Amazon S3 bucket -To use an Amazon S3 bucket, [create a new Amazon S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) with read/write access for Airbyte to stage data to Snowflake. +To use an Amazon S3 bucket, [create a new Amazon S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) with read/write access for Airbyte to stage data to Snowflake. ### Using a Google Cloud Storage (GCS) bucket @@ -111,7 +111,7 @@ To use an Amazon S3 bucket, [create a new Amazon S3 bucket](https://docs.aws.ama To use a GCS bucket: 1. Navigate to the Google Cloud Console and [create a new GCS bucket](https://cloud.google.com/storage/docs/creating-buckets) with read/write access for Airbyte to stage data to Snowflake. -2. [Generate a JSON key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys) for your service account. +2. [Generate a JSON key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys) for your service account. 3. Edit the following script to replace `AIRBYTE_ROLE` with the role you used for Airbyte's Snowflake configuration and `YOURBUCKETNAME` with your GCS bucket name. ```text create storage INTEGRATION gcs_airbyte_integration @@ -130,7 +130,7 @@ To use a GCS bucket: DESC STORAGE INTEGRATION gcs_airbyte_integration; ``` The final query should show a `STORAGE_GCP_SERVICE_ACCOUNT` property with an email as the property value. Add read/write permissions to your bucket with that email. - + 4. Navigate to the Snowflake UI and run the script as a [Snowflake account admin](https://docs.snowflake.com/en/user-guide/security-access-control-considerations.html) using the [Worksheet page](https://docs.snowflake.com/en/user-guide/ui-worksheet.html) or [Snowlight](https://docs.snowflake.com/en/user-guide/ui-snowsight-gs.html). ### Using Azure Blob Storage @@ -180,6 +180,7 @@ To use AWS S3 as the cloud storage, enter the information for the S3 bucket you | S3 Access Key * | The corresponding secret to the S3 Key ID. | | Stream Part Size (Optional) | Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables.
Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care. (e.g. 5) |
| Purge Staging Files and Tables | Determines whether to delete the staging files from S3 after completing the sync. Specifically, the connector will create CSV files named `bucketPath/namespace/streamName/syncDate_epochMillis_randomUuid.csv` containing three columns (`ab_id`, `data`, `emitted_at`). Normally these files are deleted after sync; if you want to keep them for other purposes, set `purge_staging_data` to false. |
+| Encryption | Whether files staged on S3 are encrypted. You typically don't need to enable this, but it adds a layer of security if you share your data storage with other applications. If you enable encryption, choose between an ephemeral key (Airbyte generates a new key for each sync, so nobody but Airbyte and Snowflake can read the data on S3) and providing your own key (useful if you have disabled "Purge Staging Files and Tables" and want to be able to decrypt the staged data yourself). |

To use GCS as the cloud storage, enter the information for the GCS bucket you created in Step 2:

@@ -199,7 +200,7 @@ To use Azure Blob storage, enter the information for the storage you created in
| SAS Token | The SAS Token you provided in Step 2. |

-## Output schema 
+## Output schema

Airbyte outputs each stream into its own table with the following columns in Snowflake:

@@ -235,6 +236,7 @@ Now that you have set up the Snowflake destination connector, check out the foll

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----- | :------ |
+| 0.4.25 | 2022-05-03 | [\#12452](https://github.com/airbytehq/airbyte/pull/12452) | Add support for encrypted staging on S3; fix the purge_staging_files option |
| 0.4.24 | 2022-03-24 | [\#11093](https://github.com/airbytehq/airbyte/pull/11093) | Added OAuth support (Compatible with Airbyte Version 0.35.60+) |
| 0.4.22 | 2022-03-18 | [\#10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters |
| 0.4.21 | 2022-03-18 | [\#11071](https://github.com/airbytehq/airbyte/pull/11071) | Switch to compressed on-disk buffering before staging to s3/internal stage |
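
To make the new encryption option concrete, here is a minimal, hypothetical sketch (not part of the patch above) of how a user-supplied `key_encrypting_key` could be generated and wired into the `encryption` block that `spec.json` now declares, and of the `aws_cse` clause that `getCopyQuery` appends to the `COPY INTO` statement. The config field names and the clause format come from the diff; the class name, the use of `SecureRandom`, and the printed `COPY INTO` skeleton (bucket and path are placeholders) are illustrative assumptions only.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.security.SecureRandom;
import java.util.Base64;

// Illustrative helper, not shipped with the connector.
public class EncryptedStagingConfigSketch {

  public static void main(final String[] args) throws Exception {
    // Generate a 256-bit key-encrypting key and base64-encode it, matching the
    // "key_encrypting_key" format described in spec.json (128, 192, or 256 bits).
    final byte[] keyEncryptingKey = new byte[32];
    new SecureRandom().nextBytes(keyEncryptingKey);
    final String kekBase64 = Base64.getEncoder().encodeToString(keyEncryptingKey);

    // Build the "encryption" block of the destination config as declared in spec.json.
    final ObjectMapper mapper = new ObjectMapper();
    final ObjectNode encryption = mapper.createObjectNode()
        .put("encryption_type", "aes_cbc_envelope")
        .put("key_encrypting_key", kekBase64);
    System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(encryption));

    // When AES-CBC envelope encryption is enabled, the connector appends a clause of
    // this shape to its COPY INTO statement (see getCopyQuery in the diff above).
    final String encryptionClause =
        String.format(" encryption = (type = 'aws_cse' master_key = '%s')", kekBase64);
    System.out.println("COPY INTO my_schema.my_table FROM 's3://<bucket>/<staging-path>'"
        + encryptionClause + ";");
  }
}
```

Leaving `key_encrypting_key` blank instead has the connector generate an ephemeral key for each sync, as described in the spec and documentation above.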