Description
Environment
- Airbyte version: 0.35.27-alpha
- OS Version / Instance: AWS EC2
- Deployment: Docker
- Source Connector and version: Microsoft SQL Server (MSSQL) v0.3.14
- Destination Connector and version: Snowflake v0.4.8
- Severity: Medium-High
- Step where error happened: Sync job
Current Behaviour
Initial load completes as expected (all rows replicated). However, running a subsequent load (with the incremental + append option, using ModifiedDate (datetime) as a cursor field), all records are loaded again even though there are no changes in the source MSSQL database cursor field.
In the source database (a standard AdventureWorks SQL Server demo database), all the ModifiedDate values in the Sales.Customer table are set to "2014-09-12 11:15:07.263". When syncing this data, Airbyte saves the cursor value rounded down to "2014-09-12 11:15:07Z", stripping out the millisecond component. The next time a sync runs, of course 2014-09-12 11:15:07.263 is technically greater than 2014-09-12 11:15:07Z, so all records are synced over again when they shouldn't be.
Interestingly, the record payload itself within Snowflake is also rounded, meaning that both the cursor and the replicated data have less precision than the source database:
Expected Behaviour
No changes to the cursor value in the source should yield no records emitted; replicated columns should contain the same precision as the source
Logs
I've also tried to point out useful bits below with line numbers
Log highlights
Initial sync (logs-4.txt), the cursor is null - therefore replicate everything (expected)
[L828]: 2022-02-10 07:03:38 �[source] > 2022-02-10 07:03:38 �[INFO] i.a.i.s.r.StateDecoratingIterator(computeNext):60 - State Report: stream name: AirbyteStreamNameNamespacePair{name='Customer', namespace='Sales'}, original cursor field: null, original cursor null, cursor field: ModifiedDate, new cursor: 2014-09-12T11:15:07Z
However, in the incremental run (logs-5.txt), shenanigans ensue:
[L808]: 2022-02-10 07:04:39 �[source]�> 2022-02-10 07:04:39 �[INFO]�i.a.i.s.m.MssqlSource(lambda$queryTableIncremental$0):111 - Prepared SQL query for queryTableIncremental is: SELECT "CustomerID","PersonID","StoreID","TerritoryID","AccountNumber","rowguid","ModifiedDate" FROM "Sales"."Customer" WHERE "ModifiedDate" > ?
[L830]: 2022-02-10 07:04:44 �[source]> 2022-02-10 07:04:43 [INFO] i.a.i.s.r.StateDecoratingIterator(computeNext):60 - State Report: stream name: AirbyteStreamNameNamespacePair{name='Customer', namespace='Sales'}, original cursor field: ModifiedDate, original cursor 2014-09-12T11:15:07Z, cursor field: ModifiedDate, new cursor: 2014-09-12T11:15:07Z
Steps to Reproduce
- Create SQL Server source (e.g. a basic AdventureWorks RDS database in my case)
- Create a Snowflake destination
- Create a connection between the two using incremental method, replicate Sales.Customer table, using cursor column ModifiedDate (datetime)
- Run initial sync
- Run incremental sync
Are you willing to submit a PR?
Willing, but perhaps not all that able! :) I'm not super familiar with the Airbyte codebase, so it may be more efficient for someone else to look into it who knows it better.
This issue looks broadly pretty similar to existing issues #8904 and #9915, but wanted to call out that MSSQL is also affected.