🐛 Source Amazon S3: solve possible case of files being missed during incremental syncs #12568

Merged
Changes from 8 commits
@@ -6,7 +6,7 @@
import json
from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import datetime
from datetime import datetime, timedelta
from functools import lru_cache
from traceback import format_exc
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union
@@ -250,7 +250,7 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
return self.master_schema

def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Dict[str, Any]]]:
"""
This builds full-refresh stream_slices regardless of sync_mode param.
@@ -306,10 +306,10 @@ def _add_extra_fields_from_map(self, record: Dict[str, Any], extra_map: Mapping[
return record

def _read_from_slice(
        self,
        file_reader: AbstractFileParser,
        stream_slice: Mapping[str, Any],
        stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
Uses provider-relevant StorageFile to open file and then iterates through stream_records() using format-relevant AbstractFileParser.
@@ -333,11 +333,11 @@ def _read_from_slice(
LOGGER.info("finished reading a stream slice")

def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic
@@ -350,6 +350,9 @@ def read_records(
class IncrementalFileStream(FileStream, ABC):
# TODO: ideally want to checkpoint after every file or stream slice rather than N records
state_checkpoint_interval = None
buffer_days = 3 # keeping track of all files synced in the last N days
sync_all_files_always = False
max_history_size = 1000000000
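
For orientation, a hypothetical sketch of the state shape these attributes produce (the cursor field name and the exact layout are illustrative assumptions, not taken from this diff): alongside the usual cursor timestamp, the state carries a `history` dict that groups file keys by their modification date, is pruned to roughly the last `buffer_days` days, and is dropped entirely if it grows past `max_history_size` bytes.

```python
# Hypothetical example of the enriched stream state (illustrative field names;
# the cursor key is a simplified stand-in for the connector's real one).
example_stream_state = {
    "_ab_source_file_last_modified": "2022-04-02T10:00:00+0000",  # cursor value
    "history": {  # file keys grouped by the date on which they were last modified
        "2022-03-31": {"data/a.csv"},
        "2022-04-01": {"data/b.csv", "data/c.csv"},
        "2022-04-02": {"data/d.csv"},
    },
}
```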

@property
def cursor_field(self) -> str:
@@ -358,13 +361,56 @@ def cursor_field(self) -> str:
"""
return self.ab_last_mod_col

@staticmethod
def file_in_history(file_info: FileInfo, history: dict) -> bool:
for slot in history.values():
if file_info.key in slot:
return file_info.key in slot
return False
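
As a minimal standalone sketch of what this lookup does (with a simplified `FileStub` in place of the connector's `FileInfo`): a file counts as already synced if its key appears in any of the per-day sets.

```python
from collections import namedtuple

FileStub = namedtuple("FileStub", ["key"])  # simplified stand-in for FileInfo

def file_in_history(file_info, history: dict) -> bool:
    # the history maps "YYYY-MM-DD" strings to sets of file keys
    return any(file_info.key in slot for slot in history.values())

history = {"2022-04-01": {"data/a.csv"}, "2022-04-02": {"data/b.csv"}}
assert file_in_history(FileStub("data/a.csv"), history)
assert not file_in_history(FileStub("data/c.csv"), history)
```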

def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None) -> datetime:
"""if no state, we default to 1970-01-01 in order to pick up all files present."""
if stream_state is not None and self.cursor_field in stream_state.keys():
return datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string)
else:
return datetime.strptime("1970-01-01T00:00:00+0000", self.datetime_format_string)

def get_updated_history(self, current_stream_state, latest_record_datetime, latest_record, current_parsed_datetime, state_date):
"""
        History is a dict that groups synced files by their modified_at date.
        After reading each record, we add its file to the set stored under that date if it isn't already there.
        We then drop any date buckets that are older than the state date minus buffer_days.
"""

history = current_stream_state.get("history", {})

file_modification_date = latest_record_datetime.strftime("%Y-%m-%d")

# add record to history if record modified date in range delta start from state
if latest_record_datetime.date() + timedelta(days=self.buffer_days) >= state_date:
history_item = set(history.setdefault(file_modification_date, set()))
history_item.add(latest_record[self.ab_file_name_col])
history[file_modification_date] = history_item

# reset history to new date state
if current_parsed_datetime.date() != state_date:
history = {date: history[date] for date in history if datetime.strptime(date, "%Y-%m-%d").date() + timedelta(
days=self.buffer_days) >= state_date}

return history
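
A rough worked example of the add-and-prune behaviour (a standalone sketch mirroring the method above, not the method itself): with `buffer_days = 3` and a state date of 2022-04-10, a record from a file modified on 2022-04-08 is recorded under that date, while the 2022-04-06 bucket is dropped because 2022-04-06 plus three days is still before the state date.

```python
from datetime import date, timedelta

BUFFER_DAYS = 3
state_date = date(2022, 4, 10)

history = {
    "2022-04-06": {"old.csv"},     # 2022-04-06 + 3 days < state date -> will be pruned
    "2022-04-09": {"recent.csv"},  # 2022-04-09 + 3 days >= state date -> kept
}

# record a newly read file under its modification date (the method only does
# this when the record's date + buffer_days is not before the state date)
history.setdefault("2022-04-08", set()).add("new.csv")

# prune buckets that fell out of the buffer window
history = {
    day: files
    for day, files in history.items()
    if date.fromisoformat(day) + timedelta(days=BUFFER_DAYS) >= state_date
}

assert sorted(history) == ["2022-04-08", "2022-04-09"]
```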

def size_history_balancer(self, state_dict):
"""
        Delete the history from the state if the size limit is reached
"""
history = state_dict["history"]

if history.__sizeof__() > self.max_history_size:
self.sync_all_files_always = True
state_dict.pop("history")

return state_dict
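
A standalone sketch of the same guard, assuming the 1 GB threshold above: once the history dict's reported size exceeds the limit, the history is dropped from the state and the stream is expected to fall back to syncing all files on every run. Note that `__sizeof__()` / `sys.getsizeof()` on a dict measure the dict structure itself, not the sets and strings stored inside it.

```python
import sys
from typing import Dict, Tuple

MAX_HISTORY_SIZE = 1_000_000_000  # same 1 GB threshold as max_history_size

def balance_history(state: Dict) -> Tuple[Dict, bool]:
    """Drop the history once it gets too large; return the state and whether
    the stream should now sync all files on every run."""
    sync_all_files_always = False
    if sys.getsizeof(state.get("history", {})) > MAX_HISTORY_SIZE:
        state.pop("history", None)
        sync_all_files_always = True
    return state, sync_all_files_always

state, sync_all = balance_history({"history": {"2022-04-10": {"a.csv"}}})
assert not sync_all and "history" in state
```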

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Inspects the latest record extracted from the data source and the current state object and return an updated state object.
@@ -383,18 +429,41 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
state_dict[self.cursor_field] = datetime.strftime(max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string)

state_dict["schema"] = self._get_schema_map()
return state_dict

state_date = self._get_datetime_from_stream_state(state_dict).date()

if not self.sync_all_files_always:
state_dict["history"] = self.get_updated_history(current_stream_state, latest_record_datetime, latest_record,
current_parsed_datetime, state_date)

return self.size_history_balancer(state_dict)

def need_to_skip_file(self, stream_state, file_info):
"""
        Skip this file if its last_modified is not later than the cursor value from state and the file is already in history,
        or if its last_modified plus buffer_days is still earlier than the cursor value and the file is not in history.
"""
file_in_history_and_last_modified_is_earlier_than_cursor_value = (
stream_state is not None
and self.cursor_field in stream_state.keys()
and file_info.last_modified <= self._get_datetime_from_stream_state(stream_state)
and self.file_in_history(file_info, stream_state.get("history", {})))

file_is_not_in_history_and_last_modified_plus_buffer_days_is_earlier_than_cursor_value = file_info.last_modified + timedelta(
days=self.buffer_days) < self._get_datetime_from_stream_state(stream_state) and not self.file_in_history(file_info, stream_state.get("history", {}))

return file_in_history_and_last_modified_is_earlier_than_cursor_value or file_is_not_in_history_and_last_modified_plus_buffer_days_is_earlier_than_cursor_value
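
A standalone worked example of the two conditions (simplified: a plain cursor datetime instead of the stream state object): a file at or before the cursor is skipped only if it is already in history, a file more than `buffer_days` older than the cursor is skipped even if it is not in history, and a recent file that is not yet in history — the case this change targets — is re-checked rather than skipped.

```python
from datetime import datetime, timedelta, timezone

BUFFER_DAYS = 3
cursor = datetime(2022, 4, 10, tzinfo=timezone.utc)
history = {"2022-04-09": {"seen.csv"}}

def in_history(key: str) -> bool:
    return any(key in files for files in history.values())

def need_to_skip(key: str, last_modified: datetime) -> bool:
    # already recorded and not newer than the cursor -> nothing new to read
    seen_and_old = last_modified <= cursor and in_history(key)
    # never recorded, but older than the whole buffer window -> treated as done
    out_of_window = last_modified + timedelta(days=BUFFER_DAYS) < cursor and not in_history(key)
    return seen_and_old or out_of_window

assert need_to_skip("seen.csv", datetime(2022, 4, 9, tzinfo=timezone.utc))      # already synced
assert not need_to_skip("late.csv", datetime(2022, 4, 9, tzinfo=timezone.utc))  # behind cursor but never synced -> picked up
assert need_to_skip("ancient.csv", datetime(2022, 4, 1, tzinfo=timezone.utc))   # far older than the buffer window
```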

def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Dict[str, Any]]]:
"""
Builds either full_refresh or incremental stream_slices based on sync_mode.
An incremental stream_slice is a group of all files with the exact same last_modified timestamp.
This ensures we only update the cursor state to a given timestamp after ALL files with that timestamp have been successfully read.

Slight nuance: as we iterate through get_time_ordered_file_infos(),
        we yield the stream_slice containing file(s) up to and Excluding the file on the current iteration.
The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice
"""
if sync_mode == SyncMode.full_refresh:
@@ -410,12 +479,7 @@ def stream_slices(
prev_file_last_mod: datetime = None # init variable to hold previous iterations last modified
grouped_files_by_time: List[Dict[str, Any]] = []
for file_info in self.get_time_ordered_file_infos():
# skip this file if last_mod is earlier than our cursor value from state
if (
stream_state is not None
and self.cursor_field in stream_state.keys()
and file_info.last_modified <= self._get_datetime_from_stream_state(stream_state)
):
if self.need_to_skip_file(stream_state, file_info):
continue

# check if this file belongs in the next slice, if so yield the current slice before this file
@@ -436,11 +500,11 @@ def stream_slices(
yield None
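
A sketch of the grouping described in the docstring, assuming the files arrive already ordered by last_modified (the tuple layout here is illustrative, not the connector's actual slice structure): files that share the exact same timestamp land in the same slice, so the cursor only moves once every file with that timestamp has been read.

```python
from datetime import datetime
from itertools import groupby

# (last_modified, key) pairs, already sorted by last_modified
files = [
    (datetime(2022, 4, 10, 12, 0), "a.csv"),
    (datetime(2022, 4, 10, 12, 0), "b.csv"),  # same timestamp -> same slice
    (datetime(2022, 4, 10, 12, 5), "c.csv"),
]

slices = [
    {"timestamp": ts, "files": [key for _, key in group]}
    for ts, group in groupby(files, key=lambda f: f[0])
]

assert [s["files"] for s in slices] == [["a.csv", "b.csv"], ["c.csv"]]
```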

def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic.