diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json index 4582e30382950..3c436bbd29d64 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json @@ -2,7 +2,7 @@ "destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb", "name": "Azure Blob Storage", "dockerRepository": "airbyte/destination-azure-blob-storage", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage", "icon": "azureblobstorage.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index eaae8e1430c88..b8c923039369f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -1,7 +1,7 @@ - name: Azure Blob Storage destinationDefinitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb dockerRepository: airbyte/destination-azure-blob-storage - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/destinations/azureblobstorage icon: azureblobstorage.svg - name: Amazon SQS diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile index a33b5ab5272de..8e644aa025fe7 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile @@ -16,5 +16,5 @@ ENV 
APPLICATION destination-azure-blob-storage COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/destination-azure-blob-storage diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java index fb2777db8a5e7..8d575214b678c 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java @@ -15,6 +15,7 @@ public class AzureBlobStorageDestinationConfig { private final String accountName; private final String accountKey; private final String containerName; + private final int outputStreamBufferSize; private final AzureBlobStorageFormatConfig formatConfig; public AzureBlobStorageDestinationConfig( @@ -22,11 +23,13 @@ public AzureBlobStorageDestinationConfig( final String accountName, final String accountKey, final String containerName, + final int outputStreamBufferSize, final AzureBlobStorageFormatConfig formatConfig) { this.endpointUrl = endpointUrl; this.accountName = accountName; this.accountKey = accountKey; this.containerName = containerName; + this.outputStreamBufferSize = outputStreamBufferSize; this.formatConfig = formatConfig; } @@ -50,12 +53,22 @@ public AzureBlobStorageFormatConfig getFormatConfig() { return formatConfig; } + public int getOutputStreamBufferSize() { + // Stored in MB; convert to bytes and fail loudly (ArithmeticException) instead of silently wrapping on int overflow + return Math.multiplyExact(outputStreamBufferSize, 1024 * 1024); + } + public static AzureBlobStorageDestinationConfig 
getAzureBlobStorageConfig(final JsonNode config) { final String accountNameFomConfig = config.get("azure_blob_storage_account_name").asText(); final String accountKeyFromConfig = config.get("azure_blob_storage_account_key").asText(); final JsonNode endpointFromConfig = config .get("azure_blob_storage_endpoint_domain_name"); final JsonNode containerName = config.get("azure_blob_storage_container_name"); + final int outputStreamBufferSizeFromConfig = + config.get("azure_blob_storage_output_buffer_size") != null + ? config.get("azure_blob_storage_output_buffer_size").asInt(DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE) + : DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE; + final JsonNode blobName = config.get("azure_blob_storage_blob_name"); // streamId final String endpointComputed = String.format(Locale.ROOT, DEFAULT_STORAGE_ENDPOINT_FORMAT, @@ -72,6 +85,7 @@ public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final accountNameFomConfig, accountKeyFromConfig, containerNameComputed, + outputStreamBufferSizeFromConfig, AzureBlobStorageFormatConfigs.getAzureBlobStorageFormatConfig(config)); } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java index 1cd15481997c4..737bb5f634228 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java @@ -10,6 +10,7 @@ public final class AzureBlobStorageDestinationConstants { public static final String 
DEFAULT_STORAGE_ENDPOINT_HTTP_PROTOCOL = "https"; public static final String DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME = "blob.core.windows.net"; public static final String DEFAULT_STORAGE_ENDPOINT_FORMAT = "%s://%s.%s"; + public static final int DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE = 5; /* in megabytes (same unit and default as spec.json); getOutputStreamBufferSize() converts to bytes */ private AzureBlobStorageDestinationConstants() {} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java index 420202eac6d15..23e31bbf4d9ce 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java @@ -11,6 +11,7 @@ import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; @@ -28,7 +29,7 @@ public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implem private final CsvSheetGenerator csvSheetGenerator; private final CSVPrinter csvPrinter; - private final BlobOutputStream blobOutputStream; + private final BufferedOutputStream blobOutputStream; public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config, final AppendBlobClient appendBlobClient, @@ -44,17 +45,17 @@ public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config, 
.create(configuredStream.getStream().getJsonSchema(), formatConfig); - this.blobOutputStream = appendBlobClient.getBlobOutputStream(); + this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize()); if (isNewlyCreatedBlob) { this.csvPrinter = new CSVPrinter( - new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8), + new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0]))); } else { // no header required for append this.csvPrinter = new CSVPrinter( - new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8), + new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)); } } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java index aed0849ede5e5..6a0406be7a7e2 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java @@ -17,6 +17,7 @@ import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; @@ -32,7 +33,7 @@ public class 
AzureBlobStorageJsonlWriter extends BaseAzureBlobStorageWriter impl private static final ObjectMapper MAPPER = MoreMappers.initMapper(); private static final ObjectWriter WRITER = MAPPER.writer(); - private final BlobOutputStream blobOutputStream; + private final BufferedOutputStream blobOutputStream; private final PrintWriter printWriter; public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig config, @@ -41,8 +42,8 @@ public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig confi final boolean isNewlyCreatedBlob) { super(config, appendBlobClient, configuredStream); // at this moment we already receive appendBlobClient initialized - this.blobOutputStream = appendBlobClient.getBlobOutputStream(); - this.printWriter = new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8); + this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize()); + this.printWriter = new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8); } @Override diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json index b3b7c6ea78979..efe65cb62d8f1 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json @@ -35,6 +35,7 @@ "examples": ["airbyte5storage"] }, "azure_blob_storage_account_key": { + "title": "Azure Blob Storage account key", "description": "The Azure blob storage account key.", "airbyte_secret": true, "type": "string", @@ -42,6 +43,15 @@ "Z8ZkZpteggFx394vm+PJHnGTvdRncaYS+JhLKdj789YNmD+iyGTnG+PV+POiuYNhBg/ACS+LKjd%4FG3FHGN12Nd==" ] }, + "azure_blob_storage_output_buffer_size": { + "title": "Azure Blob Storage output buffer size", + "type": "integer", + "description": "The amount of megabytes 
to buffer for the output stream to Azure. This will impact memory footprint on workers, but may need adjustment for performance and appropriate block size in Azure.", + "minimum": 1, + "maximum": 2047, + "default": 5, + "examples": [5] + }, "format": { "title": "Output Format", "type": "object", diff --git a/docs/integrations/destinations/azureblobstorage.md b/docs/integrations/destinations/azureblobstorage.md index 8c8c29b260687..94809438f453e 100644 --- a/docs/integrations/destinations/azureblobstorage.md +++ b/docs/integrations/destinations/azureblobstorage.md @@ -22,6 +22,7 @@ The Airbyte Azure Blob Storage destination allows you to sync data to Azure Blob | Azure blob storage container \(Bucket\) Name | string | A name of the Azure blob storage container. If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp. | | Azure Blob Storage account name | string | The account's name of the Azure Blob Storage. | | The Azure blob storage account key | string | Azure blob storage account key. Example: `abcdefghijklmnopqrstuvwxyz/0123456789+ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789%++sampleKey==`. | +| Azure Blob Storage output buffer size | integer | Azure Blob Storage output buffer size, in megabytes. Example: 5 | | Format | object | Format specific configuration. See below for details. | ⚠️ Please note that under "Full Refresh Sync" mode, data in the configured blob will be wiped out before each sync. We recommend you to provision a dedicated Azure Blob Storage Container resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️ @@ -136,5 +137,7 @@ They will be like this in the output file: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.1 | 2021-12-29 | [\#9190](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. 
Also disabled autoflush on PrintWriter. | | 0.1.0 | 2021-08-30 | [\#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. | +