Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Incremental sync is not working as expecting #67

Open
sphinks opened this issue Aug 10, 2020 · 0 comments
Open

Incremental sync is not working as expecting #67

sphinks opened this issue Aug 10, 2020 · 0 comments

Comments

@sphinks
Copy link

sphinks commented Aug 10, 2020

I have sync Postgres-Redshfit. It works perfectly in case of fast-sync, but once I try to use Incremental sync with the same table and the same data I got issues.

  1. I face issues that dates below 0000-00-00 could not be processed during incremental sync. I know that it is not correct data and should be eliminated during input, but nevertheless. Fix it just replace such dates with null.
  2. Next issue is not clear for me at all:
time=2020-08-10 17:29:51 logger_name=target_redshift log_level=INFO message=Loading 20000 rows into 'public."STG_USER"'
time=2020-08-10 17:29:53 logger_name=target_redshift log_level=ERROR message=Failed to load stream public-user to Redshift
Traceback (most recent call last):
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 351, in load_stream_batch
    flush_records(stream, records_to_load, row_count[stream], db_sync, compression, slices, temp_dir)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 425, in flush_records
    db_sync.load_csv(copy_key, row_count, size_bytes, compression)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/db_sync.py", line 467, in load_csv
    cur.execute(copy_sql)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/psycopg2/extras.py", line 143, in execute
    return super(DictCursor, self).execute(query, vars)
psycopg2.errors.SyntaxError: conflicting or redundant options

Traceback (most recent call last):
  File "/app/.virtualenvs/target-redshift/bin/target-redshift", line 8, in <module>
    sys.exit(main())
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 447, in main
    persist_lines(config, singer_messages, table_cache)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 195, in persist_lines
    filter_streams=filter_streams)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 323, in flush_streams
    ) for stream in streams_to_flush)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 1004, in __call__
    if self.dispatch_one_batch(iterator):
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 835, in dispatch_one_batch
    self._dispatch(tasks)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 754, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 209, in apply_async
    result = ImmediateResult(func)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 590, in __init__
    self.results = batch()
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 256, in __call__
    for func, args, kwargs in self.items]
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 256, in <listcomp>
    for func, args, kwargs in self.items]
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 361, in load_stream_batch
    raise e
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 351, in load_stream_batch
    flush_records(stream, records_to_load, row_count[stream], db_sync, compression, slices, temp_dir)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 425, in flush_records
    db_sync.load_csv(copy_key, row_count, size_bytes, compression)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/db_sync.py", line 467, in load_csv
    cur.execute(copy_sql)
  File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/psycopg2/extras.py", line 143, in execute
    return super(DictCursor, self).execute(query, vars)
psycopg2.errors.SyntaxError: conflicting or redundant options

time=2020-08-10 17:29:54 logger_name=singer log_level=INFO message=METRIC: {"type": "counter", "metric": "record_count", "value": 20032, "tags": {}}
time=2020-08-10 17:29:54 logger_name=tap_postgres log_level=CRITICAL message=[Errno 32] Broken pipe
Traceback (most recent call last):
  File "/app/.virtualenvs/tap-postgres/bin/tap-postgres", line 8, in <module>
    sys.exit(main())
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 434, in main
    raise exc
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 431, in main
    main_impl()
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 421, in main_impl
    args.config.get('default_replication_method'), state, state_file)
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 315, in do_sync
    end_lsn)
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 174, in sync_traditional_stream
    state = do_sync_incremental(conn_config, stream, state, desired_columns, md_map)
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 85, in do_sync_incremental
    state = incremental.sync_table(conn_config, stream, state, desired_columns, md_map)
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/sync_strategies/incremental.py", line 113, in sync_table
    singer.write_message(record_message)
  File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/singer/messages.py", line 227, in write_message
    sys.stdout.flush()
BrokenPipeError: [Errno 32] Broken pipe
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
BrokenPipeError: [Errno 32] Broken pipe

No idea, what is going wrong here. It is really strange: fast_sync move the same data (even incorrect dates) without any issue, incremental run is failing.

Let me know in case you need additional details or the issue should be raised in tap-postgres repository.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant