Azure Blob Storage destination crashes due to lack of buffering #5980

Closed
vholmer opened this issue Sep 10, 2021 · 6 comments

Comments


vholmer commented Sep 10, 2021

Environment

  • Airbyte version: 0.26.15-alpha
  • OS Version / Instance: Ubuntu 18.04
  • Deployment: Docker
  • Source Connector and version: Oracle DB 0.3.3
  • Destination Connector and version: Azure Blob Storage 0.1.0
  • Severity: Critical (for this destination connector)
  • Step where error happened: Sync job

Current Behavior

Sync crashes after the blob reaches 50,000 blocks because BlobStorageClient doesn't utilize its own buffering. Each message is committed as its own block, which is far from optimal. We should implement some kind of buffering before messages are sent to the blob.
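
A minimal illustration of the buffering idea, assuming the connector's AppendBlobClient usage (hypothetical helper names, not the connector's actual code; a later comment in this thread shows a full patch along these lines): wrapping the SDK stream in a BufferedOutputStream lets many records share one committed block instead of committing one block per record.

import com.azure.storage.blob.specialized.AppendBlobClient;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: buffer writes so records are committed in large blocks.
class BufferedBlobWriterSketch {

  static PrintWriter openBufferedWriter(final AppendBlobClient appendBlobClient) {
    final OutputStream raw = appendBlobClient.getBlobOutputStream();
    // Without this wrapper, every flush commits its own tiny block, and
    // 50,000 of them exhausts the per-blob block limit.
    final OutputStream buffered = new BufferedOutputStream(raw, 4 * 1024 * 1024);
    // autoflush=false so println() does not force a flush (and a block) per record.
    return new PrintWriter(buffered, false, StandardCharsets.UTF_8);
  }
}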

Expected Behavior

The sync shouldn't crash; the file should keep growing without reaching a high committed block count.

Other details

The file was automatically created & filled by Airbyte and contained 50,000 lines and 50,000 committed blocks.

Logs

LOG
2021-09-10 08:08:43 INFO () LogClientSingleton(setJobMdc):146 - Setting docker job mdc
2021-09-10 08:08:53 INFO () LogClientSingleton(setJobMdc):146 - Setting docker job mdc
2021-09-10 08:09:03 INFO () LogClientSingleton(setJobMdc):146 - Setting docker job mdc
2021-09-10 08:09:11 ERROR () LineGobbler(voidCall):85 - Sep 10, 2021 8:09:11 AM oracle.simplefan.impl.FanManager configure
2021-09-10 08:09:11 ERROR () LineGobbler(voidCall):85 - SEVERE: attempt to configure ONS in FanManager failed with oracle.ons.NoServersAvailable: Subscription time out
2021-09-10 08:09:13 INFO () LogClientSingleton(setJobMdc):146 - Setting docker job mdc
2021-09-10 08:09:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:14 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):478 - {} - Queueing query for table: ANONYMIZED_NAME1
2021-09-10 08:09:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:14 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):478 - {} - Queueing query for table: ANONYMIZED_NAME2
2021-09-10 08:09:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:14 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):478 - {} - Queueing query for table: ANONYMIZED_NAME3
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:16 ERROR c.a.c.u.l.ClientLogger(performLogging):350 - {} - com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlockCountExceedsLimit</Code><Message>The committed block count cannot exceed the maximum limit of 50,000 blocks.
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - Time:2021-09-10T08:09:16.3414416Z</Message></Error>"
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:16 ERROR i.a.i.d.a.AzureBlobStorageConsumer(acceptTracked):167 - {} - Failed to write messagefor stream io.airbyte.integrations.destination.azure_blob_storage.jsonl.AzureBlobStorageJsonlWriter@4f63e3c7, details: com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlockCountExceedsLimit</Code><Message>The committed block count cannot exceed the maximum limit of 50,000 blocks.
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - Time:2021-09-10T08:09:16.3414416Z</Message></Error>"
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:16 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):78 - {} - Airbyte message consumer: failed.
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:16 WARN i.a.i.d.a.w.BaseAzureBlobStorageWriter(close):66 - {} - Failure detected. Aborting upload of stream 'ANONYMIZED_NAME1'...
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:16 ERROR c.a.c.u.l.ClientLogger(performLogging):350 - {} - com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlockCountExceedsLimit</Code><Message>The committed block count cannot exceed the maximum limit of 50,000 blocks.
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - Time:2021-09-10T08:09:16.3414416Z</Message></Error>"
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 08:09:16 ERROR c.a.c.u.l.ClientLogger(performLogging):350 - {} - com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlockCountExceedsLimit</Code><Message>The committed block count cannot exceed the maximum limit of 50,000 blocks.
2021-09-10 08:09:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - Time:2021-09-10T08:09:16.3414416Z</Message></Error>"
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlockCountExceedsLimit</Code><Message>The committed block count cannot exceed the maximum limit of 50,000 blocks.
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - Time:2021-09-10T08:09:16.3414416Z</Message></Error>"
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageConsumer.acceptTracked(AzureBlobStorageConsumer.java:169)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:66)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:167)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:148)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestination.main(AzureBlobStorageDestination.java:47)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	Suppressed: java.lang.RuntimeException: com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlockCountExceedsLimit</Code><Message>The committed block count cannot exceed the maximum limit of 50,000 blocks.
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - Time:2021-09-10T08:09:16.3414416Z</Message></Error>"
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at com.azure.storage.common.StorageOutputStream.checkStreamState(StorageOutputStream.java:79)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at com.azure.storage.blob.specialized.BlobOutputStream.close(BlobOutputStream.java:119)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:353)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:168)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at java.base/java.io.BufferedWriter.close(BufferedWriter.java:269)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at java.base/java.io.PrintWriter.close(PrintWriter.java:415)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at io.airbyte.integrations.destination.azure_blob_storage.jsonl.AzureBlobStorageJsonlWriter.closeWhenFail(AzureBlobStorageJsonlWriter.java:86)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter.close(BaseAzureBlobStorageWriter.java:67)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageConsumer.close(AzureBlobStorageConsumer.java:176)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:82)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:161)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 		... 2 more
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - Caused by: java.lang.RuntimeException: com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlockCountExceedsLimit</Code><Message>The committed block count cannot exceed the maximum limit of 50,000 blocks.
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - Time:2021-09-10T08:09:16.3414416Z</Message></Error>"
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at com.azure.storage.common.StorageOutputStream.checkStreamState(StorageOutputStream.java:79)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at com.azure.storage.common.StorageOutputStream.flush(StorageOutputStream.java:89)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:327)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:159)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.io.BufferedWriter.flush(BufferedWriter.java:257)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.io.PrintWriter.newLine(PrintWriter.java:568)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.io.PrintWriter.println(PrintWriter.java:711)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.io.PrintWriter.println(PrintWriter.java:822)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.destination.azure_blob_storage.jsonl.AzureBlobStorageJsonlWriter.write(AzureBlobStorageJsonlWriter.java:74)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageConsumer.acceptTracked(AzureBlobStorageConsumer.java:164)
2021-09-10 08:09:16 ERROR () LineGobbler(voidCall):85 - 	... 4 more

Steps to Reproduce

  1. Read a large amount of data (>50 MB) into the destination connector described above.
  2. 🔥🔥🔥

Are you willing to submit a PR?

No.

@vholmer vholmer added the type/bug Something isn't working label Sep 10, 2021
@vholmer vholmer changed the title BlobStorageClient crashes due to lack of buffering Azure Blob Storage destination crashes due to lack of buffering Sep 10, 2021
@sherifnada sherifnada added area/connectors Connector related issues lang/java labels Sep 10, 2021
@sherifnada
Contributor

@vholmer thanks for the heads up! Will get to it

@bmatticus
Contributor

bmatticus commented Dec 28, 2021

Just wanted to comment here as we had the same issue and have patched a local copy of the destination connector to fix it. The problem appears to be that the connector obtains an output stream via getBlobOutputStream and writes to it directly. Azure recommends wrapping it in a BufferedOutputStream. Doing so appears to fix the issue and makes things work quite a bit faster; however, we also set the PrintWriter's autoflush to false, which may also be contributing.

The issue, I believe, is that the output stream commits too quickly while the Azure SDK tries to auto-detect the block size. Since it's not buffering and is autoflushing, the block size is probably set to the smallest possible, depending on how large the rows are at the source. Azure block blobs do have a hard limit of 50,000 blocks per blob, which you cannot get around; increasing the block size is the only way to stay under it. I didn't find a good way of altering the block size directly through the connector, but disabling autoflush and wrapping the stream in a BufferedOutputStream has solved our issues.

See modified csv/AzureBlobStorageCsvWriter.java below.

/*
 * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.azure_blob_storage.csv;

import com.azure.storage.blob.specialized.AppendBlobClient;
import com.azure.storage.blob.specialized.BlobOutputStream;
import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig;
import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriter;
import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implements
    AzureBlobStorageWriter {

  private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageCsvWriter.class);

  private final CsvSheetGenerator csvSheetGenerator;
  private final CSVPrinter csvPrinter;
  private final BlobOutputStream blobOutputStream;
  private final BufferedOutputStream blobBufferedOutputStream;

  public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config,
                                   final AppendBlobClient appendBlobClient,
                                   final ConfiguredAirbyteStream configuredStream,
                                   final boolean isNewlyCreatedBlob)
      throws IOException {
    super(config, appendBlobClient, configuredStream);

    final AzureBlobStorageCsvFormatConfig formatConfig = (AzureBlobStorageCsvFormatConfig) config
        .getFormatConfig();

    this.csvSheetGenerator = CsvSheetGenerator.Factory
        .create(configuredStream.getStream().getJsonSchema(),
            formatConfig);

    this.blobOutputStream = appendBlobClient.getBlobOutputStream();
    // Buffer 100 MB at a time so that many records are committed as one large
    // block instead of one small block per flush.
    this.blobBufferedOutputStream = new BufferedOutputStream(this.blobOutputStream, 104857600);

    if (isNewlyCreatedBlob) {
      this.csvPrinter = new CSVPrinter(
          new PrintWriter(blobBufferedOutputStream, false, StandardCharsets.UTF_8),
          CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)
              .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0])));
    } else {
      // no header required for append
      this.csvPrinter = new CSVPrinter(
          new PrintWriter(blobBufferedOutputStream, false, StandardCharsets.UTF_8),
          CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL));
    }
  }

  @Override
  public void write(final UUID id, final AirbyteRecordMessage recordMessage) throws IOException {
    csvPrinter.printRecord(csvSheetGenerator.getDataRow(id, recordMessage));
  }

  @Override
  protected void closeWhenSucceed() throws IOException {
    LOGGER.info("Closing csvPrinter when succeed");
    csvPrinter.close();
  }

  @Override
  protected void closeWhenFail() throws IOException {
    LOGGER.info("Closing csvPrinter when failed");
    csvPrinter.close();
  }

}

@sherifnada
Contributor

@bmatticus this is great, thanks for sharing! Would you be open to submitting a PR with your changes?

@bmatticus
Contributor

bmatticus commented Jan 5, 2022

> @bmatticus this is great, thanks for sharing! Would you be open to submitting a PR with your changes?

I have one in now: #9190

@bmatticus
Contributor

bmatticus commented Jan 12, 2022

@vholmer the PR has been merged and version 0.1.1 of the destination is now available; I think it should fix your issue. The default buffer is currently 5 MB, so if you have tables larger than about 195 GB you may need to increase it accordingly. Azure documents its blob block sizes at https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs. Blocks can go up to 4000 MB, but the maximum buffer supported by the buffered writer is about 2048 MB. The Azure append SDK has some logic that determines the block size; I haven't dug into it deeply enough to understand it entirely.
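
For reference, the ~195 GB ceiling quoted above follows from the two limits mentioned in this thread. A back-of-the-envelope sketch, assuming the documented 4 MiB cap per appended block and the 50,000-block limit from the error message:

// Back-of-the-envelope check of the ~195 GB figure (assumptions noted above).
public class AppendBlobLimit {
  public static void main(String[] args) {
    final long maxBlocks = 50_000L;            // hard per-blob committed block limit
    final long blockBytes = 4L * 1024 * 1024;  // assumed 4 MiB cap per appended block
    final double maxGib = maxBlocks * blockBytes / (1024.0 * 1024 * 1024);
    System.out.printf("Max append blob size: %.1f GiB%n", maxGib); // prints ~195.3
  }
}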

@sherifnada sherifnada moved this to Backlog in GL Roadmap Jan 12, 2022
@igrankova igrankova moved this from Backlog to Backlog (unscoped) in GL Roadmap Feb 2, 2022
@grishick grishick moved this from Backlog (unscoped) to Prioritized for scoping in GL Roadmap Apr 21, 2022
@VitaliiMaltsev
Contributor

I can't reproduce this issue. I believe it was fixed as part of #9190
