Description
Environment
- Airbyte version: 0.35.0-alpha
- OS Version / Instance: EKS, c5.2xlarge node (16 GB memory)
- Deployment: Kubernetes
- Source Connector and version: Salesforce 0.2.5
- Destination Connector and version: Redshift 0.3.28
- Severity: Medium (?)
- Step where error happened: Sync job
Current Behavior
I am trying to sync a single stream (AccountHistory) from Salesforce to Redshift. The sync fails with an OOM error after reading more than 13,213,000 records. I also see the pod's memory usage climb to ~12 GB before it dies, which is in line with the memory available on the node it's running on.
For reference, I am able to sync other streams from Salesforce to Redshift without a problem, but those are considerably smaller (a few thousand records, since they're incremental). I am also able to sync this same stream with a much smaller dataset by changing the start_date in the source connector. This points to data volume being the issue. The simplest explanation is that the Redshift destination is somehow keeping the entire dataset in memory, but I'm not sure whether that's a reasonable explanation or why it would be the case.
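The stack trace further down shows the failed allocation happening inside alex.mojaki.s3upload while preparing a new staging file, so one (unverified) reading is that each staging file gets its own StreamTransferManager whose part buffers stay resident until the sync's close(). If so, heap usage would grow with the number of staging files instead of staying constant. A purely illustrative back-of-envelope sketch; every number below is an assumption for illustration, not a value taken from the Airbyte or s3-stream-upload code:

```java
// Back-of-envelope heap estimate. All constants are hypothetical,
// chosen only to show how per-staging-file buffers could add up.
public class HeapEstimate {
    public static void main(String[] args) {
        int partSizeMB = 10;        // assumed S3 multipart upload part size
        int buffersPerManager = 3;  // assumed part buffers each upload manager keeps alive
        int openManagers = 400;     // assumed staging uploads held open until close()
        long totalMB = (long) partSizeMB * buffersPerManager * openManagers;
        System.out.println(totalMB + " MB");
    }
}
```

With these made-up numbers the retained buffers alone reach 12,000 MB, which is in the same ballpark as the ~12 GB the pod used before dying; that is only meant to show the shape of the growth, not to pin down the actual cause.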
Expected Behavior
The sync should succeed 🙂
Logs
2022-03-31 13:09:17 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-31 13:09:17 ERROR a.m.s.StreamTransferManager(abort):432 - {} - Aborting [Manager uploading to <REDACTED>/airbyte/c5c0046f-1ed2-406a-84de-8c54f4f05f1b/<REDACTED>/659_bgrbl_salesforce__account_history_testAccountHistory with id SNGdAIa06....vr0MIJDY] due to error: java.lang.OutOfMemoryError: Java heap space
And a bit further down:
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-31 13:10:58 ERROR i.a.i.d.b.BufferedStreamConsumer(close):212 - {} - Close failed.
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - java.lang.NullPointerException: null
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at alex.mojaki.s3upload.StreamTransferManager.complete(StreamTransferManager.java:364) ~[s3-stream-upload-2.2.2.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.closeAndWaitForUpload(S3StreamCopier.java:230) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.closeStagingUploader(S3StreamCopier.java:152) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.closeAsOneTransaction(CopyConsumerFactory.java:122) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$onCloseFunction$2(CopyConsumerFactory.java:111) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction.accept(OnCloseFunction.java:9) ~[io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:203) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:62) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:141) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128) [io.airbyte.airbyte-integrations.bases-base-java-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 INFO i.a.w.p.a.DefaultAirbyteStreamFactory(lambda$create$0):61 - at io.airbyte.integrations.destination.redshift.RedshiftDestination.main(RedshiftDestination.java:78) [io.airbyte.airbyte-integrations.connectors-destination-redshift-0.30.20-alpha.jar:?]
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:81)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at alex.mojaki.s3upload.ConvertibleOutputStream.<init>(ConvertibleOutputStream.java:20)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at alex.mojaki.s3upload.MultiPartOutputStream.<init>(MultiPartOutputStream.java:74)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at alex.mojaki.s3upload.StreamTransferManager.getMultiPartOutputStreams(StreamTransferManager.java:338)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.prepareStagingFile(S3StreamCopier.java:123)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$recordWriterFunction$0(CopyConsumerFactory.java:90)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory$$Lambda$178/0x0000000800e10c40.accept(Unknown Source)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.flushQueueToDestination(BufferedStreamConsumer.java:166)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:148)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:46)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:147)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - at io.airbyte.integrations.destination.redshift.RedshiftDestination.main(RedshiftDestination.java:78)
2022-03-31 13:11:28 INFO i.a.w.p.KubePodProcess(getReturnCode):634 - Exit code for pod destination-redshift-sync-217-2-vethd is 1
Steps to Reproduce
- Set up a connection between Salesforce and Redshift, selecting the AccountHistory stream
- Trigger the sync
Are you willing to submit a PR?
No time right now, unfortunately.