Skip to content

🐛 Source Snowflake: In timestamp cursor fields, seconds in cursor values are rounded to ceil and causing to sync rows whose cursor column values are smaller than cursor. #9915

Closed
@ameyabapat-bsft

Description

@ameyabapat-bsft

Environment

  • Airbyte version: 0.35.5-alpha
  • OS Version / Instance: AWS EC2
  • Deployment: Docker
  • Source Connector and version: snowflake - 0.1.5
  • Destination Connector and version: S3 - 0.2.5
  • Severity: Medium
  • Step where error happened: Sync job

Current Behavior

Even though there is no change in cursor value, incremental sync pulls up rows whose cursor columns values are smaller than cursor value.
Cursor value(timestamp) is getting rounded to ceil when last (milliseconds) field is more than 500.
Ex: Snowflake timestamp value: 2022-01-28 10:30:33.614 which got round to 2022-01-28T10:30:34Z by airbyte.

Expected Behavior

It should not sync rows whose value is smaller than cursor value.

Logs

Please check them in sequence after reading steps to reproduce.
logs-58-0.txt
logs-59-0.txt
logs-60-0.txt
logs-61-0.txt
logs-62-0.txt

Steps to Reproduce

  1. Create Snowflake -> Aws S3 incremental connection and manual frequency.
    My snowflake table is
    CREATE OR REPLACE TABLE test_timestamp ( id int, request_timestamp_formatted TIMESTAMP_NTZ );
    Set request_timestamp_formatted as cursor field.

Screenshot 2022-01-28 at 4 50 44 PM

  1. Add 132 rows to the table.
    insert into test_timestamp select * from (select seq2(0), current_timestamp() from table(generator(rowcount => 132))) order by seq2(0) limit 132;

  2. Run Sync. It should sync 132 records.

  3. Add 100 records.
    insert into test_timestamp select * from (select seq2(0), current_timestamp() from table(generator(rowcount => 132))) order by seq2(0) limit 100;

  4. It should sync 232 records as per the greater than equal to cursor value logic.

  5. Add 3 more rows.
    insert into test_timestamp select * from (select seq2(0), current_timestamp() from table(generator(rowcount => 132))) order by seq2(0) limit 3;

  6. Run Sync. It should sync 103 records: 100 repeated + 3 new.

  7. On all subsequent syncs, It pulls up 103 records every-time.

Ideally it should sync only 3 records where cursor value is equal to newly added 3 records but it keeps on syncing old 100 records as well whose cursor column value is less than cursor value.
For reference:
My snowflake db structure after these steps.
select count(*), request_timestamp_formatted from test_timestamp group by request_timestamp_formatted;
Screenshot 2022-01-31 at 4 17 33 PM

Slack Thread : https://airbytehq.slack.com/archives/C01MFR03D5W/p1643368624680109

Are you willing to submit a PR?

I am not fully verse with code base. I feel we would need to fix at jdbc level on which this source is built on. I could try to fix given some pointers on code change locations.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions