Skip to content

[EPIC] scale warehouse destination connectors to handle arbitrary number of streams #10260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
7 of 30 tasks
sherifnada opened this issue Feb 11, 2022 · 18 comments
Open
7 of 30 tasks

Comments

@sherifnada
Copy link
Contributor

sherifnada commented Feb 11, 2022

Tell us about the problem you're trying to solve

Today destination warehouses or blob storages like S3 hold a constant-size, in-memory buffer for each stream it expects to see from the input catalog. For example, this comment in the S3 code illustrates that the more streams we have coming in, the more memory the connector must hold for buffers.

This has two obvious problems. First is that it does not scale. Second, it's unnecessary because we often write files whose max size is no more than 1GB (because it's optimal for consuming warehouses). So, it doesn't really make sense overall that this works the way it does, and we stand to gain a lot of benefit from changing it.

This current behavior also really hurts resource utilization when running a scaling airbyte instance, because you can't ever reliably predict how much memory a worker will need.

Describe the solution you’d like

I want the destination connectors to be able to work with a preset amount of memory e.g: I want to decide that if I have only 1.5GB to give the connector, then it should function correctly with that, maybe with a tradeoff of speed. But at least it shouldn't fail completely. Only hold space for a buffer if it is being used. We can even lazy-allocate space as records come in and release that space when the buffer is flushed, then allocate it again if new records come in for that buffer.

The most important destinations to do this for, in order, are:

  • Snowflake Internal staging
  • S3 (because it is relied upon by GCS connector and therefore BigQuery connector. Also Redshift and Snowflake connectors, if the latter is using external staging to S3)

Remaining pieces

Following the work on the first destination in Snowflake and S3, an abstraction was made to serialize records and compress them before buffering while monitoring for reasonable memory/file handlers limits.

This mechanism can now be replicated and deployed on all other java destinations by inheriting the same base classes while providing the proper BlobStorageOperations or SqlOperations to be interacted with.

The benefits can be shared for any destination that needs to buffer a batch of records before sending them instead of writing one by one.

Warehouse & blob storage destinations

It is a standard Warehouse best practice to use some staging areas where data is first buffered before being loaded to the final destinations. By nature, blob storage also follows the same requirement since they manipulate files.

Other destinations requiring buffering

The destinations in this section seem to already be using or needing some buffering mechanisms. These may be implemented on a case-by-case basis. Thus, refactoring them to follow the same pattern will reduce the maintenance cost of the connectors and unify the buffering options while getting memory consumptions under control in a common and more predictable manner:

  • DynamoDB destination
  • ElasticSearch destination
  • MeiliSearch destination
  • Pulsar destination
  • RockSet destination

Other destination that may benefit from buffering

The destination connectors below are sending records to the destination as they flow in.
So, they may not have memory consumptions issues with large streams, but it might be useful to verify that batching in a buffer before sending a larger collection of records together may be beneficial:

  • JDBC destination (these usually have some batch loading options and it's probably a good idea to use them too)
    • Clickhouse destination
    • MariaDbColumnStore destination
    • MsSql destination
    • MySql destination
    • Oracle destination
    • Postgres destination: use of COPY bulk loading is recommended
  • MongoDb destination
  • MQTT destination

(* this may not be a completely exhaustive list, since we (airbyte or community) regularly add more destination connectors, so some destinations may not be listed in here, they may need to be added to the correct section or disregarded because buffering with that particular connector does not represent a benefit/issue)
(* Python connectors are not listed in this issue)

Related areas

These are related areas that don't do so well with large batch or an arbitrary number of streams:

┆Issue is synchronized with this Asana task by Unito

@tuliren
Copy link
Contributor

tuliren commented Feb 16, 2022

@tuliren
Copy link
Contributor

tuliren commented Feb 16, 2022

The snowflake internal staging has stable memory usage for multiple streams.

One stream with tons of large records:

Screen Shot 2022-02-16 at 15 03 33

10 streams with smaller load:
Screen Shot 2022-02-16 at 15 04 11

I still need to figure out why it needs so much memory initially.

@tuliren
Copy link
Contributor

tuliren commented Feb 17, 2022

Memory Usage from Byte Arrays

One major contributor to the high memory usage is ByteUtils#getSizeInBytesForUTF8CharSet. This method creates a byte array from a string just to get a precise byte size. Along the way, it creates lots of byte arrays.

In practice, we only need an estimation of the string size. Since UTF-8 character is 1-4 bytes length, we can just use the string length times 4 to approximate the byte size. This will be an upper bound, which is even safer.

Memory usage with getSizeInBytesForUTF8CharSet

Screen Shot 2022-02-16 at 21 29 41

Screen Shot 2022-02-16 at 21 30 19

We can see that:

  • The max heap size is over 1.2 GB.
  • Byte arrays take up 370 MB memory.

Memory usage with simple estimation (string length x 4)

Screen Shot 2022-02-16 at 21 41 18

Screen Shot 2022-02-16 at 21 40 58

The differences are:

  • Max heap size is 550 MB, as compared to 1.2+ GB.
  • Byte arrays take up only 125 MB.

Conclusion

Removing the getSizeInBytesForUTF8CharSet method calls immediately halves the memory usage.

@tuliren
Copy link
Contributor

tuliren commented Feb 17, 2022

  • The next idea is to further rely on estimation instead of precise calculation of the record message sizes.
  • The proposal is that rather than serializing every record message, we can sample 1/10 of them and use the average size.
  • The assumption behind this proposal is that each record message has roughly the same size.

@tuliren
Copy link
Contributor

tuliren commented Feb 17, 2022

Sample the string size for every N records turn out to not have that big an effect.

The heap size is kept at 500 MB.

Byte size (OOME with 500 MB heap size)

byte size every record oome

Calculating byte size for every record message results in OOME within one minute. This result proves that the frequent byte conversion is the root cause of the large memory footprint.

String size (sample every record)

string size every record

String size (sample every 20 records)

string size every 20 records

String size (sample every 100 records)

string size every 100 records

Performing one sample every 100 records slightly reduces memory usage. There are fewer number of memory usage peaks in the above chart.

@tuliren
Copy link
Contributor

tuliren commented Feb 17, 2022

When setting the max heap size to 300 MB, the sampling rate has visible impact on heap usage. As the sampling rate decreases, there are fewer heap peaks, especially when the rate is reduced from sampling every record to every 20 records. The impact is not that significant when the rate changes from every 20 to every 100 records.

String size (every record)

300mb string size every record

String size (every 20 records)

300mb string size every 20 records

String size (every 100 records)

300mb string size every 100 records

Conclusion

Will sample the string size every 20 records.

@tuliren
Copy link
Contributor

tuliren commented Feb 17, 2022

300 MB seems like the minimum heap size. When setting a lower xmx, the connector went OOME within one minute:

  • 200 MB max heap size:
    Screen Shot 2022-02-16 at 23 27 31
  • 256 MB max heap size:
    Screen Shot 2022-02-16 at 23 30 18

@tuliren
Copy link
Contributor

tuliren commented Feb 23, 2022

The S3 staging mode is not scalable. When syncing 50 streams, each with 1K records, the connector will encounter OOME with 500 MB max heap size.

Something is clearly wrong that the number of live threads just keeps going up. The majority of the heap is occupied by byte arrays. This mode requires more investigation.

Screen Shot 2022-02-23 at 07 20 34

Screen Shot 2022-02-23 at 07 20 41

Screen Shot 2022-02-23 at 07 20 50

It does work with fewer streams (e.g. 3 streams, each with 10K records).

@tuliren
Copy link
Contributor

tuliren commented Feb 23, 2022

Summary

  • ETA for S3 staging mode investigation and optimization
    • End of next sprint (Feb 29th)
  • Remaining work
    • Add Sentry tracing for copy destination to facilitate investigation
    • Analyze the concurrent logic

@ChristopheDuong
Copy link
Contributor

ChristopheDuong commented Mar 21, 2022

Following up on @tuliren's investigation:

The S3 staging mode is not scalable. When syncing 50 streams, each with 1K records, the connector will encounter OOME with 500 MB max heap size.
Something is clearly wrong that the number of live threads just keeps going up. The majority of the heap is occupied by byte arrays. This mode requires more investigation.

PR #10866 solves the non-scalable aspects that were discovered so far with the snowflake S3 staging mode.

So this statement is (partially?) not true anymore with the new implementation (destination-snowflake 0.4.21):

300 MB seems like the minimum heap size. When setting a lower xmx, the connector went OOME within one minute:

The new approach implements a compressed serialized buffer strategy with limits of memory per stream, memory globally, and # of simultaneous buffers. Since records are now compressed, estimating their size beforehand is now more complex. So, we count bytes being buffered only after actually writing it instead. Thus, the following issue does not concern us anymore:

One major contributor to the high memory usage is ByteUtils#getSizeInBytesForUTF8CharSet. This method creates a byte array from a string just to get a precise byte size. Along the way, it creates lots of byte arrays.
In practice, we only need an estimation of the string size. Since UTF-8 character is 1-4 bytes length, we can just use the string length times 4 to approximate the byte size.

Below numbers are run with the new S3 staging mode, syncing 200 streams, each with 2K records (representing 4.24GB of data in total) running with 200MB max heap size.

Screenshot 2022-03-21 at 10 25 13
Screenshot 2022-03-21 at 10 25 22

S3-staging-200_streams_2000_records_1024mb_gzip_disk_buffer.log

This shows a more stable and reduced consumption of memory, thread (and CPU) resources regardless of the sync size or of the number of streams.

This also shows room for improvement in using more CPU resources to optimize speed performances in the future.

@ChristopheDuong
Copy link
Contributor

ChristopheDuong commented Mar 21, 2022

PR #10866 solves the non-scalable aspects that were discovered so far with the S3 staging mode. Below numbers are run with the new S3 staging mode, syncing 200 streams, each with 2K records (representing 4.24GB of data in total) running with 200MB max heap size.

However, these are not the optimal / recommended max heap sizes to run the new staging modes with. (this comment applies to modes using the StreamTransferManager)

When the same connector is run with the same configuration, but with a larger amount of data per stream, if we keep the max heap size of 200MB, then we encounter OOME...

For example, let's study a scenario where N stream with 100K records is representing 750-800MB of uncompressed data per stream. When compressed, it is about 283MB compressed (gzip) of data per stream.

(For comparison, the previous sync with 200 streams was manipulating only 5MB compressed data per stream)

Even though the data is now being buffered and compressed on-disk per stream, when a flush event occurs for a particular stream, the compressed data for that stream is fully loaded into memory (with extra memory overhead as described here).

Note that the connector is capable to buffer multiple streams at once in different file buffers on disk (with some known limitations such as https://github.com/airbytehq/airbyte-internal-issues/issues/496), but it will load at most one compressed file of max 200MB into memory at a time when triggered for uploading to staging (which represents at most 300-400MB in memory usage when using alex.mojaki.s3upload.StreamTransferManager class).

Memory consumption when buffering/processing the first 200K records on 2 stream:

Screenshot 2022-03-21 at 10 10 40

Memory consumption in a "stable" state when reaching 10+GB of data transferred (1000+K records on 10+ streams)

(with these sync settings, 200MB compressed data contains about 70K records, there would be 30K records left which is about 83MB compressed that are remaining in buffers after a flush for a stream, once we reach 10 streams in the buffers, a flush all event is triggered below, so these smaller files are uploaded during this phase, thus requiring less memory to load them for the uploading phase)

Screenshot 2022-03-21 at 11 18 45

The total run time for 15.59GB over 15 streams with 100000 records per stream was 2 hrs 11 min 08 sec:
S3-staging-15_streams_100000_records_1024mb_gzip_disk_buffer.log

In conclusion, for the moment, it is better to run with at least somewhere around 600MB max heap size + 1GB disk storage

These recommended limits are tied to the following settings:

  • with the current implementation of the buffering strategy (destination-snowflake 0.4.21) for the S3 staging mode
  • with 200MB maximum of compressed data in the buffer per stream
  • with max 10 concurrent buffers
  • while the maximum on-disk storage used by the buffering is currently set to 1GB
    • maybe it would make sense to change this to be 10 concurrent buffers * 200 MB per buffer = 2GB instead though.
    • maybe it is worth setting the authorized disk space for the worker slightly above this limit though (just in case, the connector needs it for other than just buffering)

These limits are configured here and see comment for more details

In future versions, if the uploading to the stage area implementation would rely on multi-threading multiple StreamTransferManager in parallel, the connector may need further memory accordingly to process such simultaneous upload tasks.

@ChristopheDuong
Copy link
Contributor

ChristopheDuong commented Mar 21, 2022

Destination snowflake with Internal Staging mode does not have the same memory requirement as the S3 staging mode

Because it is not relying on the alex.mojaki.s3upload.StreamTransferManager class. So it does not actually load the buffered file into memory in order to upload it to staging.

Scenario 1 of 4.24GB over 200 streams and 2K records (5MB compressed) per stream

Screenshot 2022-03-21 at 16 09 52

To compare against S3 staging mode

Scenario 2 of 15.59GB over 15 streams and 100K records (283MB compressed) per stream

Memory consumption when buffering/processing the first 200K records on 2 stream:

Screenshot 2022-03-21 at 15 59 17

To compare against S3 staging mode

Memory consumption in a "stable" state when reaching 10+GB of data transferred (1000+K records on 10+ streams)

Screenshot 2022-03-21 at 17 07 19

To compare against S3 staging mode

The total run time for 15.59GB over 15 streams with 100000 records per stream was 2 hrs 20 min 09 sec

internal-staging-15_streams_100000_records_1024mb_gzip_disk_buffer.log

Recap

Criteria Baseline (/dev/null) Snowflake external S3 Staging Snowflake internal Staging
Time to sync 200 streams of 2000 records each 29 min 54 min 1 hour 27 min
Time to sync 15 streams of 100000 records each 1 hour 55 min 2 hour 11 min 2 hour 20 min
Recommended max heap size 600 MB 200 MB
Recommended disk storage size 1+ GB 1+ GB

(* FYI these timings were collected over a single run, we should probably do multiple runs and average them to have a more accurate view)

@olivermeyer
Copy link
Contributor

olivermeyer commented Mar 31, 2022

We're getting an OOM in the Redshift destination when trying to import a relatively large dataset (~6m rows, so nothing too wild). The logs are

2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:81)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at alex.mojaki.s3upload.ConvertibleOutputStream.<init>(ConvertibleOutputStream.java:20)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at alex.mojaki.s3upload.MultiPartOutputStream.<init>(MultiPartOutputStream.java:74)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at alex.mojaki.s3upload.StreamTransferManager.getMultiPartOutputStreams(StreamTransferManager.java:338)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.prepareStagingFile(S3StreamCopier.java:123)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$recordWriterFunction$0(CopyConsumerFactory.java:90)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory$$Lambda$178/0x0000000800e10c40.accept(Unknown Source)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.flushQueueToDestination(BufferedStreamConsumer.java:166)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:148)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:46)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:147)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.redshift.RedshiftDestination.main(RedshiftDestination.java:78)

I also see the memory usage by that pod hit the node's capacity just before it dies. Seems like it's related to this general issue, specifically this one. Any ETA for the Redshift destination fix? And any possible workarounds in the meantime?

@ChristopheDuong
Copy link
Contributor

ChristopheDuong commented Mar 31, 2022

a relatively large dataset (~6m rows, so nothing too wild).
I also see the memory usage by that pod hit the node's capacity just before it dies. Seems like it's related to this general issue,

How are the 6M rows split on how many tables?
If it's coming from a large number of tables. then yes it might be related to what is being targeted by this epic issue and the one you linked.

A workaround would be to split your connection and select only a few tables at once because the connector is allocating a large buffer for each stream all at once.

Having a large number of rows on one single table should not run into OOM though (unless due to another bug)

How much memory do you allocate to the worker node running the connector?

For ETA, I don't know. maybe @grishick can answer that part. (but I believe we might address those issues for the warehouse destinations in the near future anyway)

@olivermeyer
Copy link
Contributor

olivermeyer commented Apr 1, 2022

How are the 6M rows split on how many tables?

It's actually just one table, so perhaps it's another bug. Perhaps I should open a new issue?

@ChristopheDuong
Copy link
Contributor

How are the 6M rows split on how many tables?

It's actually just one table, so perhaps it's another bug. Perhaps I should open a new issue?

Yes, please!

  • Can you specify how much memory do you allocate to the worker node running the connector?
  • Can you try the destination-s3 connector to see if the sync fails there too? (if that works, then while waiting on a fix on destination-redshift, you could then manually load the data from s3 to redshift)

@sherifnada
Copy link
Contributor Author

It could be a wide row issue potentially

@octavia-squidington-iii
Copy link
Collaborator

At Airbyte, we seek to be clear about the project priorities and roadmap. This issue has not had any activity for 365 days, suggesting that it's not as critical as others. It's possible it has already been fixed. It is being marked as stale and will be closed in 20 days if there is no activity. To keep it open, please comment to let us know why it is important to you and if it is still reproducible on recent versions of Airbyte.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

8 participants