Description
Environment
- Airbyte version: 0.32.0-alpha-patch-1
- OS Version / Instance: macOS and GCP n2.
- Deployment: Docker and Kubernetes
- Source Connector and version: airbyte/source-postgres 0.4.0
- Destination Connector and version: airbyte/source-snowflake 0.1.2 (but it doesn't matter)
- Severity: High (it affects data integrity. Also, other field types and sources could be compromised)
- Step where error happened: Sync job
Current Behavior
Postgres timestamps are converted to strings and millisecond precision is lost. This affects all timestamp fields, and especially cursor behavior, because the converted string value is what gets saved as the cursor. Because the precision is lost during extraction from the source, every connection that uses this source is affected. Most importantly, it affects how Airbyte syncs data: depending on the last cursor value, Airbyte will either lose or duplicate data.
If the millisecond part of the last selected cursor timestamp in Postgres is lower than .500, the cursor value saved in Airbyte is truncated; if it is equal to or greater than .500, the saved cursor is rounded up.
Example of a selected cursor with the last value lower than .500:
- Postgres timestamp value: 2021-11-18 15:28:26.499
- Airbyte cursor value: 2021-11-18T15:28:26Z
- Issue: all rows with a cursor value in Postgres greater than 2021-11-18T15:28:26.000 will be resynced even though neither the cursor nor the data has changed.
Example of a selected cursor with the last value equal to or greater than .500:
- Postgres timestamp value: 2021-11-18 15:28:26.500
- Airbyte cursor value: 2021-11-18T15:28:27Z
- Issue: all new rows with a cursor value greater than 2021-11-18 15:28:26.500 but equal to or lower than 2021-11-18T15:28:27.000 will be lost.
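The two examples above can be reproduced with a small sketch. It assumes the conversion behaves like half-up rounding to whole seconds, which matches the values reported here; the class and the helper toSecondPrecisionCursor are hypothetical and are not Airbyte code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class CursorRoundingSketch {

    // Second-precision format with the same shape as the cursor seen in the logs.
    private static final DateTimeFormatter SECONDS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");

    // Hypothetical helper (an assumption, not the Airbyte implementation):
    // rounds half-up to the nearest second, then formats without milliseconds.
    static String toSecondPrecisionCursor(LocalDateTime ts) {
        LocalDateTime rounded = ts.plusNanos(500_000_000L).truncatedTo(ChronoUnit.SECONDS);
        return rounded.format(SECONDS);
    }

    public static void main(String[] args) {
        LocalDateTime belowHalf = LocalDateTime.parse("2021-11-18T15:28:26.499");
        LocalDateTime atHalf = LocalDateTime.parse("2021-11-18T15:28:26.500");

        System.out.println(toSecondPrecisionCursor(belowHalf)); // 2021-11-18T15:28:26Z
        System.out.println(toSecondPrecisionCursor(atHalf));    // 2021-11-18T15:28:27Z
    }
}
```

In the first case the saved cursor ends up earlier than the real value, and in the second it ends up later. That difference is what leads to resynced rows in one case and lost rows in the other.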
Expected Behavior
Timestamps should keep millisecond precision.
Logs
2021-12-28 13:21:56 INFO i.a.c.p.ConfigRepository(updateConnectionState):545 - Updating connection c2d39a1a-2699-4770-8750-f24150f40bc5 state: io.airbyte.config.State@34506f80[state={"cdc":false,"streams":[{"stream_name":"invoice","stream_namespace":"invoice","cursor_field":["updated_at"],"cursor":"2021-11-18T15:28:26Z"}]}]
Steps to Reproduce
Selected cursor with the last value lower than .500
- Select a timestamp cursor from Postgres as your Airbyte cursor field.
- Create example data; the greatest value of the field selected as cursor must have a millisecond part lower than .500.
- Sync your connection. The logs will show that the Airbyte cursor value is truncated (like the example log above).
- Resync your connection without updating or adding new data.
- Airbyte syncs data that was already synced.
Selected cursor with the last value equal to or greater than .500
- Select a timestamp cursor from Postgres as your Airbyte cursor field.
- Create example data; the greatest value of the field selected as cursor must have a millisecond part equal to or greater than .500.
- Sync your connection. The logs will show that the Airbyte cursor value is rounded up (like the example log above).
- Update or add new data with a cursor field value greater than the current maximum value in Postgres (with millisecond precision) and equal to or lower than the Airbyte cursor value shown in the logs (without milliseconds).
- Resync your connection.
- The data updated or added in step 4 is not synced and is lost.
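Both reproduction scenarios can be simulated in memory. This is only a sketch: it assumes an incremental read selects rows whose cursor column is strictly greater than the saved cursor, and the class and the method rowsAfter are hypothetical stand-ins, not the connector's actual query logic.

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

final class IncrementalSyncSketch {

    // Hypothetical model of an incremental read (an assumption):
    // return the rows whose cursor value is strictly after the saved cursor.
    static List<LocalDateTime> rowsAfter(List<LocalDateTime> rows, LocalDateTime savedCursor) {
        return rows.stream()
                .filter(row -> row.isAfter(savedCursor))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Scenario 1: the last value is .499 and the saved cursor is truncated to :26.
        List<LocalDateTime> unchangedTable =
                List.of(LocalDateTime.parse("2021-11-18T15:28:26.499"));
        LocalDateTime truncatedCursor = LocalDateTime.parse("2021-11-18T15:28:26");
        // The already synced row is selected again, so it is duplicated.
        System.out.println(rowsAfter(unchangedTable, truncatedCursor).size()); // 1

        // Scenario 2: the last value is .500 and the saved cursor is rounded up to :27.
        List<LocalDateTime> tableWithNewRow = List.of(
                LocalDateTime.parse("2021-11-18T15:28:26.500"),  // already synced
                LocalDateTime.parse("2021-11-18T15:28:26.800")); // added after the first sync
        LocalDateTime roundedCursor = LocalDateTime.parse("2021-11-18T15:28:27");
        // The new row falls before the rounded cursor, so it is never selected.
        System.out.println(rowsAfter(tableWithNewRow, roundedCursor).size()); // 0
    }
}
```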
Are you willing to submit a PR?
I've tried to fix it myself, but it looks like it should be fixed in a library that is used, or can be used, by other Java JDBC connections: AbstractJdbcCompatibleSourceOperations.java. Also, I think other field types from the same source (like time) or from different source connectors could be compromised even though they don't use this airbyte-db lib, because using only second precision looks like a design decision. I want to collaborate, but I think this is something that should be discussed by core committers. I am willing to participate if you think I can help.
I've started to follow the code from here at source-postgres
Proposed solution:
We should modify the file airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java.
Change line 125, which uses the DataTypeUtils.toISO8601String function, to use another existing function from the same lib, DataTypeUtils.toISO8601StringWithMilliseconds:
node.put(columnName, DataTypeUtils.toISO8601StringWithMilliseconds(d));
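To illustrate the difference the proposed change is meant to make, here is a minimal sketch using java.time. The two formatters are stand-ins written for this example; they are assumptions about the output shape and are not the actual implementations of DataTypeUtils.toISO8601String or DataTypeUtils.toISO8601StringWithMilliseconds.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class MillisecondFormatSketch {

    // Stand-in for a second-precision formatter (hypothetical, for illustration only).
    private static final DateTimeFormatter SECONDS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC);

    // Stand-in for a millisecond-precision formatter (hypothetical, for illustration only).
    private static final DateTimeFormatter MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    static String formatSeconds(Instant instant) {
        return SECONDS.format(instant);
    }

    static String formatMillis(Instant instant) {
        return MILLIS.format(instant);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2021-11-18T15:28:26.499Z");
        System.out.println(formatSeconds(ts)); // 2021-11-18T15:28:26Z
        System.out.println(formatMillis(ts));  // 2021-11-18T15:28:26.499Z
    }
}
```

With the millisecond format, the saved cursor string can be compared against the Postgres value without the gap described in the examples.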
FYI
I raised this issue on Slack earlier to understand whether it was my mistake or a real issue. Marcos Marx, Liren Tu, and Augustin participated in the discussion.