Skip to content

Commit d5c7989

Browse files
committed
parsing discrepancy
1 parent aa344d1 commit d5c7989

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor):
3131
DEFAULT_MAX_HISTORY_SIZE = 10_000
3232
DATE_TIME_FORMAT = DefaultFileBasedCursor.DATE_TIME_FORMAT
3333
zero_value = datetime.min
34+
zero_cursor_value = f"0001-01-01T00:00:00.000000Z_{_NULL_FILE}"
3435

3536
def __init__(
3637
self,
@@ -86,7 +87,7 @@ def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -
8687
def _compute_prev_sync_cursor(self, value: Optional[StreamState]) -> Tuple[datetime, str]:
8788
if not value:
8889
return self.zero_value, ""
89-
prev_cursor_str = value.get(self._cursor_field.cursor_field_key) or self._zero_cursor_value()
90+
prev_cursor_str = value.get(self._cursor_field.cursor_field_key) or self.zero_cursor_value
9091
# So if we see a cursor greater than the earliest file, it means that we have likely synced all files.
9192
# However, we take the earliest file as the cursor value for the purpose of checking which files to
9293
# sync, in case new files have been uploaded in the meantime.
@@ -96,17 +97,14 @@ def _compute_prev_sync_cursor(self, value: Optional[StreamState]) -> Tuple[datet
9697
# files have already been uploaded. If that's the case, they'll be in history and we'll skip
9798
# re-uploading them.
9899
earliest_file_cursor_value = self._get_cursor_key_from_file(self._compute_earliest_file_in_history())
99-
cursor = min(prev_cursor_str, earliest_file_cursor_value)
100-
cursor_dt, cursor_uri = cursor.split("_", 1)
100+
cursor_str = min(prev_cursor_str, earliest_file_cursor_value)
101+
cursor_dt, cursor_uri = cursor_str.split("_", 1)
101102
return datetime.strptime(cursor_dt, self.DATE_TIME_FORMAT), cursor_uri
102103

103104
def _get_cursor_key_from_file(self, file: Optional[RemoteFile]) -> str:
104105
if file:
105106
return f"{datetime.strftime(file.last_modified, self.DATE_TIME_FORMAT)}_{file.uri}"
106-
return self._zero_cursor_value()
107-
108-
def _zero_cursor_value(self) -> str:
109-
return f"{self.zero_value.strftime(self.DATE_TIME_FORMAT)}_{_NULL_FILE}"
107+
return self.zero_cursor_value
110108

111109
def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
112110
with self._state_lock:

0 commit comments

Comments
 (0)