Description
Tell us about the problem you're trying to solve
Today, destination connectors for warehouses or blob storage (like S3) hold a constant-size, in-memory buffer for each stream they expect to see in the input catalog. For example, this comment in the S3 code illustrates that the more streams come in, the more memory the connector must reserve for buffers.
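To make the scaling concrete (illustrative numbers only, not taken from the connector code): if each stream reserves a 15 MB in-memory buffer, a catalog with 100 streams pins roughly 1.5 GB of heap for buffers alone, before a single record has been flushed.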
This has two obvious problems. First, it does not scale: memory usage grows linearly with the number of streams in the catalog. Second, it is unnecessary, because we often write files whose max size is no more than 1GB (the size that is optimal for the consuming warehouses). Overall, the current behavior is hard to justify, and we stand to gain a lot by changing it.
The current behavior also hurts resource utilization when running a scaling Airbyte instance, because you can never reliably predict how much memory a worker will need.
Describe the solution you’d like
I want the destination connectors to be able to work with a preset amount of memory, e.g. if I decide to give a connector only 1.5GB, it should still function correctly with that, perhaps trading off some speed, but at the very least it should not fail outright. Buffer space should only be held while it is actually in use: we can lazily allocate space as records come in for a stream, release it when the buffer is flushed, and allocate it again if new records arrive for that stream.
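Below is a minimal sketch of the lazy-allocation idea. The class, field, and method names are hypothetical (not the actual Airbyte CDK classes), and the per-stream cap is an assumed value.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a buffer is created when the first record for a stream
// arrives and released when it is flushed, so memory is held only for active streams.
public class LazyStreamBuffers {

  private static final int MAX_BUFFER_CHARS = 15 * 1024 * 1024; // assumed per-stream cap

  private final Map<String, StringBuilder> buffers = new HashMap<>();

  public void accept(String streamName, String serializedRecord) {
    // Lazily allocate a buffer only when this stream actually receives data.
    StringBuilder buffer = buffers.computeIfAbsent(streamName, k -> new StringBuilder());
    buffer.append(serializedRecord).append('\n');
    // Characters are used as a rough proxy for bytes in this sketch.
    if (buffer.length() >= MAX_BUFFER_CHARS) {
      flush(streamName);
    }
  }

  public void flush(String streamName) {
    StringBuilder buffer = buffers.remove(streamName); // release the memory after flushing
    if (buffer != null) {
      upload(streamName, buffer.toString());
    }
  }

  public void flushAll() {
    for (String streamName : Map.copyOf(buffers).keySet()) {
      flush(streamName);
    }
  }

  private void upload(String streamName, String data) {
    // Placeholder for the destination-specific write (e.g. uploading a staging file).
  }
}
```

With this shape, worst-case memory is bounded by the streams that are simultaneously receiving records, rather than by the total number of streams in the catalog.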
The most important destinations to do this for, in order, are:
- Snowflake Internal staging
- S3 (because it is relied upon by the GCS connector and therefore the BigQuery connector, as well as the Redshift and Snowflake connectors when the latter uses external staging to S3)
Remaining pieces
Following the work on the first destinations, Snowflake and S3, an abstraction was built that serializes and compresses records before buffering them, while keeping memory usage and the number of open file handles within reasonable limits.
This mechanism can now be replicated across all other Java destinations by inheriting the same base classes and providing the appropriate `BlobStorageOperations` or `SqlOperations` implementation to interact with.
The benefits extend to any destination that needs to buffer a batch of records before sending them, instead of writing records one by one.
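For illustration, here is a minimal sketch of the serialize-and-compress-to-file buffering idea. The class and method names are hypothetical and do not match the actual Airbyte CDK base classes; it only shows the general shape of the technique.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Illustrative sketch: records are serialized and gzip-compressed into a temp file
// instead of being held on the heap, so memory usage stays small and bounded.
public class CompressedFileBuffer implements AutoCloseable {

  private final File file;
  private final OutputStream out;
  private long byteCount = 0;

  public CompressedFileBuffer() throws IOException {
    this.file = File.createTempFile("records", ".csv.gz");
    this.out = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
  }

  public void write(String serializedRecord) throws IOException {
    byte[] bytes = (serializedRecord + "\n").getBytes(StandardCharsets.UTF_8);
    out.write(bytes);
    byteCount += bytes.length; // tracked so callers can enforce a max file size
  }

  public long getByteCount() {
    return byteCount;
  }

  public File getFile() {
    return file;
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}
```

A destination plugging into the shared abstraction then only needs to supply the operation that moves the completed file to its staging area or table, which is the role played by the `BlobStorageOperations` or `SqlOperations` implementations mentioned above.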
Warehouse & blob storage destinations
It is a standard warehouse best practice to use staging areas where data is first buffered before being loaded into the final tables. By nature, blob storage destinations follow the same requirement since they manipulate files.
- Snowflake destination
  - Internal staging & S3 external staging: 🎉 Change destination-snowflake buffering when staging to reduce/stabilize memory/thread consumption #10866
  - GCS external staging: Apply buffering changes to Snowflake Destination using GCS staging #11428
  - Azure external staging: airbytehq/airbyte-internal-issues#501
- S3 destination: Scale S3 connector to handle arbitrary number of streams per sync #10591 / 🎉 Change destination-s3 buffering to reduce/stabilize memory/thread consumption #11294
- BigQuery destination: Apply buffering changes to BigQuery Destination when using staging #11203
- BigQuery-denormalized
- Redshift destination: Apply buffering changes to Redshift Destination #11426
- Azure blob storage destination: airbytehq/airbyte-internal-issues#500
- Databricks destination: Apply buffering changes to Databricks Destination #11424
- GCS destination: Apply buffering changes to GCS Destination #11425
Other destinations requiring buffering
The destinations in this section already use, or need, some buffering mechanism, currently implemented on a case-by-case basis. Refactoring them to follow the same pattern would reduce the maintenance cost of these connectors and unify the buffering options, while bringing memory consumption under control in a common and more predictable manner:
- DynamoDB destination
- ElasticSearch destination
- MeiliSearch destination
- Pulsar destination
- RockSet destination
Other destinations that may benefit from buffering
The destination connectors below send records to the destination as they flow in. They may therefore not have memory consumption issues with large streams, but it is worth verifying whether batching records in a buffer and sending a larger collection together would be beneficial:
- JDBC destinations (these usually have batch loading options and it is probably a good idea to use them too; see the sketch after this list)
- Clickhouse destination
- MariaDbColumnStore destination
- MsSql destination
- MySql destination
- Oracle destination
- Postgres destination: use of `COPY` bulk loading is recommended
- MongoDb destination
- MQTT destination
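As a point of reference, here is a minimal sketch of the kind of batching a JDBC-based destination could use, built on the standard `java.sql` batch API. The table name, column, and batch size are hypothetical, and an actual connector would route this through the shared buffering abstraction described above.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Illustrative sketch: instead of one INSERT per record, records are accumulated
// and sent as JDBC batches, reducing round trips to the destination database.
public class BatchedInserter {

  private static final int BATCH_SIZE = 1000; // assumed batch size

  public static void insertBatched(Connection connection, List<String> serializedRecords) throws SQLException {
    String sql = "INSERT INTO _airbyte_raw_example (data) VALUES (?)"; // hypothetical raw table
    try (PreparedStatement statement = connection.prepareStatement(sql)) {
      int pending = 0;
      for (String record : serializedRecords) {
        statement.setString(1, record);
        statement.addBatch();
        if (++pending >= BATCH_SIZE) {
          statement.executeBatch(); // flush a full batch
          pending = 0;
        }
      }
      if (pending > 0) {
        statement.executeBatch(); // flush the remainder
      }
    }
  }
}
```

Sending many records per round trip instead of one removes most of the per-statement network overhead, which is usually where one-by-one inserts lose time.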
(* This may not be a completely exhaustive list: we (Airbyte or the community) regularly add more destination connectors, so some destinations may not be listed here; they should either be added to the correct section or disregarded if buffering does not represent a benefit/issue for that particular connector.)
(* Python connectors are not listed in this issue)
Related areas
These are related areas that do not cope well with large batches or an arbitrary number of streams:
- Normalization is being used by most of the data warehouse destinations in Airbyte: https://github.com/airbytehq/airbyte-internal-issues/issues/474
- Sources: Reads (in both Source and Destination) should batch on bytes read instead of records read. #3439
- Source / discover: Discover schema fails when databases have too many tables. #3943
- UI: Schema page does not display more than 6k tables. #3942
- UX: Connector UX: deprecate PART_SIZE_MB in connectors specs using S3/GCS storage #11389