[EPIC] scale warehouse destination connectors to handle arbitrary number of streams #10260
Comments
**Memory Usage from Byte Arrays**

One major contributor to the high memory usage is converting each record string to a byte array just to measure its exact size. In practice, we only need an estimate of the string size. Since a UTF-8 character is 1-4 bytes long, we can simply multiply the string length by 4 to approximate the byte size. This gives an upper bound, which is even safer.

Memory usage with the string-size approximation (chart not included):
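As a rough sketch of the idea (hypothetical helper, not the actual connector code), the exact measurement allocates a byte array per record, while the estimate is a constant-time multiplication:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the estimation described above (not the actual connector code).
public class RecordSizeEstimator {

  // A UTF-8 character is 1-4 bytes, so string length * 4 is a safe upper bound.
  private static final int MAX_BYTES_PER_CHAR = 4;

  // Exact but expensive: allocates a new byte array for every record.
  public static long exactByteSize(final String recordJson) {
    return recordJson.getBytes(StandardCharsets.UTF_8).length;
  }

  // Cheap upper-bound estimate: no allocation, O(1).
  public static long estimatedByteSize(final String recordJson) {
    return (long) recordJson.length() * MAX_BYTES_PER_CHAR;
  }
}
```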
Sampling the string size every N records turns out not to have that big an effect. The heap size is kept at 500 MB.

**Byte size (OOME with 500 MB heap size)**

Calculating the byte size for every record message results in an OOME within one minute. This result proves that the frequent byte conversion is the root cause of the large memory footprint.

**String size (sample every record)**

**String size (sample every 20 records)**

**String size (sample every 100 records)**

Performing one sample every 100 records slightly reduces memory usage. There are fewer memory usage peaks in the above chart.
When setting the max heap size to 300 MB, the sampling rate has a visible impact on heap usage. As the sampling rate decreases, there are fewer heap peaks, especially when the rate is reduced from sampling every record to sampling every 20 records. The impact is not as significant when the rate changes from every 20 to every 100 records.

**String size (every record)**

**String size (every 20 records)**

**String size (every 100 records)**

**Conclusion**

We will sample the string size every 20 records.
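A minimal sketch of the sampling approach chosen above (hypothetical class, assuming the length-times-4 estimate from the previous snippet): re-compute the estimate only every N records and reuse the last value in between.

```java
// Hypothetical sketch of sampling the size estimate every N records (not the actual connector code).
public class SampledSizeEstimator {

  private final int sampleEvery;      // e.g. 20, per the conclusion above
  private long lastEstimateBytes = 0; // reused for non-sampled records
  private long recordCount = 0;

  public SampledSizeEstimator(final int sampleEvery) {
    this.sampleEvery = sampleEvery;
  }

  public long estimate(final String recordJson) {
    if (recordCount % sampleEvery == 0) {
      // Re-sample: string length * 4 is an upper bound on the UTF-8 byte size.
      lastEstimateBytes = (long) recordJson.length() * 4;
    }
    recordCount++;
    return lastEstimateBytes;
  }
}
```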
The S3 staging mode is not scalable. When syncing 50 streams, each with 1K records, the connector encounters an OOME with a 500 MB max heap size. Something is clearly wrong: the number of live threads just keeps going up. The majority of the heap is occupied by byte arrays. This mode requires more investigation. It does work with fewer streams (e.g. 3 streams, each with 10K records).
Summary
Following up on @tuliren's investigation:
PR #10866 solves the non-scalable aspects that were discovered so far with the Snowflake S3 staging mode. So this statement is (partially?) not true anymore with the new implementation (destination-snowflake 0.4.21):
The new approach implements a compressed serialized buffer strategy with limits on memory per stream, memory globally, and the number of simultaneous buffers. Since records are now compressed, estimating their size beforehand is more complex, so we count bytes being buffered only after actually writing them instead. Thus, the following issue does not concern us anymore:
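A minimal sketch of the "count bytes after writing" idea, assuming gzip-on-disk buffering (hypothetical classes, not the actual Airbyte implementation): records are serialized and compressed into a per-stream file, and the byte count reflects what was actually written rather than a pre-computed estimate.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical per-stream compressed on-disk buffer that reports bytes only after they are written.
public class CompressedStreamBuffer implements AutoCloseable {

  private final File file;
  private final CountingOutputStream counter;
  private final GZIPOutputStream gzip;

  public CompressedStreamBuffer(final File file) throws IOException {
    this.file = file;
    this.counter = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
    this.gzip = new GZIPOutputStream(counter);
  }

  public void accept(final String recordJson) throws IOException {
    gzip.write(recordJson.getBytes(StandardCharsets.UTF_8));
    gzip.write('\n');
  }

  // Compressed bytes written to the file so far (the gzip stream may still hold a small internal buffer).
  public long byteCount() {
    return counter.count;
  }

  public File getFile() {
    return file;
  }

  @Override
  public void close() throws IOException {
    gzip.close();
  }

  // Tiny helper that counts the bytes flowing through it.
  private static final class CountingOutputStream extends java.io.FilterOutputStream {
    long count = 0;

    CountingOutputStream(final java.io.OutputStream out) {
      super(out);
    }

    @Override
    public void write(final int b) throws IOException {
      out.write(b);
      count++;
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
      out.write(b, off, len);
      count += len;
    }
  }
}
```

Because the count is taken downstream of the compressor, it naturally accounts for the compression ratio without having to guess it per record.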
The numbers below are from a run of the new S3 staging mode, syncing 200 streams, each with 2K records (representing 4.24GB of data in total), with a 200MB max heap size.

S3-staging-200_streams_2000_records_1024mb_gzip_disk_buffer.log

This shows a more stable and reduced consumption of memory, thread (and CPU) resources, regardless of the sync size or the number of streams. It also shows room for improvement in using more CPU resources to optimize speed in the future.
However, these are not the optimal / recommended max heap sizes to run the new staging modes with (this comment applies to modes using the … buffers).

When the same connector is run with the same configuration but with a larger amount of data per stream, keeping the 200MB max heap size leads to an OOME. For example, let's study a scenario where N streams with 100K records each represent 750-800MB of uncompressed data per stream. When compressed, this is about 283MB (gzip) of data per stream. (For comparison, the previous sync with 200 streams was manipulating only 5MB of compressed data per stream.)

Even though the data is now being buffered and compressed on disk per stream, when a flush event occurs for a particular stream, the compressed data for that stream is fully loaded into memory (with extra memory overhead as described here). Note that the connector is capable of buffering multiple streams at once in different file buffers on disk (with some known limitations such as https://github.com/airbytehq/airbyte-internal-issues/issues/496), but it will load at most one compressed file of max 200MB into memory at a time when triggered for uploading to staging (which represents at most 300-400MB of memory usage when using …).

**Memory consumption when buffering/processing the first 200K records on 2 streams:**

**Memory consumption in a "stable" state when reaching 10+GB of data transferred (1000+K records on 10+ streams):**

(With these sync settings, 200MB of compressed data contains about 70K records, so there would be 30K records left, about 83MB compressed, remaining in the buffers after a flush for a stream. Once we reach 10 streams in the buffers, a flush-all event is triggered, so these smaller files are uploaded during this phase, thus requiring less memory to load them for the uploading phase.)

The total run time for 15.59GB over 15 streams with 100000 records per stream was 2 hrs 11 min 08 sec.

In conclusion, for the moment, it is better to run with at least somewhere around a 600MB max heap size + 1GB of disk storage. These recommended limits are tied to the following settings:
These limits are configured here; see the comment for more details. In future versions, the uploading to the stage area implementation could rely on multi-threading multiple …
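For illustration, here is a sketch of the flush-triggering rules described above, with hypothetical constant names and values (the real values live in the connector configuration referenced above):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the flush-triggering rules: per-stream limit, global limit, and max open buffers.
public class BufferingLimits {

  static final long MAX_BYTES_PER_STREAM = 200L * 1024 * 1024; // flush one stream past ~200MB compressed
  static final long MAX_BYTES_TOTAL = 1024L * 1024 * 1024;     // flush-all past ~1GB buffered on disk
  static final int MAX_CONCURRENT_STREAMS = 10;                // flush-all once 10 stream buffers are open

  private final Map<String, Long> bytesPerStream = new HashMap<>();

  /** Returns "FLUSH_STREAM", "FLUSH_ALL" or "NONE" after buffering some bytes for the given stream. */
  public String onBytesBuffered(final String stream, final long bytes) {
    final long streamTotal = bytesPerStream.merge(stream, bytes, Long::sum);
    final long globalTotal = bytesPerStream.values().stream().mapToLong(Long::longValue).sum();

    if (streamTotal >= MAX_BYTES_PER_STREAM) {
      bytesPerStream.remove(stream);
      return "FLUSH_STREAM";
    }
    if (globalTotal >= MAX_BYTES_TOTAL || bytesPerStream.size() >= MAX_CONCURRENT_STREAMS) {
      bytesPerStream.clear();
      return "FLUSH_ALL";
    }
    return "NONE";
  }
}
```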
**Destination Snowflake with the Internal Staging mode does not have the same memory requirements as the S3 staging mode**

Because it is not relying on the …

**Scenario 1: 4.24GB over 200 streams and 2K records (5MB compressed) per stream**

To compare against the S3 staging mode.

**Scenario 2: 15.59GB over 15 streams and 100K records (283MB compressed) per stream**

Memory consumption when buffering/processing the first 200K records on 2 streams (to compare against the S3 staging mode).

Memory consumption in a "stable" state when reaching 10+GB of data transferred (1000+K records on 10+ streams), to compare against the S3 staging mode.

The total run time for 15.59GB over 15 streams with 100000 records per stream was 2 hrs 20 min 09 sec.

internal-staging-15_streams_100000_records_1024mb_gzip_disk_buffer.log

**Recap**
(* FYI, these timings were collected over a single run; we should probably do multiple runs and average them to have a more accurate view.)
We're getting an OOM in the Redshift destination when trying to import a relatively large dataset (~6M rows, so nothing too wild). The logs are:
I also see the memory usage by that pod hit the node's capacity just before it dies. Seems like it's related to this general issue, specifically this one. Any ETA for the Redshift destination fix? And any possible workarounds in the meantime?
How are the 6M rows split across tables? A workaround would be to split your connection and select only a few tables at once, because the connector allocates a large buffer for each stream all at once. Having a large number of rows in one single table should not run into an OOM, though (unless it's due to another bug). How much memory do you allocate to the worker node running the connector? As for the ETA, I don't know; maybe @grishick can answer that part (but I believe we might address those issues for the warehouse destinations in the near future anyway).
It's actually just one table, so perhaps it's another bug. Perhaps I should open a new issue?
Yes, please!
It could potentially be a wide-row issue.
At Airbyte, we seek to be clear about the project priorities and roadmap. This issue has not had any activity for 365 days, suggesting that it's not as critical as others. It's possible it has already been fixed. It is being marked as stale and will be closed in 20 days if there is no activity. To keep it open, please comment to let us know why it is important to you and if it is still reproducible on recent versions of Airbyte.
Tell us about the problem you're trying to solve
Today, destination warehouses or blob storages like S3 hold a constant-size, in-memory buffer for each stream they expect to see in the input catalog. For example, this comment in the S3 code illustrates that the more streams we have coming in, the more memory the connector must hold for buffers.
This has two obvious problems. First, it does not scale. Second, it's unnecessary, because we often write files whose max size is no more than 1GB (which is optimal for consuming warehouses). So it doesn't really make sense that this works the way it does, and we stand to gain a lot from changing it.
This current behavior also really hurts resource utilization when running a scaling Airbyte instance, because you can never reliably predict how much memory a worker will need.
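As a back-of-envelope illustration with hypothetical numbers (the real per-stream buffer size depends on the connector), total buffer memory grows linearly with the number of streams in the catalog, independent of how much data each stream actually carries:

```java
// Illustration only: a fixed buffer per stream means memory scales with the number of streams,
// before any data is even written. The 15MB figure is a hypothetical example value.
public class BufferMath {
  public static void main(String[] args) {
    final long perStreamBufferBytes = 15L * 1024 * 1024;
    for (final int streams : new int[] {10, 100, 500}) {
      final long totalMb = streams * perStreamBufferBytes / (1024 * 1024);
      System.out.printf("%d streams -> ~%d MB of pre-allocated buffers%n", streams, totalMb);
    }
  }
}
```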
Describe the solution you’d like
I want the destination connectors to be able to work with a preset amount of memory. For example, if I decide I only have 1.5GB to give the connector, then it should function correctly with that, maybe with a tradeoff in speed, but at least it shouldn't fail completely. Only hold space for a buffer if it is being used: we can even lazily allocate space as records come in, release that space when the buffer is flushed, then allocate it again if new records come in for that buffer.
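A minimal sketch of that lazy-allocation idea (hypothetical class, not the actual connector code): a buffer is created only when the first record arrives for a stream and is released when that stream is flushed.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: lazily allocate per-stream buffers and release them on flush.
public class LazyStreamBuffers {

  private final Map<String, ByteArrayOutputStream> buffers = new HashMap<>();

  public void accept(final String stream, final String recordJson) {
    // Allocate only when the stream actually receives records.
    final byte[] bytes = recordJson.getBytes(StandardCharsets.UTF_8);
    buffers.computeIfAbsent(stream, s -> new ByteArrayOutputStream()).write(bytes, 0, bytes.length);
  }

  public byte[] flush(final String stream) {
    // Release the buffer after flushing; it will be re-created if more records arrive for this stream.
    final ByteArrayOutputStream buffer = buffers.remove(stream);
    return buffer == null ? new byte[0] : buffer.toByteArray();
  }
}
```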
The most important destinations to do this for, in order, are:
Remaining pieces
Following the work on the first destinations, Snowflake and S3, an abstraction was made to serialize records and compress them before buffering, while monitoring for reasonable memory and file-handle limits.
This mechanism can now be replicated and deployed on all other Java destinations by inheriting the same base classes while providing the proper `BlobStorageOperations` or `SqlOperations` to interact with. The benefits can be shared by any destination that needs to buffer a batch of records before sending them, instead of writing them one by one.
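A rough sketch of how that inheritance pattern could look; apart from `BlobStorageOperations` and `SqlOperations`, which are named above, the interfaces and class names here are illustrative only:

```java
import java.io.File;

// Illustrative only: the shared buffering logic would live in common base classes, and each
// destination supplies the operations used when a buffer is flushed.
interface FlushOperations {
  void uploadBuffer(String stream, File compressedBuffer) throws Exception;
}

// A blob storage destination (e.g. S3/GCS) would flush through its storage client.
class BlobStorageFlush implements FlushOperations {
  // private final BlobStorageOperations blobStorageOperations; // provided by the destination

  @Override
  public void uploadBuffer(final String stream, final File compressedBuffer) {
    // e.g. upload compressedBuffer to the bucket path for this stream via blobStorageOperations
  }
}

// A warehouse destination would flush through SQL staging operations instead.
class WarehouseFlush implements FlushOperations {
  // private final SqlOperations sqlOperations; // provided by the destination

  @Override
  public void uploadBuffer(final String stream, final File compressedBuffer) {
    // e.g. copy compressedBuffer into the staging table for this stream via sqlOperations
  }
}
```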
Warehouse & blob storage destinations
It is a standard warehouse best practice to use staging areas where data is first buffered before being loaded into the final destination. By nature, blob storage destinations follow the same requirement since they manipulate files.
Other destinations requiring buffering
The destinations in this section seem to already be using, or to need, some buffering mechanism, which may be implemented on a case-by-case basis. Refactoring them to follow the same pattern will reduce the maintenance cost of the connectors and unify the buffering options, while getting memory consumption under control in a common and more predictable manner:
Other destinations that may benefit from buffering
The destination connectors below send records to the destination as they flow in. So they may not have memory consumption issues with large streams, but it might be useful to verify whether batching records in a buffer and sending a larger collection together would be beneficial:
`COPY` bulk loading is recommended

(* This may not be a completely exhaustive list, since we (Airbyte or the community) regularly add more destination connectors, so some destinations may not be listed here; they may need to be added to the correct section or disregarded because buffering does not represent a benefit or an issue for that particular connector.)
(* Python connectors are not listed in this issue)
Related areas
These are related areas that don't do so well with large batches or an arbitrary number of streams: