
🎉 Destination Snowflake: Add option to stage encrypted files via S3 #12452


Merged · 26 commits · May 4, 2022
@@ -9,7 +9,9 @@
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,4 +45,8 @@ protected void cleanUpObjects(final String bucket, final List<KeyVersion> keysTo
}
}

@Override
protected Map<String, String> getMetadataMapping() {
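// No canonical-to-vendor metadata mapping is defined for this storage type,
// so blob decorators will not attach any metadata entries here.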
return new HashMap<>();
}
}
@@ -59,7 +59,8 @@ public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecord
final NamingConventionTransformer namingResolver,
final CheckedBiFunction<AirbyteStreamNameNamespacePair, ConfiguredAirbyteCatalog, SerializableBuffer, Exception> onCreateBuffer,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean purgeStagingData) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog);
return new BufferedStreamConsumer(
outputRecordCollector,
@@ -68,7 +69,7 @@ public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecord
onCreateBuffer,
catalog,
flushBufferFunction(database, stagingOperations, writeConfigs, catalog)),
onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData),
catalog,
stagingOperations::isValidData);
}
@@ -177,7 +178,8 @@ private CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Ex

private OnCloseFunction onCloseFunction(final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
final boolean purgeStagingData) {
return (hasFailed) -> {
if (!hasFailed) {
final List<String> queryList = new ArrayList<>();
@@ -224,10 +226,12 @@ private OnCloseFunction onCloseFunction(final JdbcDatabase database,
tmpTableName);

stagingOperations.dropTableIfExists(database, schemaName, tmpTableName);
if (purgeStagingData) {
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName());
LOGGER.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName,
stageName);
stagingOperations.dropStageIfExists(database, stageName);
}
}
LOGGER.info("Cleaning up destination completed.");
};
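This hunk only threads the new purgeStagingData flag through to the close hook; where the flag originates is outside the diff. A minimal wiring sketch, assuming a boolean spec field named purge_staging_data (the field name is an illustration, not taken from this PR):

final JsonNode purgeNode = config.get("purge_staging_data"); // hypothetical spec field
final boolean purgeStagingData = purgeNode == null || purgeNode.asBoolean(); // default to purging the stage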
@@ -0,0 +1,38 @@
package io.airbyte.integrations.destination.s3;

import com.fasterxml.jackson.databind.JsonNode;
import java.security.NoSuchAlgorithmException;
import javax.annotation.Nonnull;
import javax.crypto.KeyGenerator;

/**
* @param key The key to use for encryption.
* @param keyType Where the key came from.
*/
public record AesCbcEnvelopeEncryption(@Nonnull byte[] key, @Nonnull KeyType keyType) implements EncryptionConfig {

public enum KeyType {
EPHEMERAL,
USER_PROVIDED
}

public static AesCbcEnvelopeEncryption fromJson(final JsonNode encryptionNode) {
final JsonNode kekNode = encryptionNode.get("key_encrypting_key");
final String keyType = kekNode.get("key_type").asText();
return switch (keyType) {
case "user_provided" -> new AesCbcEnvelopeEncryption(BASE64_DECODER.decode(kekNode.get("key").asText()), KeyType.USER_PROVIDED);
case "ephemeral" -> encryptionWithRandomKey();
default -> throw new IllegalArgumentException("Invalid key type: " + keyType);
};
}

private static AesCbcEnvelopeEncryption encryptionWithRandomKey() {
try {
final KeyGenerator kekGenerator = KeyGenerator.getInstance(AesCbcEnvelopeEncryptionBlobDecorator.KEY_ENCRYPTING_ALGO);
kekGenerator.init(AesCbcEnvelopeEncryptionBlobDecorator.AES_KEY_SIZE_BITS);
return new AesCbcEnvelopeEncryption(kekGenerator.generateKey().getEncoded(), KeyType.EPHEMERAL);
} catch (final NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}
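A brief usage sketch (not part of the diff) showing the node shape fromJson expects; the JSON literal is an assumed example mirroring the parsing logic above, and the snippet presumes a context that handles Jackson's checked JsonProcessingException:

// requires com.fasterxml.jackson.databind.ObjectMapper
final ObjectMapper mapper = new ObjectMapper();
final JsonNode encryptionNode = mapper.readTree("{\"key_encrypting_key\": {\"key_type\": \"ephemeral\"}}");
final AesCbcEnvelopeEncryption encryption = AesCbcEnvelopeEncryption.fromJson(encryptionNode);
// encryption.keyType() == KeyType.EPHEMERAL; encryption.key() is a freshly generated 256-bit AES key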
@@ -0,0 +1,123 @@
package io.airbyte.integrations.destination.s3;

import com.google.common.annotations.VisibleForTesting;
import java.io.OutputStream;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.Map;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.CipherOutputStream;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.KeyGenerator;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/**
* This class implements the envelope encryption that Redshift and Snowflake use when loading encrypted files from S3 (or other blob stores):
* <ul>
* <li>A content-encrypting-key (CEK) is used to encrypt the actual data (i.e. the CSV file)</li>
* <li>A key-encrypting-key (KEK) is used to encrypt the CEK</li>
* <li>The encrypted CEK is stored in the S3 object metadata, along with the plaintext initialization vector</li>
* <li>The COPY command includes the KEK (in plaintext). Redshift/Snowflake uses it to decrypt the CEK, which in turn decrypts the CSV file.</li>
* </ul>
* <p>
* A new CEK is generated for each S3 object, but each sync uses a single KEK. The KEK may be either user-provided (if the user wants to keep the data
* for further use), or generated per-sync (if they simply want to add additional security around their COPY operation).
* <p>
* Redshift does not support loading directly from GCS or Azure Blob Storage.
* <p>
* Snowflake only supports client-side encryption in S3 and Azure Storage; it does not support this feature in GCS (https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html).
* Azure Storage uses a similar envelope encryption technique to S3 (https://docs.microsoft.com/en-us/azure/storage/common/storage-client-side-encryption?tabs=dotnet#encryption-via-the-envelope-technique).
*/
public class AesCbcEnvelopeEncryptionBlobDecorator extends BlobDecorator {

public static final String ENCRYPTED_CONTENT_ENCRYPTING_KEY = "aes_cbc_envelope_encryption-content-encrypting-key";
public static final String INITIALIZATION_VECTOR = "aes_cbc_envelope_encryption-initialization-vector";

public static final int AES_KEY_SIZE_BITS = 256;
private static final int AES_CBC_INITIALIZATION_VECTOR_SIZE_BYTES = 16;
private static final Encoder BASE64_ENCODER = Base64.getEncoder();
private static final SecureRandom SECURE_RANDOM = new SecureRandom();

public static final String KEY_ENCRYPTING_ALGO = "AES";

// There's no specific KeyGenerator for AES/CBC/PKCS5Padding, so we just use a normal AES KeyGenerator
private static final String CONTENT_ENCRYPTING_KEY_ALGO = "AES";
// Redshift's UNLOAD command uses this cipher mode, so we'll use it here as well.
// TODO If we eventually want to expose client-side encryption in destination-s3, we should probably also implement
// AES-GCM, since it's mostly superior to CBC mode. (if we do that: make sure that the output is compatible with
// aws-java-sdk's AmazonS3EncryptionV2Client, which requires a slightly different set of metadata)
private static final String CONTENT_ENCRYPTING_CIPHER_ALGO = "AES/CBC/PKCS5Padding";

// The real "secret key". Should be handled with great care.
private final SecretKey keyEncryptingKey;
// A random key generated for each file. Also should be handled with care.
private final SecretKey contentEncryptingKey;
// Arbitrary bytes required by the CBC algorithm. Not a sensitive value.
// The only requirement is that we never reuse an (IV, CEK) pair.
private final byte[] initializationVector;

public AesCbcEnvelopeEncryptionBlobDecorator(final SecretKey keyEncryptingKey) {
this(keyEncryptingKey, randomContentEncryptingKey(), randomInitializationVector());
}

public AesCbcEnvelopeEncryptionBlobDecorator(final byte[] keyEncryptingKey) {
this(new SecretKeySpec(keyEncryptingKey, KEY_ENCRYPTING_ALGO));
}

@VisibleForTesting
AesCbcEnvelopeEncryptionBlobDecorator(final SecretKey keyEncryptingKey, final SecretKey contentEncryptingKey, final byte[] initializationVector) {
this.keyEncryptingKey = keyEncryptingKey;
this.contentEncryptingKey = contentEncryptingKey;

this.initializationVector = initializationVector;
}

@Override
public OutputStream wrap(final OutputStream stream) {
try {
final Cipher dataCipher = Cipher.getInstance(CONTENT_ENCRYPTING_CIPHER_ALGO);
dataCipher.init(Cipher.ENCRYPT_MODE, contentEncryptingKey, new IvParameterSpec(initializationVector));
return new CipherOutputStream(stream, dataCipher);
} catch (final InvalidAlgorithmParameterException | NoSuchPaddingException | NoSuchAlgorithmException | InvalidKeyException e) {
throw new RuntimeException(e);
}
}

@Override
public void updateMetadata(final Map<String, String> metadata, final Map<String, String> metadataKeyMapping) {
try {
final Cipher keyCipher = Cipher.getInstance(KEY_ENCRYPTING_ALGO);
keyCipher.init(Cipher.ENCRYPT_MODE, keyEncryptingKey);
final byte[] encryptedCekBytes = keyCipher.doFinal(contentEncryptingKey.getEncoded());

insertMetadata(metadata, metadataKeyMapping, ENCRYPTED_CONTENT_ENCRYPTING_KEY, BASE64_ENCODER.encodeToString(encryptedCekBytes));
insertMetadata(metadata, metadataKeyMapping, INITIALIZATION_VECTOR, BASE64_ENCODER.encodeToString(initializationVector));
} catch (final NoSuchPaddingException | NoSuchAlgorithmException | InvalidKeyException | IllegalBlockSizeException | BadPaddingException e) {
throw new RuntimeException(e);
}
}

private static SecretKey randomContentEncryptingKey() {
try {
final KeyGenerator cekGenerator = KeyGenerator.getInstance(CONTENT_ENCRYPTING_KEY_ALGO);
cekGenerator.init(AES_KEY_SIZE_BITS);
return cekGenerator.generateKey();
} catch (final NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}

private static byte[] randomInitializationVector() {
final byte[] initializationVector = new byte[AES_CBC_INITIALIZATION_VECTOR_SIZE_BYTES];
SECURE_RANDOM.nextBytes(initializationVector);
return initializationVector;
}
}
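To make the envelope flow concrete, a usage sketch under stated assumptions (in-memory sink; ByteArrayOutputStream, StandardCharsets, and HashMap imports; placeholder vendor key names standing in for whatever getMetadataMapping() supplies):

final byte[] kek = new byte[AesCbcEnvelopeEncryptionBlobDecorator.AES_KEY_SIZE_BITS / 8]; // stand-in for a user-provided 256-bit KEK
new SecureRandom().nextBytes(kek);
final AesCbcEnvelopeEncryptionBlobDecorator decorator = new AesCbcEnvelopeEncryptionBlobDecorator(kek);

final ByteArrayOutputStream sink = new ByteArrayOutputStream();
try (OutputStream encrypted = decorator.wrap(sink)) {
encrypted.write("hello,world\n".getBytes(StandardCharsets.UTF_8)); // closing flushes the final CBC padding block
}

final Map<String, String> metadata = new HashMap<>();
decorator.updateMetadata(metadata, Map.of(
AesCbcEnvelopeEncryptionBlobDecorator.ENCRYPTED_CONTENT_ENCRYPTING_KEY, "x-vendor-key", // placeholder vendor-specific names
AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "x-vendor-iv"));
// metadata now holds the base64 KEK-encrypted CEK and the base64 IV, ready to attach to the uploaded object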
@@ -0,0 +1,40 @@
package io.airbyte.integrations.destination.s3;

import com.google.common.annotations.VisibleForTesting;
import java.io.OutputStream;
import java.util.Map;

/**
* Represents the ability to modify how a blob is stored, by modifying the data being written and/or the blob's metadata.
*/
public abstract class BlobDecorator {

public abstract OutputStream wrap(OutputStream stream);

/**
* Modifies the blob's metadata.
* <p>
* In the most common case, BlobDecorator implementations will insert new entries into the metadata map. These entries may be vendor-specific. The
* metadataKeyMapping parameter defines a mapping from the "canonical" keys to the vendor-specific keys. See {@link
* S3StorageOperations#getMetadataMapping()} for an example.
* <p>
* If a key is not defined in metadataKeyMapping, it will not be inserted into the metadata.
*
* @param metadata The blob's metadata
* @param metadataKeyMapping The mapping from canonical to vendor-specific key names
*/
public abstract void updateMetadata(Map<String, String> metadata, Map<String, String> metadataKeyMapping);

/**
* A convenience method for subclasses. Handles inserting new metadata entries according to the metadataKeyMapping.
*/
@VisibleForTesting
static void insertMetadata(final Map<String, String> metadata,
final Map<String, String> metadataKeyMapping,
final String key,
final String value) {
if (metadataKeyMapping.containsKey(key)) {
metadata.put(metadataKeyMapping.get(key), value);
}
}
}
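A small illustration of the mapping contract described above (key names here are hypothetical): canonical keys present in the mapping are written under their vendor-specific names, and unmapped keys are dropped:

final Map<String, String> metadata = new HashMap<>();
final Map<String, String> mapping = Map.of("canonical-cek", "x-vendor-cek");
BlobDecorator.insertMetadata(metadata, mapping, "canonical-cek", "abc123"); // stored as "x-vendor-cek" -> "abc123"
BlobDecorator.insertMetadata(metadata, mapping, "canonical-iv", "def456"); // dropped: no mapping entry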
@@ -6,34 +6,49 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.joda.time.DateTime;

public abstract class BlobStorageOperations {

protected final List<BlobDecorator> blobDecorators;

public BlobStorageOperations() {
this.blobDecorators = new ArrayList<>();
}

public abstract String getBucketObjectPath(String namespace, String streamName, DateTime writeDatetime, String customFormat);

/**
* Create a storage object in the destination, at the given objectPath, where data will be stored.
*/
public abstract void createBucketObjectIfNotExists(String objectPath) throws Exception;

/**
* Upload the data files into the storage area.
*
* @return the name of the file that was uploaded.
*/
public abstract String uploadRecordsToBucket(SerializableBuffer recordsData, String namespace, String streamName, String objectPath)
throws Exception;

/**
* Remove files that were just stored in the bucket
*/
public abstract void cleanUpBucketObject(String objectPath, List<String> stagedFiles) throws Exception;

public abstract void cleanUpBucketObject(String namespace, String streamName, String objectPath, String pathFormat);

public abstract void dropBucketObject(String objectPath);

public abstract boolean isValidData(JsonNode jsonNode);

protected abstract Map<String, String> getMetadataMapping();
public void addBlobDecorator(final BlobDecorator blobDecorator) {
blobDecorators.add(blobDecorator);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.airbyte.integrations.destination.s3;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Base64;
import java.util.Base64.Decoder;

public sealed interface EncryptionConfig permits AesCbcEnvelopeEncryption, NoEncryption {
Decoder BASE64_DECODER = Base64.getDecoder();

static EncryptionConfig fromJson(final JsonNode encryptionNode) {
// For backwards-compatibility. Preexisting configs which don't contain the "encryption" key will pass a null JsonNode into this method.
if (encryptionNode == null) {
return new NoEncryption();
}

final String encryptionType = encryptionNode.get("encryption_type").asText();
return switch (encryptionType) {
case "none" -> new NoEncryption();
case "aes_cbc_envelope" -> AesCbcEnvelopeEncryption.fromJson(encryptionNode);
default -> throw new IllegalArgumentException("Invalid encryption type: " + encryptionType);
};
}
}
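For reference, a sketch of the config shapes this entry point accepts, inferred from the two fromJson methods (the surrounding "encryption" key is the one named in the comment above):

// {"encryption_type": "none"}
// {"encryption_type": "aes_cbc_envelope", "key_encrypting_key": {"key_type": "ephemeral"}}
// {"encryption_type": "aes_cbc_envelope", "key_encrypting_key": {"key_type": "user_provided", "key": "<base64-encoded KEK>"}}
final EncryptionConfig encryption = EncryptionConfig.fromJson(config.get("encryption"));
if (encryption instanceof AesCbcEnvelopeEncryption aes) {
// e.g. build an AesCbcEnvelopeEncryptionBlobDecorator from aes.key()
}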
@@ -0,0 +1,5 @@
package io.airbyte.integrations.destination.s3;

public final class NoEncryption implements EncryptionConfig {

}