diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java index 7654c3941ef04..7c67c7bc6bc7f 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java @@ -41,7 +41,6 @@ import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.integrations.destination.gcs.GcsS3Helper; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStream; @@ -164,7 +163,7 @@ void setup(final TestInfo info) throws IOException { final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config)); - this.s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + this.s3Client = gcsDestinationConfig.getS3Client(); tornDown = false; Runtime.getRuntime() diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java index 78e03ff6ef803..a5a1420f05d64 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java @@ -17,7 +17,6 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.integrations.destination.gcs.GcsS3Helper; import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteMessage; import java.util.List; @@ -46,14 +45,14 @@ public abstract class AbstractGscBigQueryUploader outputRecordCollector, AirbyteMessage lastStateMessage) throws Exception { + protected void uploadData(final Consumer outputRecordCollector, final AirbyteMessage lastStateMessage) throws Exception { LOGGER.info("Uploading data to the tmp table {}.", tmpTable.getTable()); uploadDataFromFileToTmpTable(); super.uploadData(outputRecordCollector, lastStateMessage); @@ -68,7 +67,7 @@ protected void uploadDataFromFileToTmpTable() throws Exception { LOGGER.info(String.format("Started copying data from %s GCS " + getFileTypeName() + " file to %s tmp BigQuery table with schema: \n %s", fileLocation, tmpTable, recordFormatter.getBigQuerySchema())); - LoadJobConfiguration configuration = getLoadConfiguration(); + final LoadJobConfiguration configuration = getLoadConfiguration(); // For more information on Job see: // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html @@ -96,7 +95,7 @@ private String getFileTypeName() { private void deleteGcsFiles() { LOGGER.info("Deleting file {}", writer.getFileLocation()); final GcsDestinationConfig gcsDestinationConfig = this.gcsDestinationConfig; - final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + final AmazonS3 s3Client = gcsDestinationConfig.getS3Client(); final String gcsBucketName = gcsDestinationConfig.getBucketName(); final String gcs_bucket_path = gcsDestinationConfig.getBucketPath(); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java index 9083c5cdc558b..2491048fa929d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java @@ -22,7 +22,6 @@ import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.integrations.destination.gcs.GcsS3Helper; import io.airbyte.integrations.destination.gcs.avro.GcsAvroWriter; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -120,7 +119,7 @@ private static GcsAvroWriter initGcsWriter( throws IOException { final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); - final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + final AmazonS3 s3Client = gcsDestinationConfig.getS3Client(); return new GcsAvroWriter( gcsDestinationConfig, s3Client, diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java index 9c877a82da4a7..93dca8843be07 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java @@ -16,7 +16,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.integrations.destination.gcs.GcsS3Helper; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.Field; @@ -108,7 +107,7 @@ void setup(final TestInfo info) throws IOException { final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config)); - this.s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); + this.s3Client = gcsDestinationConfig.getS3Client(); tornDown = false; addShutdownHook(); diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java index ea41995403885..4f7df0fc366f8 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java @@ -59,24 +59,18 @@ public static DatabricksDestinationConfig get(final JsonNode config) { } public static S3DestinationConfig getDataSource(final JsonNode dataSource) { - return new S3DestinationConfig( - "", + return S3DestinationConfig.create( dataSource.get("s3_bucket_name").asText(), dataSource.get("s3_bucket_path").asText(), - dataSource.get("s3_bucket_region").asText(), - dataSource.get("s3_access_key_id").asText(), - dataSource.get("s3_secret_access_key").asText(), - getDefaultParquetConfig()); + dataSource.get("s3_bucket_region").asText()) + .withFormatConfig(new S3ParquetFormatConfig(new ObjectMapper().createObjectNode())) + .get(); } public String getDatabricksServerHostname() { return databricksServerHostname; } - private static S3ParquetFormatConfig getDefaultParquetConfig() { - return new S3ParquetFormatConfig(new ObjectMapper().createObjectNode()); - } - public String getDatabricksHttpPath() { return databricksHttpPath; } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java index 3ad79c3ef468e..5538f91bc76dd 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -206,15 +206,10 @@ public String getCurrentFile() { * creates an {@link S3DestinationConfig} whose bucket path is /. */ static S3DestinationConfig getStagingS3DestinationConfig(final S3DestinationConfig config, final String stagingFolder) { - return new S3DestinationConfig( - config.getEndpoint(), - config.getBucketName(), - String.join("/", config.getBucketPath(), stagingFolder), - config.getBucketRegion(), - config.getAccessKeyId(), - config.getSecretAccessKey(), - // use default parquet format config - new S3ParquetFormatConfig(MAPPER.createObjectNode())); + return S3DestinationConfig.create(config) + .withBucketPath(String.join("/", config.getBucketPath(), stagingFolder)) + .withFormatConfig(new S3ParquetFormatConfig(MAPPER.createObjectNode())) + .get(); } } diff --git a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierTest.java b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierTest.java index 0a6c99e61e83c..efe18299c2025 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierTest.java @@ -15,7 +15,7 @@ class DatabricksStreamCopierTest { @Test public void testGetStagingS3DestinationConfig() { final String bucketPath = UUID.randomUUID().toString(); - final S3DestinationConfig config = new S3DestinationConfig("", "", bucketPath, "", "", "", null); + final S3DestinationConfig config = S3DestinationConfig.create("", bucketPath, "").get(); final String stagingFolder = UUID.randomUUID().toString(); final S3DestinationConfig stagingConfig = DatabricksStreamCopier.getStagingS3DestinationConfig(config, stagingFolder); assertEquals(String.format("%s/%s", bucketPath, stagingFolder), stagingConfig.getBucketPath()); diff --git a/airbyte-integrations/connectors/destination-gcs/Dockerfile b/airbyte-integrations/connectors/destination-gcs/Dockerfile index 6e7207acf2726..db68d4070cb3c 100644 --- a/airbyte-integrations/connectors/destination-gcs/Dockerfile +++ b/airbyte-integrations/connectors/destination-gcs/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.24 +LABEL io.airbyte.version=0.2.0 LABEL io.airbyte.name=airbyte/destination-gcs diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java deleted file mode 100644 index 8149228b5574c..0000000000000 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.gcs; - -import com.amazonaws.services.s3.AmazonS3; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; -import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; -import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory; -import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer { - - protected static final Logger LOGGER = LoggerFactory.getLogger(GcsConsumer.class); - - private final GcsDestinationConfig gcsDestinationConfig; - private final ConfiguredAirbyteCatalog configuredCatalog; - private final GcsWriterFactory writerFactory; - private final Consumer outputRecordCollector; - private final Map streamNameAndNamespaceToWriters; - - private AirbyteMessage lastStateMessage = null; - - public GcsConsumer(final GcsDestinationConfig gcsDestinationConfig, - final ConfiguredAirbyteCatalog configuredCatalog, - final GcsWriterFactory writerFactory, - final Consumer outputRecordCollector) { - this.gcsDestinationConfig = gcsDestinationConfig; - this.configuredCatalog = configuredCatalog; - this.writerFactory = writerFactory; - this.outputRecordCollector = outputRecordCollector; - this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); - } - - @Override - protected void startTracked() throws Exception { - final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig); - - final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); - - for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { - final DestinationFileWriter writer = writerFactory - .create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp); - writer.initialize(); - - final AirbyteStream stream = configuredStream.getStream(); - final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair - .fromAirbyteSteam(stream); - streamNameAndNamespaceToWriters.put(streamNamePair, writer); - } - } - - @Override - protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception { - if (airbyteMessage.getType() == Type.STATE) { - this.lastStateMessage = airbyteMessage; - return; - } else if (airbyteMessage.getType() != Type.RECORD) { - return; - } - - final AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); - final AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair - .fromRecordMessage(recordMessage); - - if (!streamNameAndNamespaceToWriters.containsKey(pair)) { - throw new IllegalArgumentException( - String.format( - "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", - Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); - } - - final UUID id = UUID.randomUUID(); - streamNameAndNamespaceToWriters.get(pair).write(id, recordMessage); - } - - @Override - protected void close(final boolean hasFailed) throws Exception { - LOGGER.debug("Closing consumer with writers = {}", streamNameAndNamespaceToWriters); - List exceptionsThrown = new ArrayList<>(); - for (var entry : streamNameAndNamespaceToWriters.entrySet()) { - final DestinationFileWriter handler = entry.getValue(); - LOGGER.debug("Closing writer {}", entry.getKey()); - try { - handler.close(hasFailed); - } catch (Exception e) { - exceptionsThrown.add(e); - LOGGER.error("Exception while closing writer {}", entry.getKey(), e); - } - } - if (!exceptionsThrown.isEmpty()) { - throw new RuntimeException(String.format("Exceptions thrown while closing consumer: %s", Strings.join(exceptionsThrown, "\n"))); - } - // Gcs stream uploader is all or nothing if a failure happens in the destination. - if (!hasFailed) { - outputRecordCollector.accept(lastStateMessage); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java index 5310209e1c93b..5c3685ac2243b 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java @@ -10,9 +10,11 @@ import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; -import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory; -import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory; +import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.record_buffer.FileBuffer; +import io.airbyte.integrations.destination.s3.S3ConsumerFactory; import io.airbyte.integrations.destination.s3.S3Destination; +import io.airbyte.integrations.destination.s3.SerializedBufferFactory; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; @@ -27,6 +29,12 @@ public class GcsDestination extends BaseConnector implements Destination { public static final String EXPECTED_ROLES = "storage.multipartUploads.abort, storage.multipartUploads.create, " + "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list"; + private final NamingConventionTransformer nameTransformer; + + public GcsDestination() { + this.nameTransformer = new GcsNameTransformer(); + } + public static void main(final String[] args) throws Exception { new IntegrationRunner(new GcsDestination()).run(args); } @@ -35,7 +43,7 @@ public static void main(final String[] args) throws Exception { public AirbyteConnectionStatus check(final JsonNode config) { try { final GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config); - final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig); + final AmazonS3 s3Client = destinationConfig.getS3Client(); // Test single upload (for small files) permissions S3Destination.testSingleUpload(s3Client, destinationConfig.getBucketName()); @@ -59,9 +67,14 @@ public AirbyteConnectionStatus check(final JsonNode config) { public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog configuredCatalog, final Consumer outputRecordCollector) { - final GcsWriterFactory formatterFactory = new ProductionWriterFactory(); - return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog, - formatterFactory, outputRecordCollector); + final GcsDestinationConfig gcsConfig = GcsDestinationConfig.getGcsDestinationConfig(config); + return new S3ConsumerFactory().create( + outputRecordCollector, + new GcsStorageOperations(nameTransformer, gcsConfig.getS3Client(), gcsConfig), + nameTransformer, + SerializedBufferFactory.getCreateFunction(gcsConfig, FileBuffer::new), + gcsConfig, + configuredCatalog); } } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java index 6aa8b423bfc9e..935c7485315b9 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfig.java @@ -4,30 +4,46 @@ package io.airbyte.integrations.destination.gcs; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.destination.gcs.credential.GcsCredentialConfig; import io.airbyte.integrations.destination.gcs.credential.GcsCredentialConfigs; +import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.S3FormatConfigs; -public class GcsDestinationConfig { +/** + * Currently we always reuse the S3 client for GCS. So the GCS config extends from the S3 config. + * This may change in the future. + */ +public class GcsDestinationConfig extends S3DestinationConfig { + + private static final String GCS_ENDPOINT = "https://storage.googleapis.com"; - private final String bucketName; - private final String bucketPath; - private final String bucketRegion; private final GcsCredentialConfig credentialConfig; - private final S3FormatConfig formatConfig; public GcsDestinationConfig(final String bucketName, final String bucketPath, final String bucketRegion, final GcsCredentialConfig credentialConfig, final S3FormatConfig formatConfig) { - this.bucketName = bucketName; - this.bucketPath = bucketPath; - this.bucketRegion = bucketRegion; + super(GCS_ENDPOINT, + bucketName, + bucketPath, + bucketRegion, + S3DestinationConstants.DEFAULT_PATH_FORMAT, + credentialConfig.getS3CredentialConfig().orElseThrow(), + S3DestinationConstants.DEFAULT_PART_SIZE_MB, + formatConfig, + null); + this.credentialConfig = credentialConfig; - this.formatConfig = formatConfig; } public static GcsDestinationConfig getGcsDestinationConfig(final JsonNode config) { @@ -39,24 +55,25 @@ public static GcsDestinationConfig getGcsDestinationConfig(final JsonNode config S3FormatConfigs.getS3FormatConfig(config)); } - public String getBucketName() { - return bucketName; - } - - public String getBucketPath() { - return bucketPath; - } + @Override + protected AmazonS3 createS3Client() { + switch (credentialConfig.getCredentialType()) { + case HMAC_KEY -> { + final GcsHmacKeyCredentialConfig hmacKeyCredential = (GcsHmacKeyCredentialConfig) credentialConfig; + final BasicAWSCredentials awsCreds = new BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret()); - public String getBucketRegion() { - return bucketRegion; + return AmazonS3ClientBuilder.standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, getBucketRegion())) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .build(); + } + default -> throw new IllegalArgumentException("Unsupported credential type: " + credentialConfig.getCredentialType().name()); + } } - public GcsCredentialConfig getCredentialConfig() { + public GcsCredentialConfig getGcsCredentialConfig() { return credentialConfig; } - public S3FormatConfig getFormatConfig() { - return formatConfig; - } - } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsNameTransformer.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsNameTransformer.java new file mode 100644 index 0000000000000..83361eac22f73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsNameTransformer.java @@ -0,0 +1,7 @@ +package io.airbyte.integrations.destination.gcs; + +import io.airbyte.integrations.destination.s3.util.S3NameTransformer; + +public class GcsNameTransformer extends S3NameTransformer { + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsS3Helper.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsS3Helper.java deleted file mode 100644 index 880fc54c128d2..0000000000000 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsS3Helper.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.gcs; - -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig; - -public class GcsS3Helper { - - private static final String GCS_ENDPOINT = "https://storage.googleapis.com"; - - public static AmazonS3 getGcsS3Client(final GcsDestinationConfig gcsDestinationConfig) { - final GcsHmacKeyCredentialConfig hmacKeyCredential = (GcsHmacKeyCredentialConfig) gcsDestinationConfig.getCredentialConfig(); - final BasicAWSCredentials awsCreds = new BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret()); - - return AmazonS3ClientBuilder.standard() - .withEndpointConfiguration( - new AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, gcsDestinationConfig.getBucketRegion())) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .build(); - } - -} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java new file mode 100644 index 0000000000000..fab3e0ee5e803 --- /dev/null +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.gcs; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.ObjectListing; +import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.S3StorageOperations; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GcsStorageOperations extends S3StorageOperations { + + private static final Logger LOGGER = LoggerFactory.getLogger(GcsStorageOperations.class); + + public GcsStorageOperations(final NamingConventionTransformer nameTransformer, + final AmazonS3 s3Client, + final S3DestinationConfig s3Config) { + super(nameTransformer, s3Client, s3Config); + } + + /** + * This method is overridden because GCS doesn't accept request to delete multiple objects. The + * only difference is that the AmazonS3#deleteObjects method is replaced with + * AmazonS3#deleteObject. + */ + @Override + public void cleanUpBucketObject(final String objectPath, final List stagedFiles) { + final String bucket = s3Config.getBucketName(); + ObjectListing objects = s3Client.listObjects(bucket, objectPath); + while (objects.getObjectSummaries().size() > 0) { + final List keysToDelete = objects.getObjectSummaries() + .stream() + .map(obj -> new KeyVersion(obj.getKey())) + .filter(obj -> stagedFiles.isEmpty() || stagedFiles.contains(obj.getKey())) + .toList(); + for (final KeyVersion keyToDelete : keysToDelete) { + s3Client.deleteObject(bucket, keyToDelete.getKey()); + } + LOGGER.info("Storage bucket {} has been cleaned-up ({} objects were deleted)...", objectPath, keysToDelete.size()); + if (objects.isTruncated()) { + objects = s3Client.listNextBatchOfObjects(objects); + } else { + break; + } + } + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfig.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfig.java index 77bd2f9917a43..16fe2ac58e797 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfig.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfig.java @@ -4,8 +4,12 @@ package io.airbyte.integrations.destination.gcs.credential; -public interface GcsCredentialConfig { +import io.airbyte.integrations.destination.s3.credential.BlobStorageCredentialConfig; +import io.airbyte.integrations.destination.s3.credential.S3CredentialConfig; +import java.util.Optional; - GcsCredential getCredentialType(); +public interface GcsCredentialConfig extends BlobStorageCredentialConfig { + + Optional getS3CredentialConfig(); } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfigs.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfigs.java index c1b394c3f2766..b9166c16cce47 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfigs.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialConfigs.java @@ -11,9 +11,9 @@ public class GcsCredentialConfigs { public static GcsCredentialConfig getCredentialConfig(final JsonNode config) { final JsonNode credentialConfig = config.get("credential"); - final GcsCredential credentialType = GcsCredential.valueOf(credentialConfig.get("credential_type").asText().toUpperCase()); + final GcsCredentialType credentialType = GcsCredentialType.valueOf(credentialConfig.get("credential_type").asText().toUpperCase()); - if (credentialType == GcsCredential.HMAC_KEY) { + if (credentialType == GcsCredentialType.HMAC_KEY) { return new GcsHmacKeyCredentialConfig(credentialConfig); } throw new RuntimeException("Unexpected credential: " + Jsons.serialize(credentialConfig)); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredential.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialType.java similarity index 81% rename from airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredential.java rename to airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialType.java index 5a1e213943f5e..8746139ac13af 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredential.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsCredentialType.java @@ -4,6 +4,6 @@ package io.airbyte.integrations.destination.gcs.credential; -public enum GcsCredential { +public enum GcsCredentialType { HMAC_KEY } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.java index 8ec6317989507..445da428b87cd 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/credential/GcsHmacKeyCredentialConfig.java @@ -5,6 +5,9 @@ package io.airbyte.integrations.destination.gcs.credential; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; +import io.airbyte.integrations.destination.s3.credential.S3CredentialConfig; +import java.util.Optional; public class GcsHmacKeyCredentialConfig implements GcsCredentialConfig { @@ -16,6 +19,11 @@ public GcsHmacKeyCredentialConfig(final JsonNode credentialConfig) { this.hmacKeySecret = credentialConfig.get("hmac_key_secret").asText(); } + public GcsHmacKeyCredentialConfig(final String hmacKeyAccessId, final String hmacKeySecret) { + this.hmacKeyAccessId = hmacKeyAccessId; + this.hmacKeySecret = hmacKeySecret; + } + public String getHmacKeyAccessId() { return hmacKeyAccessId; } @@ -25,8 +33,13 @@ public String getHmacKeySecret() { } @Override - public GcsCredential getCredentialType() { - return GcsCredential.HMAC_KEY; + public GcsCredentialType getCredentialType() { + return GcsCredentialType.HMAC_KEY; + } + + @Override + public Optional getS3CredentialConfig() { + return Optional.of(new S3AccessKeyCredentialConfig(hmacKeyAccessId, hmacKeySecret)); } } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java index 501db7871d54e..e37bbfb0deb9d 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java @@ -77,7 +77,7 @@ public GcsParquetWriter(final GcsDestinationConfig config, } public static Configuration getHadoopConfig(final GcsDestinationConfig config) { - final GcsHmacKeyCredentialConfig hmacKeyCredential = (GcsHmacKeyCredentialConfig) config.getCredentialConfig(); + final GcsHmacKeyCredentialConfig hmacKeyCredential = (GcsHmacKeyCredentialConfig) config.getGcsCredentialConfig(); final Configuration hadoopConfig = new Configuration(); // the default org.apache.hadoop.fs.s3a.S3AFileSystem does not work for GCS @@ -99,7 +99,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage) throw } @Override - public void write(JsonNode formattedData) throws IOException { + public void write(final JsonNode formattedData) throws IOException { parquetWriter.write(avroRecordFactory.getAvroRecord(formattedData)); } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java deleted file mode 100644 index 9f993070e3b91..0000000000000 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.gcs.writer; - -import com.amazonaws.services.s3.AmazonS3; -import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import java.sql.Timestamp; - -/** - * Create different {@link GcsWriterFactory} based on {@link GcsDestinationConfig}. - */ -public interface GcsWriterFactory { - - DestinationFileWriter create(GcsDestinationConfig config, - AmazonS3 s3Client, - ConfiguredAirbyteStream configuredStream, - Timestamp uploadTimestamp) - throws Exception; - -} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java deleted file mode 100644 index 7aa0f8d5777b5..0000000000000 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.gcs.writer; - -import com.amazonaws.services.s3.AmazonS3; -import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.integrations.destination.gcs.avro.GcsAvroWriter; -import io.airbyte.integrations.destination.gcs.csv.GcsCsvWriter; -import io.airbyte.integrations.destination.gcs.jsonl.GcsJsonlWriter; -import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter; -import io.airbyte.integrations.destination.s3.S3Format; -import io.airbyte.integrations.destination.s3.avro.AvroConstants; -import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; -import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import java.sql.Timestamp; -import org.apache.avro.Schema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ProductionWriterFactory implements GcsWriterFactory { - - protected static final Logger LOGGER = LoggerFactory.getLogger(ProductionWriterFactory.class); - - @Override - public DestinationFileWriter create(final GcsDestinationConfig config, - final AmazonS3 s3Client, - final ConfiguredAirbyteStream configuredStream, - final Timestamp uploadTimestamp) - throws Exception { - final S3Format format = config.getFormatConfig().getFormat(); - - if (format == S3Format.AVRO || format == S3Format.PARQUET) { - final AirbyteStream stream = configuredStream.getStream(); - LOGGER.info("Json schema for stream {}: {}", stream.getName(), stream.getJsonSchema()); - - if (format == S3Format.AVRO) { - return new GcsAvroWriter(config, s3Client, configuredStream, uploadTimestamp, AvroConstants.JSON_CONVERTER, stream.getJsonSchema()); - } else { - final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter(); - final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace()); - - LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false)); - return new GcsParquetWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, AvroConstants.JSON_CONVERTER); - } - } - - if (format == S3Format.CSV) { - return new GcsCsvWriter(config, s3Client, configuredStream, uploadTimestamp); - } - - if (format == S3Format.JSONL) { - return new GcsJsonlWriter(config, s3Client, configuredStream, uploadTimestamp); - } - - throw new RuntimeException("Unexpected GCS destination format: " + format); - } - -} diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java index c3439e954b92e..388bc68ebdb36 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java @@ -11,6 +11,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.s3.S3Format; +import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; @@ -22,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.stream.StreamSupport; +import java.util.zip.GZIPInputStream; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import org.apache.commons.csv.QuoteMode; @@ -34,10 +36,9 @@ public GcsCsvDestinationAcceptanceTest() { @Override protected JsonNode getFormatConfig() { - return Jsons.deserialize("{\n" - + " \"format_type\": \"CSV\",\n" - + " \"flattening\": \"Root level flattening\"\n" - + "}"); + return Jsons.jsonNode(Map.of( + "format_type", outputFormat, + "flattening", Flattening.ROOT_LEVEL.getValue())); } /** @@ -94,8 +95,8 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, final List jsonRecords = new LinkedList<>(); for (final S3ObjectSummary objectSummary : objectSummaries) { - final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); - try (final Reader in = new InputStreamReader(object.getObjectContent(), StandardCharsets.UTF_8)) { + try (final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); + final Reader in = new InputStreamReader(new GZIPInputStream(object.getObjectContent()), StandardCharsets.UTF_8)) { final Iterable records = CSVFormat.DEFAULT .withQuoteMode(QuoteMode.NON_NUMERIC) .withFirstRecordAsHeader() diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java index 097eb97a2c566..5af9dfd110cd9 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java @@ -16,10 +16,11 @@ import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.S3FormatConfig; -import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper; +import io.airbyte.integrations.destination.s3.S3StorageOperations; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.nio.file.Path; import java.util.Comparator; @@ -28,6 +29,8 @@ import java.util.Locale; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +57,8 @@ public abstract class GcsDestinationAcceptanceTest extends DestinationAcceptance protected JsonNode configJson; protected GcsDestinationConfig config; protected AmazonS3 s3Client; + protected NamingConventionTransformer nameTransformer; + protected S3StorageOperations s3StorageOperations; protected GcsDestinationAcceptanceTest(final S3Format outputFormat) { this.outputFormat = outputFormat; @@ -73,6 +78,14 @@ protected JsonNode getConfig() { return configJson; } + @Override + protected String getDefaultSchema(final JsonNode config) { + if (config.has("gcs_bucket_path")) { + return config.get("gcs_bucket_path").asText(); + } + return null; + } + @Override protected JsonNode getFailCheckConfig() { final JsonNode baseJson = getBaseConfigJson(); @@ -87,13 +100,20 @@ protected JsonNode getFailCheckConfig() { * Helper method to retrieve all synced objects inside the configured bucket path. */ protected List getAllSyncedObjects(final String streamName, final String namespace) { - final String outputPrefix = S3OutputPathHelper - .getOutputPrefix(config.getBucketPath(), namespace, streamName); + final String namespaceStr = nameTransformer.getNamespace(namespace); + final String streamNameStr = nameTransformer.getIdentifier(streamName); + final String outputPrefix = s3StorageOperations.getBucketObjectPath( + namespaceStr, + streamNameStr, + DateTime.now(DateTimeZone.UTC), + config.getPathFormat()); + // the child folder contains a non-deterministic epoch timestamp, so use the parent folder + final String parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1); final List objectSummaries = s3Client - .listObjects(config.getBucketName(), outputPrefix) + .listObjects(config.getBucketName(), parentFolder) .getObjectSummaries() .stream() - .filter(o -> o.getKey().contains(S3DestinationConstants.NAME_TRANSFORMER.convertStreamName(streamName) + "/")) + .filter(o -> o.getKey().contains(streamNameStr + "/")) .sorted(Comparator.comparingLong(o -> o.getLastModified().getTime())) .collect(Collectors.toList()); LOGGER.info( @@ -125,7 +145,9 @@ protected void setup(final TestDestinationEnv testEnv) { this.config = GcsDestinationConfig.getGcsDestinationConfig(configJson); LOGGER.info("Test full path: {}/{}", config.getBucketName(), config.getBucketPath()); - this.s3Client = GcsS3Helper.getGcsS3Client(config); + this.s3Client = config.getS3Client(); + this.nameTransformer = new GcsNameTransformer(); + this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, config); } /** diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java index a46058c875855..ba8fec3953420 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsJsonlDestinationAcceptanceTest.java @@ -16,6 +16,8 @@ import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.zip.GZIPInputStream; public class GcsJsonlDestinationAcceptanceTest extends GcsDestinationAcceptanceTest { @@ -25,9 +27,7 @@ protected GcsJsonlDestinationAcceptanceTest() { @Override protected JsonNode getFormatConfig() { - return Jsons.deserialize("{\n" - + " \"format_type\": \"JSONL\"\n" - + "}"); + return Jsons.jsonNode(Map.of("format_type", outputFormat)); } @Override @@ -41,7 +41,8 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, for (final S3ObjectSummary objectSummary : objectSummaries) { final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent(), StandardCharsets.UTF_8))) { + try (final BufferedReader reader = + new BufferedReader(new InputStreamReader(new GZIPInputStream(object.getObjectContent()), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null) { jsonRecords.add(Jsons.deserialize(line).get(JavaBaseConstants.COLUMN_NAME_DATA)); diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java index 6db884528f298..561de516aab11 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java @@ -19,6 +19,7 @@ import java.net.URISyntaxException; import java.util.LinkedList; import java.util.List; +import java.util.Map; import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.avro.AvroReadSupport; @@ -32,10 +33,9 @@ protected GcsParquetDestinationAcceptanceTest() { @Override protected JsonNode getFormatConfig() { - return Jsons.deserialize("{\n" - + " \"format_type\": \"Parquet\",\n" - + " \"compression_codec\": \"GZIP\"\n" - + "}"); + return Jsons.jsonNode(Map.of( + "format_type", "Parquet", + "compression_codec", "GZIP")); } @Override diff --git a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfigTest.java b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfigTest.java index 7b4c7db20ebb0..2dbcc466cbd79 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfigTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/GcsDestinationConfigTest.java @@ -28,7 +28,7 @@ public void testGetGcsDestinationConfig() throws IOException { assertEquals("test_path", config.getBucketPath()); assertEquals("us-west1", config.getBucketRegion()); - final GcsCredentialConfig credentialConfig = config.getCredentialConfig(); + final GcsCredentialConfig credentialConfig = config.getGcsCredentialConfig(); assertTrue(credentialConfig instanceof GcsHmacKeyCredentialConfig); final GcsHmacKeyCredentialConfig hmacKeyConfig = (GcsHmacKeyCredentialConfig) credentialConfig; diff --git a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriterTest.java b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriterTest.java index 47d763a04cae8..1b6b605d57197 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriterTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriterTest.java @@ -11,6 +11,7 @@ import com.amazonaws.services.s3.AmazonS3; import com.fasterxml.jackson.databind.ObjectMapper; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig; import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -28,7 +29,7 @@ public void generatesCorrectObjectPath() throws IOException { "fake-bucket", "fake-bucketPath", "fake-bucketRegion", - null, + new GcsHmacKeyCredentialConfig("fake-access-id", "fake-secret"), new S3AvroFormatConfig(new ObjectMapper().createObjectNode())), mock(AmazonS3.class, RETURNS_DEEP_STUBS), new ConfiguredAirbyteStream() diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index ded91cd049300..9765437b1e9d5 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -11,6 +11,7 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.S3FormatConfig; import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig; import io.airbyte.integrations.destination.s3.csv.S3CsvWriter; import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; @@ -95,10 +96,12 @@ public String prepareStagingFile() { LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); try { + // The Flattening value is actually ignored, because we pass an explicit CsvSheetGenerator. So just + // pass in null. + final S3FormatConfig csvFormatConfig = new S3CsvFormatConfig(null, (long) s3Config.getPartSize()); + final S3DestinationConfig writerS3Config = S3DestinationConfig.create(s3Config).withFormatConfig(csvFormatConfig).get(); final S3CsvWriter writer = new S3CsvWriter.Builder( - // The Flattening value is actually ignored, because we pass an explicit CsvSheetGenerator. So just - // pass in null. - s3Config.cloneWithFormatConfig(new S3CsvFormatConfig(null, (long) s3Config.getPartSize())), + writerS3Config, s3Client, configuredAirbyteStream, uploadTime) @@ -129,8 +132,8 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage, final @Override public void closeNonCurrentStagingFileWriters() throws Exception { - Set removedKeys = new HashSet<>(); - for (String key : activeStagingWriterFileNames) { + final Set removedKeys = new HashSet<>(); + for (final String key : activeStagingWriterFileNames) { if (!key.equals(currentFile)) { stagingWritersByFile.get(key).close(false); stagingWritersByFile.remove(key); diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java index 2ddab245b7de7..737710cb44912 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java @@ -43,15 +43,14 @@ public class S3StreamCopierTest { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamCopierTest.class); private static final int PART_SIZE = 5; - private static final S3DestinationConfig S3_CONFIG = new S3DestinationConfig( - "fake-endpoint", + private static final S3DestinationConfig S3_CONFIG = S3DestinationConfig.create( "fake-bucket", "fake-bucketPath", - "fake-region", - "fake-access-key-id", - "fake-secret-access-key", - PART_SIZE, - null); + "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") + .withPartSize(PART_SIZE) + .get(); private static final ConfiguredAirbyteStream CONFIGURED_STREAM = new ConfiguredAirbyteStream() .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(new AirbyteStream() @@ -177,7 +176,7 @@ public void createSequentialStagingFiles_when_multipleFilesRequested() { } private void checkCsvWriterArgs(final S3CsvWriterArguments args) { - assertEquals(S3_CONFIG.cloneWithFormatConfig(new S3CsvFormatConfig(null, (long) PART_SIZE)), args.config); + assertEquals(S3DestinationConfig.create(S3_CONFIG).withFormatConfig(new S3CsvFormatConfig(null, (long) PART_SIZE)).get(), args.config); assertEquals(CONFIGURED_STREAM, args.stream); assertEquals(UPLOAD_TIME, args.uploadTime); assertEquals(UPLOAD_THREADS, args.uploadThreads); diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java index 8e65ceda64498..bd5d7c39067d0 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java @@ -16,6 +16,7 @@ import io.airbyte.integrations.destination.redshift.manifest.Entry; import io.airbyte.integrations.destination.redshift.manifest.Manifest; import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.sql.Timestamp; import java.time.Instant; @@ -150,6 +151,7 @@ private String putManifest(final String manifestContents) { * @param manifestPath the path in S3 to the manifest file */ private void executeCopy(final String manifestPath) { + final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig(); final var copyQuery = String.format( "COPY %s.%s FROM '%s'\n" + "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'\n" @@ -159,8 +161,8 @@ private void executeCopy(final String manifestPath) { schemaName, tmpTableName, getFullS3Path(s3Config.getBucketName(), manifestPath), - s3Config.getAccessKeyId(), - s3Config.getSecretAccessKey(), + credentialConfig.getAccessKeyId(), + credentialConfig.getSecretAccessKey(), s3Config.getBucketRegion()); Exceptions.toRuntime(() -> db.execute(copyQuery)); diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java index 3f2b44536be6d..8026a0e339f19 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java @@ -65,23 +65,22 @@ public void setup() { db = mock(JdbcDatabase.class); sqlOperations = mock(SqlOperations.class); + final S3DestinationConfig s3Config = S3DestinationConfig.create( + "fake-bucket", + "fake-bucketPath", + "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") + .withPartSize(PART_SIZE) + .get(); + copier = new RedshiftStreamCopier( // In reality, this is normally a UUID - see CopyConsumerFactory#createWriteConfigs "fake-staging-folder", "fake-schema", s3Client, db, - new S3CopyConfig( - true, - new S3DestinationConfig( - "fake-endpoint", - "fake-bucket", - "fake-bucketPath", - "fake-region", - "fake-access-key-id", - "fake-secret-access-key", - PART_SIZE, - null)), + new S3CopyConfig(true, s3Config), new ExtendedNameTransformer(), sqlOperations, UPLOAD_TIME, diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java index 4cc0f693866e5..a035a84155215 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.s3; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; import io.airbyte.commons.functional.CheckedBiConsumer; import io.airbyte.commons.functional.CheckedBiFunction; @@ -39,16 +38,14 @@ public class S3ConsumerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(S3ConsumerFactory.class); private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC); - private static final String PATH_FORMAT_FIELD = "s3_path_format"; - private static final String BUCKET_PATH_FIELD = "s3_bucket_path"; public AirbyteMessageConsumer create(final Consumer outputRecordCollector, final BlobStorageOperations storageOperations, final NamingConventionTransformer namingResolver, final CheckedBiFunction onCreateBuffer, - final JsonNode config, + final S3DestinationConfig s3Config, final ConfiguredAirbyteCatalog catalog) { - final List writeConfigs = createWriteConfigs(storageOperations, namingResolver, config, catalog); + final List writeConfigs = createWriteConfigs(storageOperations, namingResolver, s3Config, catalog); return new BufferedStreamConsumer( outputRecordCollector, onStartFunction(storageOperations, writeConfigs), @@ -63,7 +60,7 @@ public AirbyteMessageConsumer create(final Consumer outputRecord private static List createWriteConfigs(final BlobStorageOperations storageOperations, final NamingConventionTransformer namingResolver, - final JsonNode config, + final S3DestinationConfig config, final ConfiguredAirbyteCatalog catalog) { return catalog.getStreams() .stream() @@ -74,20 +71,17 @@ private static List createWriteConfigs(final BlobStorageOperations private static Function toWriteConfig( final BlobStorageOperations storageOperations, final NamingConventionTransformer namingResolver, - final JsonNode config) { + final S3DestinationConfig s3Config) { return stream -> { Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode"); final AirbyteStream abStream = stream.getStream(); final String namespace = abStream.getNamespace(); final String streamName = abStream.getName(); - final String outputBucketPath = config.get(BUCKET_PATH_FIELD).asText(); - final String customOutputFormat = String.join("/", - outputBucketPath, - config.has(PATH_FORMAT_FIELD) && !config.get(PATH_FORMAT_FIELD).asText().isBlank() ? config.get(PATH_FORMAT_FIELD).asText() - : S3DestinationConstants.DEFAULT_PATH_FORMAT); + final String bucketPath = s3Config.getBucketPath(); + final String customOutputFormat = String.join("/", bucketPath, s3Config.getPathFormat()); final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat); final DestinationSyncMode syncMode = stream.getDestinationSyncMode(); - final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, fullOutputPath, syncMode); + final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, fullOutputPath, syncMode); LOGGER.info("Write config: {}", writeConfig); return writeConfig; }; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java index 4a7c493ed1401..2953da5f9c5db 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java @@ -36,7 +36,7 @@ public class S3Destination extends BaseConnector implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(S3Destination.class); private final S3DestinationConfigFactory configFactory; - private final NamingConventionTransformer nameTranformer; + private final NamingConventionTransformer nameTransformer; public S3Destination() { this(new S3DestinationConfigFactory()); @@ -44,7 +44,7 @@ public S3Destination() { public S3Destination(final S3DestinationConfigFactory configFactory) { this.configFactory = configFactory; - this.nameTranformer = new S3NameTransformer(); + this.nameTransformer = new S3NameTransformer(); } public static void main(final String[] args) throws Exception { @@ -56,7 +56,7 @@ public AirbyteConnectionStatus check(final JsonNode config) { try { final S3DestinationConfig destinationConfig = configFactory.getS3DestinationConfig(config); final AmazonS3 s3Client = destinationConfig.getS3Client(); - final S3StorageOperations storageOperations = new S3StorageOperations(nameTranformer, s3Client, destinationConfig); + final S3StorageOperations storageOperations = new S3StorageOperations(nameTransformer, s3Client, destinationConfig); // Test for writing, list and delete S3Destination.attemptS3WriteAndDelete(storageOperations, destinationConfig, destinationConfig.getBucketName()); @@ -164,10 +164,10 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final S3DestinationConfig s3Config = S3DestinationConfig.getS3DestinationConfig(config); return new S3ConsumerFactory().create( outputRecordCollector, - new S3StorageOperations(nameTranformer, s3Config.getS3Client(), s3Config), - nameTranformer, - SerializedBufferFactory.getCreateFunction(config, FileBuffer::new), - config, + new S3StorageOperations(nameTransformer, s3Config.getS3Client(), s3Config), + nameTransformer, + SerializedBufferFactory.getCreateFunction(s3Config, FileBuffer::new), + s3Config, catalog); } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index 6c9379f0964e8..a2f022e34ee85 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -5,14 +5,15 @@ package io.airbyte.integrations.destination.s3; import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; +import io.airbyte.integrations.destination.s3.credential.S3CredentialConfig; +import io.airbyte.integrations.destination.s3.credential.S3CredentialType; +import io.airbyte.integrations.destination.s3.credential.S3InstanceProfileCredentialConfig; import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,43 +27,24 @@ public class S3DestinationConfig { private static final Logger LOGGER = LoggerFactory.getLogger(S3DestinationConfig.class); - // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives - // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. - // WARNING: Too large a part size can cause potential OOM errors. - public static final int DEFAULT_PART_SIZE_MB = 10; - private final String endpoint; private final String bucketName; private final String bucketPath; private final String bucketRegion; - private final String accessKeyId; - private final String secretAccessKey; + private final String pathFormat; + private final S3CredentialConfig credentialConfig; private final Integer partSize; private final S3FormatConfig formatConfig; private final Object lock = new Object(); - private AmazonS3 s3Client = null; + private AmazonS3 s3Client; - /** - * The part size should not matter in any use case that depends on this constructor. So the default - * 10 MB is used. - */ public S3DestinationConfig(final String endpoint, final String bucketName, final String bucketPath, final String bucketRegion, - final String accessKeyId, - final String secretAccessKey, - final S3FormatConfig formatConfig) { - this(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, DEFAULT_PART_SIZE_MB, formatConfig); - } - - public S3DestinationConfig(final String endpoint, - final String bucketName, - final String bucketPath, - final String bucketRegion, - final String accessKeyId, - final String secretAccessKey, + final String pathFormat, + final S3CredentialConfig credentialConfig, final Integer partSize, final S3FormatConfig formatConfig, final AmazonS3 s3Client) { @@ -70,48 +52,62 @@ public S3DestinationConfig(final String endpoint, this.bucketName = bucketName; this.bucketPath = bucketPath; this.bucketRegion = bucketRegion; - this.accessKeyId = accessKeyId; - this.secretAccessKey = secretAccessKey; + this.pathFormat = pathFormat; + this.credentialConfig = credentialConfig; this.formatConfig = formatConfig; this.partSize = partSize; this.s3Client = s3Client; } - public S3DestinationConfig(final String endpoint, - final String bucketName, - final String bucketPath, - final String bucketRegion, - final String accessKeyId, - final String secretAccessKey, - final Integer partSize, - final S3FormatConfig formatConfig) { - this(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, partSize, formatConfig, null); + public static Builder create(final String bucketName, final String bucketPath, final String bucketRegion) { + return new Builder(bucketName, bucketPath, bucketRegion); + } + + public static Builder create(final S3DestinationConfig config) { + return new Builder(config.getBucketName(), config.getBucketPath(), config.getBucketRegion()) + .withEndpoint(config.getEndpoint()) + .withCredentialConfig(config.getS3CredentialConfig()) + .withPartSize(config.getPartSize()) + .withFormatConfig(config.getFormatConfig()); } public static S3DestinationConfig getS3DestinationConfig(final JsonNode config) { - var partSize = DEFAULT_PART_SIZE_MB; - if (config.get("part_size") != null) { - partSize = config.get("part_size").asInt(); + Builder builder = S3DestinationConfig.create( + config.get("s3_bucket_name").asText(), + "", + config.get("s3_bucket_region").asText()); + + if (config.has("s3_bucket_path")) { + builder = builder.withBucketPath(config.get("s3_bucket_path").asText()); } - String bucketPath = null; - if (config.get("s3_bucket_path") != null) { - bucketPath = config.get("s3_bucket_path").asText(); + + if (config.has("s3_path_format")) { + builder = builder.withPathFormat(config.get("s3_path_format").asText()); } + + if (config.has("s3_endpoint")) { + builder = builder.withEndpoint(config.get("s3_endpoint").asText()); + } + + if (config.has("part_size")) { + builder = builder.withPartSize(config.get("part_size").asInt()); + } + + final S3CredentialConfig credentialConfig; + if (config.has("access_key_id")) { + credentialConfig = new S3AccessKeyCredentialConfig(config.get("access_key_id").asText(), config.get("secret_access_key").asText()); + } else { + credentialConfig = new S3InstanceProfileCredentialConfig(); + } + builder = builder.withCredentialConfig(credentialConfig); + // In the "normal" S3 destination, this is never null. However, the Redshift and Snowflake copy // destinations don't set a Format config. - S3FormatConfig format = null; - if (config.get("format") != null) { - format = S3FormatConfigs.getS3FormatConfig(config); + if (config.has("format")) { + builder = builder.withFormatConfig(S3FormatConfigs.getS3FormatConfig(config)); } - return new S3DestinationConfig( - config.get("s3_endpoint") == null ? "" : config.get("s3_endpoint").asText(), - config.get("s3_bucket_name").asText(), - bucketPath, - config.get("s3_bucket_region").asText(), - config.get("access_key_id") == null ? "" : config.get("access_key_id").asText(), - config.get("secret_access_key") == null ? "" : config.get("secret_access_key").asText(), - partSize, - format); + + return builder.get(); } public String getEndpoint() { @@ -126,16 +122,16 @@ public String getBucketPath() { return bucketPath; } - public String getBucketRegion() { - return bucketRegion; + public String getPathFormat() { + return pathFormat; } - public String getAccessKeyId() { - return accessKeyId; + public String getBucketRegion() { + return bucketRegion; } - public String getSecretAccessKey() { - return secretAccessKey; + public S3CredentialConfig getS3CredentialConfig() { + return credentialConfig; } public Integer getPartSize() { @@ -168,21 +164,19 @@ public AmazonS3 resetS3Client() { protected AmazonS3 createS3Client() { LOGGER.info("Creating S3 client..."); - final AWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); - - if (accessKeyId.isEmpty() && !secretAccessKey.isEmpty() - || !accessKeyId.isEmpty() && secretAccessKey.isEmpty()) { - throw new RuntimeException("Either both accessKeyId and secretAccessKey should be provided, or neither"); - } + final AWSCredentialsProvider credentialsProvider = credentialConfig.getS3CredentialsProvider(); + final S3CredentialType credentialType = credentialConfig.getCredentialType(); - if (accessKeyId.isEmpty()) { + if (credentialType == S3CredentialType.INSTANCE_PROFILE) { return AmazonS3ClientBuilder.standard() .withRegion(bucketRegion) - .withCredentials(new InstanceProfileCredentialsProvider(false)) + .withCredentials(credentialsProvider) .build(); - } else if (endpoint == null || endpoint.isEmpty()) { + } + + if (endpoint == null || endpoint.isEmpty()) { return AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withCredentials(credentialsProvider) .withRegion(bucketRegion) .build(); } @@ -194,22 +188,10 @@ protected AmazonS3 createS3Client() { .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, bucketRegion)) .withPathStyleAccessEnabled(true) .withClientConfiguration(clientConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withCredentials(credentialsProvider) .build(); } - public S3DestinationConfig cloneWithFormatConfig(final S3FormatConfig formatConfig) { - return new S3DestinationConfig( - this.endpoint, - this.bucketName, - this.bucketPath, - this.bucketRegion, - this.accessKeyId, - this.secretAccessKey, - this.partSize, - formatConfig); - } - @Override public boolean equals(final Object o) { if (this == o) { @@ -221,15 +203,98 @@ public boolean equals(final Object o) { final S3DestinationConfig that = (S3DestinationConfig) o; return Objects.equals(endpoint, that.endpoint) && Objects.equals(bucketName, that.bucketName) && Objects.equals( bucketPath, that.bucketPath) && Objects.equals(bucketRegion, that.bucketRegion) - && Objects.equals(accessKeyId, - that.accessKeyId) - && Objects.equals(secretAccessKey, that.secretAccessKey) && Objects.equals(partSize, that.partSize) + && Objects.equals(credentialConfig, that.credentialConfig) + && Objects.equals(partSize, that.partSize) && Objects.equals(formatConfig, that.formatConfig); } @Override public int hashCode() { - return Objects.hash(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, partSize, formatConfig); + return Objects.hash(endpoint, bucketName, bucketPath, bucketRegion, credentialConfig, partSize, formatConfig); + } + + public static class Builder { + + private String endpoint = ""; + private String pathFormat = S3DestinationConstants.DEFAULT_PATH_FORMAT; + private int partSize = S3DestinationConstants.DEFAULT_PART_SIZE_MB; + + private String bucketName; + private String bucketPath; + private String bucketRegion; + private S3CredentialConfig credentialConfig; + private S3FormatConfig formatConfig; + private AmazonS3 s3Client; + + private Builder(final String bucketName, final String bucketPath, final String bucketRegion) { + this.bucketName = bucketName; + this.bucketPath = bucketPath; + this.bucketRegion = bucketRegion; + } + + public Builder withBucketName(final String bucketName) { + this.bucketName = bucketName; + return this; + } + + public Builder withBucketPath(final String bucketPath) { + this.bucketPath = bucketPath; + return this; + } + + public Builder withBucketRegion(final String bucketRegion) { + this.bucketRegion = bucketRegion; + return this; + } + + public Builder withPathFormat(final String pathFormat) { + this.pathFormat = pathFormat; + return this; + } + + public Builder withEndpoint(final String endpoint) { + this.endpoint = endpoint; + return this; + } + + public Builder withPartSize(final int partSize) { + this.partSize = partSize; + return this; + } + + public Builder withFormatConfig(final S3FormatConfig formatConfig) { + this.formatConfig = formatConfig; + return this; + } + + public Builder withAccessKeyCredential(final String accessKeyId, final String secretAccessKey) { + this.credentialConfig = new S3AccessKeyCredentialConfig(accessKeyId, secretAccessKey); + return this; + } + + public Builder withCredentialConfig(final S3CredentialConfig credentialConfig) { + this.credentialConfig = credentialConfig; + return this; + } + + public Builder withS3Client(final AmazonS3 s3Client) { + this.s3Client = s3Client; + return this; + } + + public S3DestinationConfig get() { + return new S3DestinationConfig( + endpoint, + bucketName, + bucketPath, + bucketRegion, + pathFormat, + credentialConfig, + partSize, + formatConfig, + s3Client); + } + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java index f585954f069da..efe3bde5d0b65 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java @@ -11,6 +11,9 @@ public final class S3DestinationConstants { public static final String YYYY_MM_DD_FORMAT_STRING = "yyyy_MM_dd"; public static final S3NameTransformer NAME_TRANSFORMER = new S3NameTransformer(); public static final String PART_SIZE_MB_ARG_NAME = "part_size_mb"; + // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives + // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. + // WARNING: Too large a part size can cause potential OOM errors. public static final int DEFAULT_PART_SIZE_MB = 10; public static final String DEFAULT_PATH_FORMAT = "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java index 37312b0dbe37c..a830a7ca70ae0 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java @@ -36,8 +36,8 @@ public class S3StorageOperations implements BlobStorageOperations { private static final int UPLOAD_RETRY_LIMIT = 3; private final NamingConventionTransformer nameTransformer; - private final S3DestinationConfig s3Config; - private AmazonS3 s3Client; + protected final S3DestinationConfig s3Config; + protected AmazonS3 s3Client; public S3StorageOperations(final NamingConventionTransformer nameTransformer, final AmazonS3 s3Client, final S3DestinationConfig s3Config) { this.nameTransformer = nameTransformer; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java index 2c706f54b15d1..05275cc8e2998 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.s3; -import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; @@ -44,28 +43,27 @@ public class SerializedBufferFactory { * creating a new buffer where to store data. Note that we typically associate which format is being * stored in the storage object thanks to its file extension. */ - public static CheckedBiFunction getCreateFunction(final JsonNode config, + public static CheckedBiFunction getCreateFunction(final S3DestinationConfig config, final Function createStorageFunctionWithoutExtension) { - final JsonNode formatConfig = config.get("format"); + final S3FormatConfig formatConfig = config.getFormatConfig(); LOGGER.info("S3 format config: {}", formatConfig.toString()); - final S3Format formatType = S3Format.valueOf(formatConfig.get("format_type").asText().toUpperCase()); - switch (formatType) { + switch (formatConfig.getFormat()) { case AVRO -> { - final Callable createStorageFunctionWithExtension = () -> createStorageFunctionWithoutExtension.apply(".avro"); - return AvroSerializedBuffer.createFunction(new S3AvroFormatConfig(formatConfig), createStorageFunctionWithExtension); + final Callable createStorageFunctionWithExtension = () -> createStorageFunctionWithoutExtension.apply(AvroSerializedBuffer.DEFAULT_SUFFIX); + return AvroSerializedBuffer.createFunction((S3AvroFormatConfig) formatConfig, createStorageFunctionWithExtension); } case CSV -> { - final Callable createStorageFunctionWithExtension = () -> createStorageFunctionWithoutExtension.apply(".csv.gz"); - return CsvSerializedBuffer.createFunction(new S3CsvFormatConfig(formatConfig), createStorageFunctionWithExtension); + final Callable createStorageFunctionWithExtension = () -> createStorageFunctionWithoutExtension.apply(CsvSerializedBuffer.CSV_GZ_SUFFIX); + return CsvSerializedBuffer.createFunction((S3CsvFormatConfig) formatConfig, createStorageFunctionWithExtension); } case JSONL -> { - final Callable createStorageFunctionWithExtension = () -> createStorageFunctionWithoutExtension.apply(".jsonl.gz"); - return JsonLSerializedBuffer.createFunction(new S3JsonlFormatConfig(formatConfig), createStorageFunctionWithExtension); + final Callable createStorageFunctionWithExtension = () -> createStorageFunctionWithoutExtension.apply(JsonLSerializedBuffer.JSONL_GZ_SUFFIX); + return JsonLSerializedBuffer.createFunction((S3JsonlFormatConfig) formatConfig, createStorageFunctionWithExtension); } case PARQUET -> { // we can't choose the type of buffer storage with parquet because of how the underlying hadoop // library is imposing file usage. - return ParquetSerializedBuffer.createFunction(S3DestinationConfig.getS3DestinationConfig(config)); + return ParquetSerializedBuffer.createFunction(config); } default -> { throw new RuntimeException("Unexpected output format: " + Jsons.serialize(config)); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java index bafd1c2fbc1da..aeec54023e4e3 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java @@ -24,6 +24,8 @@ public class AvroSerializedBuffer extends BaseSerializedBuffer { + public static final String DEFAULT_SUFFIX = ".avro"; + private final CodecFactory codecFactory; private final Schema schema; private final AvroRecordFactory avroRecordFactory; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java index 87157331fa268..a4649b04bb04d 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java @@ -19,7 +19,8 @@ public class S3AvroFormatConfig implements S3FormatConfig { public S3AvroFormatConfig(final JsonNode formatConfig) { this.codecFactory = parseCodecConfig(formatConfig.get("compression_codec")); - this.partSize = formatConfig.get(PART_SIZE_MB_ARG_NAME) != null ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() + this.partSize = formatConfig.get(PART_SIZE_MB_ARG_NAME) != null + ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() : S3DestinationConstants.DEFAULT_PART_SIZE_MB; } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/BlobStorageCredentialConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/BlobStorageCredentialConfig.java new file mode 100644 index 0000000000000..2b95fe87f04ab --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/BlobStorageCredentialConfig.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3.credential; + +public interface BlobStorageCredentialConfig { + + CredentialType getCredentialType(); + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3AccessKeyCredentialConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3AccessKeyCredentialConfig.java new file mode 100644 index 0000000000000..d7cb0a562f99f --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3AccessKeyCredentialConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3.credential; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; + +public class S3AccessKeyCredentialConfig implements S3CredentialConfig { + + private final String accessKeyId; + private final String secretAccessKey; + + public S3AccessKeyCredentialConfig(final String accessKeyId, final String secretAccessKey) { + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + } + + @Override + public S3CredentialType getCredentialType() { + return S3CredentialType.ACCESS_KEY; + } + + @Override + public AWSCredentialsProvider getS3CredentialsProvider() { + final AWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey); + return new AWSStaticCredentialsProvider(awsCreds); + } + + public String getAccessKeyId() { + return accessKeyId; + } + + public String getSecretAccessKey() { + return secretAccessKey; + } + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3CredentialConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3CredentialConfig.java new file mode 100644 index 0000000000000..99aec90b639e3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3CredentialConfig.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3.credential; + +import com.amazonaws.auth.AWSCredentialsProvider; + +public interface S3CredentialConfig extends BlobStorageCredentialConfig { + + AWSCredentialsProvider getS3CredentialsProvider(); + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3CredentialType.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3CredentialType.java new file mode 100644 index 0000000000000..f617cf761713d --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3CredentialType.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3.credential; + +public enum S3CredentialType { + + ACCESS_KEY, + INSTANCE_PROFILE + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3InstanceProfileCredentialConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3InstanceProfileCredentialConfig.java new file mode 100644 index 0000000000000..e8b91cd81ad6e --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/credential/S3InstanceProfileCredentialConfig.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3.credential; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; + +public class S3InstanceProfileCredentialConfig implements S3CredentialConfig { + + @Override + public S3CredentialType getCredentialType() { + return S3CredentialType.INSTANCE_PROFILE; + } + + @Override + public AWSCredentialsProvider getS3CredentialsProvider() { + return new InstanceProfileCredentialsProvider(false); + } + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java index af64e555ffb0e..6f7a31740cbbd 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java @@ -49,7 +49,8 @@ public String getValue() { public S3CsvFormatConfig(final JsonNode formatConfig) { this( Flattening.fromValue(formatConfig.has("flattening") ? formatConfig.get("flattening").asText() : Flattening.NO.value), - formatConfig.get(PART_SIZE_MB_ARG_NAME) != null ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() + formatConfig.get(PART_SIZE_MB_ARG_NAME) != null + ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() : S3DestinationConstants.DEFAULT_PART_SIZE_MB); } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java index f81bab4587db0..36ca0bf93ea4f 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java @@ -24,6 +24,8 @@ public class JsonLSerializedBuffer extends BaseSerializedBuffer { + public static final String JSONL_GZ_SUFFIX = ".jsonl.gz"; + private static final ObjectMapper MAPPER = MoreMappers.initMapper(); private PrintWriter printWriter; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java index b91439a38e2d8..ef04ec9caee39 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlFormatConfig.java @@ -16,7 +16,8 @@ public class S3JsonlFormatConfig implements S3FormatConfig { private final Long partSize; public S3JsonlFormatConfig(final JsonNode formatConfig) { - this.partSize = formatConfig.get(PART_SIZE_MB_ARG_NAME) != null ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() + this.partSize = formatConfig.get(PART_SIZE_MB_ARG_NAME) != null + ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() : S3DestinationConstants.DEFAULT_PART_SIZE_MB; } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java index fde906fd9dd1f..678c6cef7816b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java @@ -9,6 +9,7 @@ import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import io.airbyte.integrations.destination.s3.writer.BaseS3Writer; import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -78,8 +79,9 @@ public S3ParquetWriter(final S3DestinationConfig config, public static Configuration getHadoopConfig(final S3DestinationConfig config) { final Configuration hadoopConfig = new Configuration(); - hadoopConfig.set(Constants.ACCESS_KEY, config.getAccessKeyId()); - hadoopConfig.set(Constants.SECRET_KEY, config.getSecretAccessKey()); + final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) config.getS3CredentialConfig(); + hadoopConfig.set(Constants.ACCESS_KEY, credentialConfig.getAccessKeyId()); + hadoopConfig.set(Constants.SECRET_KEY, credentialConfig.getSecretAccessKey()); if (config.getEndpoint().isEmpty()) { hadoopConfig.set(Constants.ENDPOINT, String.format("s3.%s.amazonaws.com", config.getBucketRegion())); } else { @@ -137,7 +139,7 @@ public S3Format getFileFormat() { } @Override - public void write(JsonNode formattedData) throws IOException { + public void write(final JsonNode formattedData) throws IOException { parquetWriter.write(avroRecordFactory.getAvroRecord(formattedData)); } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java index e0f52d48934a5..8334c38637a62 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java @@ -73,10 +73,10 @@ protected JsonNode getConfig() { @Override protected String getDefaultSchema(final JsonNode config) { - if (config.get("s3_bucket_path") == null) { - return null; + if (config.has("s3_bucket_path")) { + return config.get("s3_bucket_path").asText(); } - return config.get("s3_bucket_path").asText(); + return null; } @Override @@ -99,7 +99,8 @@ protected List getAllSyncedObjects(final String streamName, fin namespaceStr, streamNameStr, DateTime.now(DateTimeZone.UTC), - S3DestinationConstants.DEFAULT_PATH_FORMAT); + config.getPathFormat()); + // the child folder contains a non-deterministic epoch timestamp, so use the parent folder final String parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1); final List objectSummaries = s3Client .listObjects(config.getBucketName(), parentFolder) diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationConfigTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationConfigTest.java new file mode 100644 index 0000000000000..a289fdd6db8e7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationConfigTest.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; +import org.junit.jupiter.api.Test; + +class S3DestinationConfigTest { + + private static final S3DestinationConfig CONFIG = S3DestinationConfig.create("test-bucket", "test-path", "test-region") + .withEndpoint("test-endpoint") + .withPartSize(19) + .withPathFormat("${STREAM_NAME}/${NAMESPACE}") + .withAccessKeyCredential("test-key", "test-secret") + .get(); + + @Test + public void testCreateFromExistingConfig() { + assertEquals(CONFIG, S3DestinationConfig.create(CONFIG).get()); + } + + @Test + public void testCreateAndModify() { + final String newBucketName = "new-bucket"; + final String newBucketPath = "new-path"; + final String newBucketRegion = "new-region"; + final String newEndpoint = "new-endpoint"; + final int newPartSize = 29; + final String newKey = "new-key"; + final String newSecret = "new-secret"; + + final S3DestinationConfig modifiedConfig = S3DestinationConfig.create(CONFIG) + .withBucketName(newBucketName) + .withBucketPath(newBucketPath) + .withBucketRegion(newBucketRegion) + .withEndpoint(newEndpoint) + .withAccessKeyCredential(newKey, newSecret) + .withPartSize(newPartSize) + .get(); + + assertNotEquals(CONFIG, modifiedConfig); + assertEquals(newBucketName, modifiedConfig.getBucketName()); + assertEquals(newBucketPath, modifiedConfig.getBucketPath()); + assertEquals(newBucketRegion, modifiedConfig.getBucketRegion()); + assertEquals(newPartSize, modifiedConfig.getPartSize()); + + final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) modifiedConfig.getS3CredentialConfig(); + assertEquals(newKey, credentialConfig.getAccessKeyId()); + assertEquals(newSecret, credentialConfig.getSecretAccessKey()); + } + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java index b28dd256b6e84..550bf0ae1bb62 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java @@ -43,14 +43,11 @@ public void setup() { when(s3.uploadPart(any(UploadPartRequest.class))).thenReturn(uploadPartResult); when(s3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(uploadResult); - config = new S3DestinationConfig( - "fake-endpoint", - "fake-bucket", - "fake-bucketPath", - "fake-region", - "fake-accessKeyId", - "fake-secretAccessKey", - S3DestinationConfig.DEFAULT_PART_SIZE_MB, null, s3); + config = S3DestinationConfig.create("fake-bucket", "fake-bucketPath", "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey") + .withS3Client(s3) + .get(); } @Test @@ -61,15 +58,11 @@ public void checksS3WithoutListObjectPermission() { final S3Destination destinationFail = new S3Destination(new S3DestinationConfigFactory() { public S3DestinationConfig getS3DestinationConfig(final JsonNode config) { - return new S3DestinationConfig( - "fake-endpoint", - "fake-bucket", - "fake-bucketPath", - "fake-region", - "fake-accessKeyId", - "fake-secretAccessKey", - S3DestinationConfig.DEFAULT_PART_SIZE_MB, - null, s3); + return S3DestinationConfig.create("fake-bucket", "fake-bucketPath", "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey") + .withS3Client(s3) + .get(); } }); @@ -87,15 +80,11 @@ public void checksS3WithListObjectPermission() { final S3Destination destinationSuccess = new S3Destination(new S3DestinationConfigFactory() { public S3DestinationConfig getS3DestinationConfig(final JsonNode config) { - return new S3DestinationConfig( - "fake-endpoint", - "fake-bucket", - "fake-bucketPath", - "fake-region", - "fake-accessKeyId", - "fake-secretAccessKey", - S3DestinationConfig.DEFAULT_PART_SIZE_MB, - null, s3); + return S3DestinationConfig.create("fake-bucket", "fake-bucketPath", "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey") + .withS3Client(s3) + .get(); } }); diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java index 16694a4110ee8..83a713e01b9fe 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java @@ -49,13 +49,12 @@ public class AvroSerializedBufferTest { Field.of("another field", JsonSchemaType.BOOLEAN), Field.of("nested_column", JsonSchemaType.OBJECT)); private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS); - private static final String AVRO_FILE_EXTENSION = ".avro"; @Test public void testSnappyAvroWriter() throws Exception { final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of( "codec", "snappy")))); - runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 965L, 980L, config, getExpectedString()); + runTest(new InMemoryBuffer(AvroSerializedBuffer.DEFAULT_SUFFIX), 965L, 980L, config, getExpectedString()); } @Test @@ -64,14 +63,14 @@ public void testGzipAvroFileWriter() throws Exception { "codec", "zstandard", "compression_level", 20, "include_checksum", true)))); - runTest(new FileBuffer(AVRO_FILE_EXTENSION), 970L, 980L, config, getExpectedString()); + runTest(new FileBuffer(AvroSerializedBuffer.DEFAULT_SUFFIX), 970L, 980L, config, getExpectedString()); } @Test public void testUncompressedAvroWriter() throws Exception { final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of( "codec", "no compression")))); - runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 1010L, 1020L, config, getExpectedString()); + runTest(new InMemoryBuffer(AvroSerializedBuffer.DEFAULT_SUFFIX), 1010L, 1020L, config, getExpectedString()); } private static String getExpectedString() { diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java index 456f614389b13..01ec9e5295f17 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriterTest.java @@ -53,16 +53,15 @@ class S3CsvWriterTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final int PART_SIZE = 7; - public static final S3DestinationConfig CONFIG = new S3DestinationConfig( - "fake-endpoint", + public static final S3DestinationConfig CONFIG = S3DestinationConfig.create( "fake-bucket", "fake-bucketPath", - "fake-region", - "fake-access-key-id", - "fake-secret-access-key", - // The part size is configured in the format config. This field is only used by S3StreamCopier. - null, - new S3CsvFormatConfig(Flattening.NO, (long) PART_SIZE)); + "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") + .withPartSize(PART_SIZE) + .withFormatConfig(new S3CsvFormatConfig(Flattening.NO, (long) PART_SIZE)) + .get(); // equivalent to Thu, 09 Dec 2021 19:17:54 GMT private static final Timestamp UPLOAD_TIME = Timestamp.from(Instant.ofEpochMilli(1639077474000L)); @@ -247,17 +246,17 @@ public void writesContentsCorrectly_when_headerDisabled() throws IOException { */ @Test public void writesContentsCorrectly_when_stagingDatabaseConfig() throws IOException { + final S3DestinationConfig s3Config = S3DestinationConfig.create( + "fake-bucket", + "fake-bucketPath", + "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") + .withPartSize(PART_SIZE) + .withFormatConfig(new S3CsvFormatConfig(null, (long) PART_SIZE)) + .get(); final S3CsvWriter writer = new Builder( - new S3DestinationConfig( - "fake-endpoint", - "fake-bucket", - "fake-bucketPath", - "fake-region", - "fake-access-key-id", - "fake-secret-access-key", - // The part size is configured in the format config. This field is only used by S3StreamCopier. - null, - new S3CsvFormatConfig(null, (long) PART_SIZE)), + s3Config, s3Client, CONFIGURED_STREAM, UPLOAD_TIME).uploadThreads(UPLOAD_THREADS) diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/util/ConfigTestUtils.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/util/ConfigTestUtils.java index c3c213da0e2dd..f9bf6b1771c52 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/util/ConfigTestUtils.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/util/ConfigTestUtils.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; public class ConfigTestUtils { @@ -29,8 +30,9 @@ public static void assertBaseConfig(final S3DestinationConfig s3DestinationConfi assertEquals("test-bucket-name", s3DestinationConfig.getBucketName()); assertEquals("test_path", s3DestinationConfig.getBucketPath()); assertEquals("us-east-2", s3DestinationConfig.getBucketRegion()); - assertEquals("some-test-key-id", s3DestinationConfig.getAccessKeyId()); - assertEquals("some-test-access-key", s3DestinationConfig.getSecretAccessKey()); + final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3DestinationConfig.getS3CredentialConfig(); + assertEquals("some-test-key-id", credentialConfig.getAccessKeyId()); + assertEquals("some-test-access-key", credentialConfig.getSecretAccessKey()); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java index f5602d357c8b6..7f7cbe475cc8a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperations.java @@ -12,6 +12,7 @@ import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3StorageOperations; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import io.airbyte.integrations.destination.staging.StagingOperations; import java.util.List; import java.util.Map; @@ -98,12 +99,13 @@ protected String getCopyQuery(final String stageName, final List stagedFiles, final String dstTableName, final String schemaName) { + final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig(); return String.format(COPY_QUERY + generateFilesList(stagedFiles) + ";", schemaName, dstTableName, generateBucketPath(stageName, stagingPath), - s3Config.getAccessKeyId(), - s3Config.getSecretAccessKey()); + credentialConfig.getAccessKeyId(), + credentialConfig.getSecretAccessKey()); } private String generateBucketPath(final String stageName, final String stagingPath) { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index 302969d0707fd..e524f0b2e9824 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -14,6 +14,7 @@ import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig; import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.sql.SQLException; @@ -79,7 +80,7 @@ public SnowflakeS3StreamCopier(final String stagingFolder, @Override public void copyStagingFileToTemporaryTable() throws Exception { - List> partitions = Lists.partition(new ArrayList<>(getStagingFiles()), MAX_FILES_PER_COPY); + final List> partitions = Lists.partition(new ArrayList<>(getStagingFiles()), MAX_FILES_PER_COPY); LOGGER.info("Starting parallel copy to tmp table: {} in destination for stream: {}, schema: {}. Chunks count {}", tmpTableName, streamName, schemaName, partitions.size()); @@ -88,7 +89,8 @@ public void copyStagingFileToTemporaryTable() throws Exception { } @Override - public void copyIntoStage(List files) { + public void copyIntoStage(final List files) { + final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig(); final var copyQuery = String.format( "COPY INTO %s.%s FROM '%s' " + "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') " @@ -97,8 +99,8 @@ public void copyIntoStage(List files) { schemaName, tmpTableName, generateBucketPath(), - s3Config.getAccessKeyId(), - s3Config.getSecretAccessKey()); + credentialConfig.getAccessKeyId(), + credentialConfig.getSecretAccessKey()); Exceptions.toRuntime(() -> db.execute(copyQuery)); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java index f086021c424de..33941e3f51d52 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingSqlOperationsTest.java @@ -10,6 +10,7 @@ import com.amazonaws.services.s3.AmazonS3; import io.airbyte.integrations.destination.s3.S3DestinationConfig; +import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; import java.util.List; import org.junit.jupiter.api.Test; @@ -23,6 +24,7 @@ class SnowflakeS3StagingSqlOperationsTest { private final AmazonS3 s3Client = mock(AmazonS3.class); private final S3DestinationConfig s3Config = mock(S3DestinationConfig.class); + private final S3AccessKeyCredentialConfig credentialConfig = mock(S3AccessKeyCredentialConfig.class); private final SnowflakeS3StagingSqlOperations snowflakeStagingSqlOperations = new SnowflakeS3StagingSqlOperations(new SnowflakeSQLNameTransformer(), s3Client, s3Config); @@ -33,8 +35,9 @@ void copyIntoTmpTableFromStage() { "CREDENTIALS=(aws_key_id='aws_access_key_id' aws_secret_key='aws_secret_access_key') file_format = (type = csv compression = auto " + "field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"') files = ('filename1','filename2');"; when(s3Config.getBucketName()).thenReturn(BUCKET_NAME); - when(s3Config.getAccessKeyId()).thenReturn("aws_access_key_id"); - when(s3Config.getSecretAccessKey()).thenReturn("aws_secret_access_key"); + when(s3Config.getS3CredentialConfig()).thenReturn(credentialConfig); + when(credentialConfig.getAccessKeyId()).thenReturn("aws_access_key_id"); + when(credentialConfig.getSecretAccessKey()).thenReturn("aws_secret_access_key"); final String actualCopyQuery = snowflakeStagingSqlOperations.getCopyQuery(STAGE_NAME, STAGE_PATH, List.of("filename1", "filename2"), TABLE_NAME, SCHEMA_NAME); assertEquals(expectedQuery, actualCopyQuery); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java index d8043bf8ab83b..4ae1bc9c32db8 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierTest.java @@ -46,23 +46,22 @@ public void setup() throws Exception { db = mock(JdbcDatabase.class); sqlOperations = mock(SqlOperations.class); + final S3DestinationConfig s3Config = S3DestinationConfig.create( + "fake-bucket", + "fake-bucketPath", + "fake-region") + .withEndpoint("fake-endpoint") + .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") + .withPartSize(PART_SIZE) + .get(); + copier = (SnowflakeS3StreamCopier) new SnowflakeS3StreamCopierFactory().create( // In reality, this is normally a UUID - see CopyConsumerFactory#createWriteConfigs "fake-staging-folder", "fake-schema", s3Client, db, - new S3CopyConfig( - true, - new S3DestinationConfig( - "fake-endpoint", - "fake-bucket", - "fake-bucketPath", - "fake-region", - "fake-access-key-id", - "fake-secret-access-key", - PART_SIZE, - null)), + new S3CopyConfig(true, s3Config), new ExtendedNameTransformer(), sqlOperations, new ConfiguredAirbyteStream() @@ -80,7 +79,7 @@ public void copiesCorrectFilesToTable() throws Exception { } copier.copyStagingFileToTemporaryTable(); - Set stagingFiles = copier.getStagingFiles(); + final Set stagingFiles = copier.getStagingFiles(); // check the use of all files for staging Assertions.assertTrue(stagingFiles.size() > 1); diff --git a/docs/integrations/destinations/gcs.md b/docs/integrations/destinations/gcs.md index 8b627e70c537a..6777c27cabb63 100644 --- a/docs/integrations/destinations/gcs.md +++ b/docs/integrations/destinations/gcs.md @@ -40,7 +40,7 @@ The full path of the output data is: For example: ```text -testing_bucket/data_output_path/public/users/2021_01_01_1609541171643_0.csv +testing_bucket/data_output_path/public/users/2021_01_01_1609541171643_0.csv.gz ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ | | | | | | | format extension | | | | | | partition id @@ -56,7 +56,7 @@ Please note that the stream name may contain a prefix, if it is configured on th The rationales behind this naming pattern are: 1. Each stream has its own directory. 2. The data output files can be sorted by upload time. 3. The upload time composes of a date part and millis part so that it is both readable and unique. -Currently, each data sync will only create one file per stream. In the future, the output file can be partitioned by size. Each partition is identifiable by the partition ID, which is always 0 for now. +A data sync may create multiple files as the output files can be partitioned by size (targeting a size of 200MB compressed or lower) . ## Output Schema @@ -135,6 +135,8 @@ With root level normalization, the output CSV is: | :--- | :--- | :--- | :--- | | `26d73cde-7eb1-4e1e-b7db-a4c03b4cf206` | 1622135805000 | 123 | `{ "first": "John", "last": "Doe" }` | +Output CSV files will always be compressed using GZIP compression. + ### JSON Lines \(JSONL\) [Json Lines](https://jsonlines.org/) is a text format with one JSON per line. Each line has a structure as follows: @@ -175,6 +177,8 @@ They will be like this in the output file: { "_airbyte_ab_id": "0a61de1b-9cdd-4455-a739-93572c9a5f20", "_airbyte_emitted_at": "1631948170000", "_airbyte_data": { "user_id": 456, "name": { "first": "Jane", "last": "Roe" } } } ``` +Output CSV files will always be compressed using GZIP compression. + ### Parquet #### Configuration @@ -229,6 +233,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.2.0 (unpublished) | 2022-04-04 | [\#11686](https://github.com/airbytehq/airbyte/pull/11686) | Use serialized buffering strategy to reduce memory consumption; compress CSV and JSONL formats. | | 0.1.22 | 2022-02-12 | [\#10256](https://github.com/airbytehq/airbyte/pull/10256) | Add JVM flag to exist on OOME. | | 0.1.21 | 2022-02-12 | [\#10299](https://github.com/airbytehq/airbyte/pull/10299) | Fix connection check to require only the necessary permissions. | | 0.1.20 | 2022-01-11 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Avro & Parquet: support array field with unknown item type; default any improperly typed field to string. |