Destination Redshift: throws OOM on large dataset #11695

Closed
@olivermeyer

Description

Environment

  • Airbyte version: 0.35.0-alpha
  • OS Version / Instance: EKS, c5.2xlarge node (16 GB memory)
  • Deployment: Kubernetes
  • Source Connector and version: Salesforce 0.2.5
  • Destination Connector and version: Redshift 0.3.28
  • Severity: Medium (?)
  • Step where error happened: Sync job

Current Behavior

I am trying to sync a single stream (AccountHistory) from Salesforce to Redshift. The sync fails with an OOM error after reading more than 13,213,000 records. The destination pod's memory usage climbs to ~12 GB before it dies, which is in line with the memory available on the node it runs on.

For reference, I can sync other streams from Salesforce to Redshift without a problem, but these are considerably smaller (a few thousand records each, since they're incremental). I can also sync the same stream with a much smaller dataset by changing the start_date in the source connector. This points to data volume being the issue. The simplest explanation is that the Redshift destination is, for some reason, keeping the entire dataset in memory, but I'm not sure whether that's a reasonable explanation or why it would be the case.
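To make the hypothesis above concrete, here is a minimal sketch contrasting a consumer that holds every record until the end with one that flushes in bounded batches. It is an illustration only, not Airbyte's code: the function names, the `flush` callback, and the `max_buffered` threshold are all hypothetical.

```python
from typing import Callable, Iterable, List


def sync_unbounded(records: Iterable[bytes]) -> int:
    """Hypothesised failure mode: keep every record until the stream ends.

    Returns the peak number of records held in memory at once.
    """
    buffer: List[bytes] = []
    peak = 0
    for record in records:
        buffer.append(record)
        peak = max(peak, len(buffer))
    return peak


def sync_bounded(
    records: Iterable[bytes],
    flush: Callable[[List[bytes]], None],
    max_buffered: int = 1000,  # hypothetical threshold, not an Airbyte setting
) -> int:
    """Expected behaviour: hand records off in batches so memory stays bounded.

    Returns the peak number of records held in memory at once.
    """
    buffer: List[bytes] = []
    peak = 0
    for record in records:
        buffer.append(record)
        peak = max(peak, len(buffer))
        if len(buffer) >= max_buffered:
            flush(buffer)
            buffer = []  # start a fresh batch; the flushed one can be released
    if buffer:
        flush(buffer)  # hand off the final partial batch
    return peak
```

With the first shape, peak memory grows with the size of the stream, which would match a sync that works for a few thousand records and fails after millions. With the second, the peak depends only on the batch size.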

Expected Behavior

The sync should succeed 🙂

Logs

2022-03-31 13:09:17 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-31 13:09:17 ERROR a.m.s.StreamTransferManager(abort):432 - {} - Aborting [Manager uploading to <REDACTED>/airbyte/c5c0046f-1ed2-406a-84de-8c54f4f05f1b/<REDACTED>/659_bgrbl_salesforce__account_history_testAccountHistory with id SNGdAIa06....vr0MIJDY] due to error: java.lang.OutOfMemoryError: Java heap space

And a bit further down:

2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-31 13:10:58 ERROR i.a.i.d.b.BufferedStreamConsumer(close):212 - {} - Close failed.
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - java.lang.NullPointerException: null
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at alex.mojaki.s3upload.StreamTransferManager.complete(StreamTransferManager.java:364) ~[s3-stream-upload-2.2.2.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.closeAndWaitForUpload(S3StreamCopier.java:230) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.closeStagingUploader(S3StreamCopier.java:152) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.closeAsOneTransaction(CopyConsumerFactory.java:122) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$onCloseFunction$2(CopyConsumerFactory.java:111) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction.accept(OnCloseFunction.java:9) ~[io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:203) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:62) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:141) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 	at io.airbyte.integrations.destination.redshift.RedshiftDestination.main(RedshiftDestination.java:78) [io.airbyte.airbyte-integrations.connectors-destination-redshift-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:81)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at alex.mojaki.s3upload.ConvertibleOutputStream.<init>(ConvertibleOutputStream.java:20)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at alex.mojaki.s3upload.MultiPartOutputStream.<init>(MultiPartOutputStream.java:74)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at alex.mojaki.s3upload.StreamTransferManager.getMultiPartOutputStreams(StreamTransferManager.java:338)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.prepareStagingFile(S3StreamCopier.java:123)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$recordWriterFunction$0(CopyConsumerFactory.java:90)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory$$Lambda$178/0x0000000800e10c40.accept(Unknown Source)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.flushQueueToDestination(BufferedStreamConsumer.java:166)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:148)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:46)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:147)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - 	at io.airbyte.integrations.destination.redshift.RedshiftDestination.main(RedshiftDestination.java:78)
2022-03-31 13:11:28 INFO i.a.w.p.KubePodProcess(getReturnCode):634 - Exit code for pod destination-redshift-sync-217-2-vethd is 1
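
For anyone triaging logs like the ones above, a small helper can strip the terminal colour codes and pull out the stack frames that follow a given exception. This is a rough sketch using only Python's standard library; `strip_ansi` and `frames_after` are hypothetical names, and the frame pattern is an assumption based on the lines in this report rather than a general Java stack-trace parser.

```python
import re
from typing import Iterable, List

# Colour codes look like ESC[32m or ESC[m; some log viewers replace the ESC
# byte with U+FFFD, so both are accepted as the leading character.
ANSI_ESCAPE = re.compile(r"[\x1b\ufffd]\[[0-9;]*m")

# A frame looks like "<whitespace>at package.Class.method(File.java:123)".
FRAME = re.compile(r"\sat\s+([\w.$/<>]+)\(")


def strip_ansi(line: str) -> str:
    """Remove terminal colour sequences from one log line."""
    return ANSI_ESCAPE.sub("", line)


def frames_after(lines: Iterable[str], marker: str) -> List[str]:
    """Return the frames directly following the first `marker` line that has any.

    Marker lines with no frames after them (such as a one-line abort message)
    are skipped.
    """
    cleaned = [strip_ansi(line) for line in lines]
    for index, line in enumerate(cleaned):
        if marker not in line:
            continue
        frames: List[str] = []
        for follow in cleaned[index + 1:]:
            match = FRAME.search(follow)
            if match is None:
                break  # the trace ends at the first non-frame line
            frames.append(match.group(1))
        if frames:
            return frames
    return []
```

Calling `frames_after(log_lines, "java.lang.OutOfMemoryError")` on the log above should skip the one-line abort message and return the trace that starts at `java.base/java.io.ByteArrayOutputStream.<init>`, which is where the heap allocation failed while `S3StreamCopier.prepareStagingFile` was requesting a new multipart output stream.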

Steps to Reproduce

  1. Set up a connection between Salesforce and Redshift, selecting the AccountHistory stream
  2. Trigger the sync

Are you willing to submit a PR?

No time right now
