
[EPIC] scale warehouse destination connectors to handle arbitrary number of streams #10260

Open
@sherifnada

Description


Tell us about the problem you're trying to solve

Today, destinations for warehouses or blob storage like S3 hold a constant-size, in-memory buffer for each stream they expect to see in the input catalog. For example, this comment in the S3 code illustrates that the more streams we have coming in, the more memory the connector must hold for buffers.
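
As a rough illustration of that pattern, here is a minimal Java sketch (the class name, buffer size, and numbers are hypothetical, not the actual S3 connector code): a buffer is allocated eagerly for every stream in the catalog, so memory grows linearly with the stream count even for streams that never receive a record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the current pattern: a fixed-size in-memory buffer
// is allocated up front for every stream in the configured catalog.
public class EagerPerStreamBuffers {

  // Illustrative per-stream buffer size; the real value is connector-specific.
  private static final int BUFFER_SIZE_BYTES = 10 * 1024 * 1024; // 10 MiB

  private final Map<String, byte[]> buffers = new HashMap<>();

  public EagerPerStreamBuffers(List<String> streamNames) {
    for (String stream : streamNames) {
      // Allocated eagerly, even if this stream never receives a single record.
      buffers.put(stream, new byte[BUFFER_SIZE_BYTES]);
    }
  }

  public long totalAllocatedBytes() {
    return (long) buffers.size() * BUFFER_SIZE_BYTES;
  }

  public static void main(String[] args) {
    List<String> streams = new ArrayList<>();
    for (int i = 0; i < 500; i++) {
      streams.add("stream_" + i);
    }
    // 500 streams x 10 MiB ~= 5 GiB held for the entire sync.
    System.out.println(new EagerPerStreamBuffers(streams).totalAllocatedBytes() + " bytes");
  }
}
```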

This has two obvious problems. First, it does not scale. Second, it is unnecessary, because we often write files whose max size is no more than 1GB (that size being optimal for consuming warehouses). The current behavior makes little sense overall, and we stand to gain a lot by changing it.

This current behavior also really hurts resource utilization when running a scaled Airbyte instance, because you can never reliably predict how much memory a worker will need.

Describe the solution you’d like

I want the destination connectors to be able to work within a preset amount of memory. For example, if I decide I only have 1.5GB to give the connector, it should still function correctly, perhaps at the cost of speed, but it should not fail outright. A buffer should only occupy space while it is in use: we can lazily allocate space as records come in, release it when the buffer is flushed, and allocate it again if new records arrive for that buffer.
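
A minimal sketch of that lazy allocate/flush/release cycle (the class and threshold below are illustrative, not an existing Airbyte API):

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the proposed behavior: buffers are created only when
// a stream actually receives records, flushed when they reach a size cap, and
// released afterwards so the freed memory can serve other streams.
public class LazyStreamBuffers {

  private static final int FLUSH_THRESHOLD_BYTES = 64 * 1024 * 1024; // illustrative cap

  private final Map<String, ByteArrayOutputStream> buffers = new HashMap<>();

  public void accept(String stream, byte[] serializedRecord) {
    // Lazy allocation: no space is reserved until the first record arrives.
    ByteArrayOutputStream buffer =
        buffers.computeIfAbsent(stream, s -> new ByteArrayOutputStream());
    buffer.writeBytes(serializedRecord);
    if (buffer.size() >= FLUSH_THRESHOLD_BYTES) {
      flush(stream, buffer);
    }
  }

  private void flush(String stream, ByteArrayOutputStream buffer) {
    // Upload buffer.toByteArray() to the staging area for this stream (omitted).
    // Then release the buffer; it will be re-allocated if more records arrive.
    buffers.remove(stream);
  }
}
```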

The most important destinations to do this for, in order, are:

  • Snowflake Internal staging
  • S3 (because the GCS connector, and therefore the BigQuery connector, rely on it; the Redshift and Snowflake connectors do as well, the latter when using external staging to S3)

Remaining pieces

Following the work on the first destinations, Snowflake and S3, an abstraction was built that serializes records and compresses them before buffering, while monitoring for reasonable memory and file-handle limits.

This mechanism can now be replicated across all other Java destinations by inheriting the same base classes and providing the proper BlobStorageOperations or SqlOperations implementation to interact with.

These benefits can be shared by any destination that needs to buffer a batch of records before sending them, instead of writing records one by one.
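
For illustration, here is a simplified sketch of what such a serialize-and-compress buffer might look like. This is not the actual Airbyte base class (which additionally enforces global memory and file-handle limits across all open buffers); it only shows the core idea of compressing serialized records into an on-disk buffer instead of holding them raw in memory.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative sketch of the shared abstraction described above: serialized
// records are gzip-compressed into a temp file, so only the gzip window and
// the stream's write buffer stay on the heap.
public class CompressedFileBuffer implements AutoCloseable {

  private final File file;
  private final OutputStream out;
  private long byteCount = 0;

  public CompressedFileBuffer() throws IOException {
    this.file = File.createTempFile("airbyte-buffer-", ".csv.gz");
    this.out = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
  }

  public void accept(byte[] serializedRecord) throws IOException {
    out.write(serializedRecord);
    byteCount += serializedRecord.length;
  }

  public long getByteCount() {
    return byteCount;
  }

  public File getFile() {
    return file;
  }

  @Override
  public void close() throws IOException {
    out.close(); // finishes the gzip stream; the file is now ready for staging
  }
}
```

A destination-specific BlobStorageOperations or SqlOperations implementation would then upload or bulk-load the finished compressed file into its staging area.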

Warehouse & blob storage destinations

It is a standard warehouse best practice to use staging areas where data is first buffered before being loaded into the final destination. Blob storage destinations naturally share the same requirement, since they manipulate files.

Other destinations requiring buffering

The destinations in this section appear to already use, or need, some buffering mechanism, implemented on a case-by-case basis. Refactoring them to follow the same pattern will reduce the connectors' maintenance cost and unify the buffering options, while bringing memory consumption under control in a common and more predictable manner:

  • DynamoDB destination
  • ElasticSearch destination
  • MeiliSearch destination
  • Pulsar destination
  • RockSet destination

Other destinations that may benefit from buffering

The destination connectors below send records to the destination as they flow in.
They may therefore not have memory consumption issues with large streams, but it is worth verifying whether batching records in a buffer and sending a larger collection of records together would be beneficial:

  • JDBC destination (these usually have batch loading options, and it's probably a good idea to use them too; see the sketch after this list)
    • Clickhouse destination
    • MariaDbColumnStore destination
    • MsSql destination
    • MySql destination
    • Oracle destination
    • Postgres destination: use of COPY bulk loading is recommended
  • MongoDb destination
  • MQTT destination
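
As an example of the batch loading mentioned for the JDBC destinations above, the sketch below accumulates rows with the standard JDBC addBatch()/executeBatch() calls instead of executing one INSERT per record. The table and column names are illustrative, not taken from any connector.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative sketch of JDBC batching: rows are accumulated and sent in one
// round trip per batch instead of one INSERT per record.
public class JdbcBatchWriter {

  private static final int BATCH_SIZE = 1_000; // illustrative flush threshold

  public static void writeRecords(Connection conn, Iterable<String> jsonRecords)
      throws SQLException {
    String sql = "INSERT INTO _airbyte_raw_events (_airbyte_data) VALUES (?)";
    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
      int pending = 0;
      for (String record : jsonRecords) {
        stmt.setString(1, record);
        stmt.addBatch();
        if (++pending >= BATCH_SIZE) {
          stmt.executeBatch(); // one round trip for the whole batch
          pending = 0;
        }
      }
      if (pending > 0) {
        stmt.executeBatch(); // flush the tail
      }
    }
  }
}
```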

(* This may not be a completely exhaustive list: we (Airbyte or the community) regularly add more destination connectors, so some destinations may be missing here; they should either be added to the correct section or disregarded if buffering does not represent a benefit or an issue for that particular connector.)
(* Python connectors are not listed in this issue.)

Related areas

These are related areas that don't handle large batches or an arbitrary number of streams well:

