Postgres to MySQL fails if source has a column with json data #4583

Closed
@julianopiovezan

Description

Environment

  • Airbyte version: 0.26.2-alpha
  • OS Version / Instance: Debian bullseye/sid, AWS EC2
  • Deployment: Docker
  • Source Connector and version: Postgres 0.3.5
  • Destination Connector and version: MySQL 0.1.6
  • Severity: Critical
  • Step where error happened: Sync job

Current Behavior

Sync job doesn't work when the source table has a column with json data, failing with the following error:

ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.RuntimeException: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Invalid JSON text: "Missing a comma or '}' after an object member." at position 92 in value for column '_airbyte_tmp_pgf_metabase_test._airbyte_data'.

Expected Behavior

The sync should complete and write the row, including the JSON text, to the MySQL destination.

Logs

Sync LOG

2021-07-06 20:44:58 INFO () WorkerRun(call):62 - Executing worker wrapper. Airbyte version: 0.26.2-alpha
2021-07-06 20:44:58 INFO () TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.26.2-alpha
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(run):102 - start sync worker. job id: 1309 attempt id: 0
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(run):111 - configured sync modes: {public.test=full_refresh - overwrite}
2021-07-06 20:44:58 INFO () DefaultAirbyteDestination(start):78 - Running destination...
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - Checking if airbyte/destination-mysql:0.1.6 exists...
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - airbyte/destination-mysql:0.1.6 was found locally.
2021-07-06 20:44:58 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1309/0 --network host airbyte/destination-mysql:0.1.6 write --config destination_config.json --catalog destination_catalog.json
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - Checking if airbyte/source-postgres:0.3.5 exists...
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - airbyte/source-postgres:0.3.5 was found locally.
2021-07-06 20:44:58 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1309/0 --network host airbyte/source-postgres:0.3.5 read --config source_config.json --catalog source_catalog.json --state input_state.json
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):237 - Destination output thread started.
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(run):139 - Waiting for source thread to join.
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):207 - Replication thread started.
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.d.m.MySQLDestination(main):110 - {} - starting destination: class io.airbyte.integrations.destination.mysql.MySQLDestination
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationRunner(run):78 - {} - Running integration: io.airbyte.integrations.destination.mysql.MySQLDestination
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationRunner(run):82 - {} - Command: WRITE
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationRunner(run):83 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.s.p.PostgresSource(main):336 - {} - starting source: class io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationRunner(run):78 - {} - Running integration: io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {read=null, catalog=source_catalog.json, state=input_state.json, config=source_config.json}
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationRunner(run):82 - {} - Command: READ
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationRunner(run):83 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='input_state.json'}
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=metabase-test, namespace=null, outputSchemaName=airbyte_raw, tmpTableName=_airbyte_tmp_pgf_metabase_test, outputTableName=_airbyte_raw_metabase_test, syncMode=overwrite}
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):120 - {} - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):139 - {} - Preparing tmp tables in destination started for 1 streams
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream metabase-test. schema: airbyte_raw, tmp table name: _airbyte_tmp_pgf_metabase_test
2021-07-06 20:45:10 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:10 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):149 - {} - Preparing tables in destination completed.
2021-07-06 20:45:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:11 INFO i.a.i.s.p.PostgresSource(isCdc):287 - {} - using CDC: false
2021-07-06 20:45:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:11 INFO i.a.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):98 - {} - Attempting to get metadata from the database to see if we can connect.
2021-07-06 20:45:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:12 INFO i.a.i.s.r.CdcStateManager(<init>):46 - {} - Initialized CDC state with: null
2021-07-06 20:45:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:12 INFO i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='test', namespace='public'}, New Cursor Field: null. Resetting cursor value
2021-07-06 20:45:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:18 INFO i.a.i.s.p.PostgresSource(isCdc):287 - {} - using CDC: false
2021-07-06 20:45:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:18 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):460 - {} - Queueing query for table: test
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):200 - {} - Closing database connection pool.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):202 - {} - Closed database connection pool.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.b.IntegrationRunner(run):118 - {} - Completed integration: io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.s.p.PostgresSource(main):338 - {} - completed source: class io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:21 INFO () DefaultReplicationWorker(run):141 - Source thread complete.
2021-07-06 20:45:21 INFO () DefaultReplicationWorker(run):142 - Waiting for destination thread to join.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 - {} - Airbyte message consumer: succeeded.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.d.b.BufferedStreamConsumer(close):190 - {} - executing on success close procedure.
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.RuntimeException: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Invalid JSON text: "Missing a comma or '}' after an object member." at position 92 in value for column '_airbyte_tmp_pgf_metabase_test._airbyte_data'.
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.lambda$loadDataIntoTable$0(MySQLSqlOperations.java:90)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.java:61)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.loadDataIntoTable(MySQLSqlOperations.java:76)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.insertRecords(MySQLSqlOperations.java:62)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.jdbc.JdbcBufferedConsumerFactory.lambda$recordWriterFunction$2(JdbcBufferedConsumerFactory.java:167)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.flushQueueToDestination(BufferedStreamConsumer.java:164)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:191)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:82)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:138)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:113)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLDestination.main(MySQLDestination.java:111)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - Caused by: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Invalid JSON text: "Missing a comma or '}' after an object member." at position 92 in value for column '_airbyte_tmp_pgf_metabase_test._airbyte_data'.
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:104)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:764)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:648)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:194)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:194)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.lambda$loadDataIntoTable$0(MySQLSqlOperations.java:87)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - ... 10 more
2021-07-06 20:45:22 INFO () DefaultReplicationWorker(run):144 - Destination thread complete.
2021-07-06 20:45:22 WARN () DefaultAirbyteDestination(close):125 - Destination process might not have shut down correctly. destination process alive: false, destination process exit value: 1. This warning is normal if the job was cancelled.
2021-07-06 20:45:22 INFO () DefaultReplicationWorker(run):169 - sync summary: io.airbyte.config.ReplicationAttemptSummary@db6c580[status=completed,recordsSynced=1,bytesSynced=206,startTime=1625604298259,endTime=1625604322001]
2021-07-06 20:45:22 INFO () DefaultReplicationWorker(run):178 - Source did not output any state messages
2021-07-06 20:45:22 WARN () DefaultReplicationWorker(run):186 - State capture: No new state, falling back on input state: io.airbyte.config.State@289cf40e[state={}]
2021-07-06 20:45:22 INFO () TemporalAttemptExecution(get):133 - Stopping cancellation check scheduling...
2021-07-06 20:45:22 INFO () RetryingTemporalAttemptExecution(get):118 - Last output present: true. Should attempt again: false
2021-07-06 20:45:22 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):200 - attempt summaries: [io.airbyte.config.ReplicationOutput@1657875c[replicationAttemptSummary=io.airbyte.config.ReplicationAttemptSummary@db6c580[status=completed,recordsSynced=1,bytesSynced=206,startTime=1625604298259,endTime=1625604322001],state=io.airbyte.config.State@289cf40e[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@259c0837[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@75889e09[stream=io.airbyte.protocol.models.AirbyteStream@3fd3a21a[name=metabase-test,jsonSchema={"type":"object","properties":{"id":{"type":"number"},"model":{"type":"string"},"topic":{"type":"string"},"details":{"type":"string"},"user_id":{"type":"number"},"model_id":{"type":"number"},"table_id":{"type":"number"},"custom_id":{"type":"string"},"timestamp":{"type":"string"},"database_id":{"type":"number"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}]]]
2021-07-06 20:45:22 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):201 - sync summary: io.airbyte.config.StandardSyncOutput@61c57430[standardSyncSummary=io.airbyte.config.StandardSyncSummary@4324c960[status=completed,recordsSynced=1,bytesSynced=206,startTime=1625604298259,endTime=1625604322001],state=io.airbyte.config.State@289cf40e[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@259c0837[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@75889e09[stream=io.airbyte.protocol.models.AirbyteStream@3fd3a21a[name=metabase-test,jsonSchema={"type":"object","properties":{"id":{"type":"number"},"model":{"type":"string"},"topic":{"type":"string"},"details":{"type":"string"},"user_id":{"type":"number"},"model_id":{"type":"number"},"table_id":{"type":"number"},"custom_id":{"type":"string"},"timestamp":{"type":"string"},"database_id":{"type":"number"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}]]

Source table definition and test data


create table test
(
	id integer,
	topic varchar(32),
	timestamp timestamp with time zone,
	user_id integer,
	model varchar(16),
	model_id integer,
	database_id integer,
	table_id integer,
	custom_id varchar(48),
	details varchar
);

insert into public.test (id, topic, timestamp, user_id, model, model_id, database_id, table_id, custom_id, details) values (1, 'install', '2017-08-22 17:22:14.519000', null, 'install', null, null, null, null, '{"name":"Conferência Faturamento - Custo - Taxas - Margem - Resumo ano inicial até -2","description":null}');
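
As a sanity check on the test data, the sketch below confirms that the `details` value from the INSERT statement is valid JSON on its own, so the "Invalid JSON text" error is unlikely to come from malformed source data. It also shows what the value looks like once embedded as a string inside a JSON record. The record shape here is an assumption for illustration only (a few of the table's columns serialized as one JSON object, as the `_airbyte_data` column name suggests); it is not taken from the connector code, and this check does not identify the root cause.

```python
import json

# The `details` value from the INSERT statement above, copied verbatim.
details = '{"name":"Conferência Faturamento - Custo - Taxas - Margem - Resumo ano inicial até -2","description":null}'

# 1. The stored string parses as JSON by itself.
parsed = json.loads(details)

# 2. Hypothetical record shape (an assumption, not the connector's actual output):
#    the row serialized as one JSON object, with `details` kept as a plain string.
record = {"id": 1, "topic": "install", "model": "install", "details": details}
serialized = json.dumps(record, ensure_ascii=False)

# The quotes inside `details` are now backslash-escaped (\"name\" and so on).
# Any step between source and destination that alters those backslashes would
# leave text that no longer parses as JSON.
print(serialized)

# 3. As long as the escapes are preserved, the record round-trips cleanly.
roundtrip = json.loads(serialized)
```

If the same string fails to parse after being written to the destination, comparing it byte for byte against `serialized` should show which characters were changed in transit.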

Steps to Reproduce

  1. Create the test table in a Postgres database and insert the test row, using the statements above.
  2. Set up a Postgres source.
  3. Set up a MySQL destination.
  4. Set up a connection between them.
  5. Run a sync.
