
Can not sync large table to elasticsearch #11749


Closed
imiskolee opened this issue Apr 6, 2022 · 6 comments

@imiskolee
Contributor

imiskolee commented Apr 6, 2022

Details

I'm trying to run a full sync of a large MySQL table (over 3M records), and it fails. I think the cause is too many records (255 MB, over 490K records) in a single bulk insert. How can we keep the bulk size small, e.g. 10K records?

crash log

2022-04-06 08:32:41 source > 2022-04-06 08:32:40 INFO i.a.i.s.r.AbstractDbSource(lambda$createReadIterator$7):250 - Reading stream companies. Records read: 490000
2022-04-06 08:32:41 INFO i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):301 - Records read: 490000 (255 MB)
2022-04-06 08:32:41 destination > 2022-04-06 08:32:41 INFO i.a.i.d.b.BufferedStreamConsumer(acceptTracked):153 - Flushing buffer...
2022-04-06 08:32:41 destination > 2022-04-06 08:32:41 INFO i.a.i.d.e.ElasticsearchAirbyteMessageConsumerFactory(lambda$recordWriterFunction$3):76 - writing 490149 records in bulk operation
2022-04-06 08:32:47 destination > 2022-04-06 08:32:47 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):61 - Airbyte message consumer: failed.
2022-04-06 08:32:47 destination > 2022-04-06 08:32:47 ERROR i.a.i.d.b.BufferedStreamConsumer(close):205 - executing on failed close procedure.
2022-04-06 08:33:48 WARN i.a.c.i.LineGobbler(voidCall):86 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job.
2022-04-06 08:33:48 ERROR i.a.w.DefaultReplicationWorker(run):169 - Sync worker failed.
java.util.concurrent.ExecutionException: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:162) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:57) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]
	Suppressed: io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:126) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:57) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:833) [?:?]
	Suppressed: java.io.IOException: Stream closed
		at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:445) ~[?:?]
		at java.io.OutputStream.write(OutputStream.java:162) ~[?:?]
		at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
		at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) ~[?:?]
		at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) ~[?:?]
		at java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) ~[?:?]
		at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
		at java.io.BufferedWriter.flush(BufferedWriter.java:256) ~[?:?]
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.notifyEndOfStream(DefaultAirbyteDestination.java:98) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:111) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:126) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:57) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$6(DefaultReplicationWorker.java:354) ~[io.airbyte-airbyte-workers-0.35.62-alpha.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	... 1 more
2022-04-06 08:33:48 INFO i.a.w.DefaultReplicationWorker(run):228 - sync summary: io.airbyte.config.ReplicationAttemptSummary@22e609c5[status=failed,recordsSynced=490842,bytesSynced=268825701,startTime=1649233876202,endTime=1649234028237,totalStats=io.airbyte.config.SyncStats@39a65c8a[recordsEmitted=490842,bytesEmitted=268825701,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@190f3e9b[streamName=companies,stats=io.airbyte.config.SyncStats@7e925ed2[recordsEmitted=490842,bytesEmitted=268825701,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]]
2022-04-06 08:33:48 INFO i.a.w.DefaultReplicationWorker(run):250 - Source did not output any state messages
2022-04-06 08:33:48 WARN i.a.w.DefaultReplicationWorker(run):258 - State capture: No new state, falling back on input state: io.airbyte.config.State@6c28d8e4[state={}]
2022-04-06 08:33:48 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-04-06 08:33:48 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$1):147 - sync summary: io.airbyte.config.StandardSyncOutput@45abcf32[standardSyncSummary=io.airbyte.config.StandardSyncSummary@457b5fc2[status=failed,recordsSynced=490842,bytesSynced=268825701,startTime=1649233876202,endTime=1649234028237,totalStats=io.airbyte.config.SyncStats@39a65c8a[recordsEmitted=490842,bytesEmitted=268825701,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@190f3e9b[streamName=companies,stats=io.airbyte.config.SyncStats@7e925ed2[recordsEmitted=490842,bytesEmitted=268825701,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]],state=io.airbyte.config.State@6c28d8e4[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@c276db7[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@4566a299[stream=io.airbyte.protocol.models.AirbyteStream@5eaf2758[name=companies,jsonSchema={"type":"object","properties":{"id":{"type":"number"},"todos":{"type":"string"},"country":{"type":"string"},"remarks":{"type":"string"},"br_number":{"type":"string"},"created_at":{"type":"string"},"updated_at":{"type":"string"},"company_type":{"type":"string"},"fee_and_levy":{"type":"number"},"active_status":{"type":"string"},"created_by_id":{"type":"number"},"updated_by_id":{"type":"number"},"company_number":{"type":"string"},"date_of_expiry":{"type":"string"},"important_note":{"type":"string"},"last_update_by":{"type":"string"},"status_remarks":{"type":"string"},"business_status":{"type":"string"},"winding_up_mode":{"type":"string"},"company_category":{"type":"string"},"date_of_cessation":{"type":"string"},"certificate_number":{"type":"string"},"nature_of_business":{"type":"string"},"registered_address":{"type":"string"},"register_of_charges":{"type":"string"},"total_issued_shares":{"type":"number"},"date_of_commencement":{"type":"string"},"date_of_incorporation":{"type":"string"},"administrative_division":{"type":"string"},"business_branch_name_chinese":{"type":"string"},"business_branch_name_english":{"type":"string"},"name_of_business_corporation_chinese":{"type":"string"},"name_of_business_corporation_english":{"type":"string"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}],failures=[io.airbyte.config.FailureReason@4135e600[failureOrigin=destination,failureType=<null>,internalMessage=io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process message delivery failed,externalMessage=Something went wrong within the destination connector,metadata=io.airbyte.config.Metadata@76487144[additionalProperties={attemptNumber=0, jobId=467}],stacktrace=java.util.concurrent.CompletionException: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process message delivery failed
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process message delivery failed
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$5(DefaultReplicationWorker.java:296)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	... 3 more
Caused by: java.io.IOException: Broken pipe
	at java.base/java.io.FileOutputStream.writeBytes(Native Method)
	at java.base/java.io.FileOutputStream.write(FileOutputStream.java:349)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
	at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234)
	at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304)
	at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
	at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132)
	at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:205)
	at java.base/java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120)
	at java.base/java.io.BufferedWriter.write(BufferedWriter.java:233)
	at java.base/java.io.Writer.write(Writer.java:249)
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:90)
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$5(DefaultReplicationWorker.java:294)
	... 4 more
,retryable=<null>,timestamp=1649233968228], io.airbyte.config.FailureReason@64a677e[failureOrigin=destination,failureType=<null>,internalMessage=io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1,externalMessage=Something went wrong within the destination connector,metadata=io.airbyte.config.Metadata@21f81508[additionalProperties={attemptNumber=0, jobId=467}],stacktrace=java.util.concurrent.CompletionException: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$6(DefaultReplicationWorker.java:354)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	... 3 more
,retryable=<null>,timestamp=1649233968227]]]
2022-04-06 08:33:48 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):235 - Stopping temporal heartbeating...
2022-04-06 08:33:48 INFO i.a.c.p.ConfigRepository(updateConnectionState):731 - Updating connection 5e21529a-e255-4731-a633-988e3cad0b23 state: io.airbyte.config.State@13a66474[state={}]

@imiskolee
Contributor Author

How can we check the destination process log?

@imiskolee
Contributor Author

The buffer limit is currently hard-coded to 256 MB.

@imiskolee
Contributor Author

A 256 MB limit is too high for a large table and a low-QPS service. It means the Elasticsearch server needs a lot of memory to accept a single bulk request.
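
To illustrate the kind of cap being asked for, here is a minimal sketch of splitting a buffer into smaller batches by record count and by byte size. `BulkBatcher` and `partition` are hypothetical names made up for this example; they are not part of Airbyte or the Elasticsearch client, and this is not necessarily how #11752 addresses it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not Airbyte code): splits records into bounded batches. */
public class BulkBatcher {

    /**
     * Groups serialized records into batches holding at most maxRecords entries
     * and at most maxBytes of UTF-8 payload. A single record larger than
     * maxBytes still gets a batch of its own rather than being dropped.
     */
    public static List<List<String>> partition(List<String> records, int maxRecords, long maxBytes) {
        if (maxRecords <= 0 || maxBytes <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;
        for (String record : records) {
            long size = record.getBytes(StandardCharsets.UTF_8).length;
            // Close the current batch if adding this record would exceed either limit.
            boolean full = current.size() >= maxRecords || currentBytes + size > maxBytes;
            if (!current.isEmpty() && full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> records = List.of("{\"id\":1}", "{\"id\":2}", "{\"id\":3}", "{\"id\":4}", "{\"id\":5}");
        // Each batch would become one bulk request to the destination.
        for (List<String> batch : partition(records, 2, 1024)) {
            System.out.println("bulk request with " + batch.size() + " records");
        }
    }
}
```

With limits such as 10K records or a few MB per batch, each bulk request stays small no matter how much the upstream buffer has accumulated.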

@imiskolee
Contributor Author

#11752

@marcosmarxm
Member

Solved by #11752. @imiskolee can you confirm that you're able to sync the tables after the merge?

@imiskolee
Contributor Author

Yes, it works.

4 participants