-
Notifications
You must be signed in to change notification settings - Fork 4.6k
🚨🚨 Low code CDK: Decouple SimpleRetriever and HttpStream #28657
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
44a91c9
ca7dd9c
34831fc
6ca6d21
872e173
9bdfa53
1771157
d71a437
dfdbbde
fb2d3f3
ba13004
87069bc
2755d0e
ffce52e
c6e02d6
b74d9cf
0430113
dec1322
1d53b60
ba5f7e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,14 +37,17 @@ class DeclarativeStream(Stream): | |
schema_loader: Optional[SchemaLoader] = None | ||
_name: str = field(init=False, repr=False, default="") | ||
_primary_key: str = field(init=False, repr=False, default="") | ||
_schema_loader: SchemaLoader = field(init=False, repr=False, default=None) | ||
stream_cursor_field: Optional[Union[InterpolatedString, str]] = None | ||
|
||
def __post_init__(self, parameters: Mapping[str, Any]): | ||
self.stream_cursor_field = InterpolatedString.create(self.stream_cursor_field, parameters=parameters) | ||
def __post_init__(self, parameters: Mapping[str, Any]) -> None: | ||
self._stream_cursor_field = ( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a breaking change as There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think it's used anywhere directly for access. |
||
InterpolatedString.create(self.stream_cursor_field, parameters=parameters) | ||
if isinstance(self.stream_cursor_field, str) | ||
else self.stream_cursor_field | ||
) | ||
self._schema_loader = self.schema_loader if self.schema_loader else DefaultSchemaLoader(config=self.config, parameters=parameters) | ||
|
||
@property | ||
@property # type: ignore | ||
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: | ||
return self._primary_key | ||
|
||
|
@@ -53,7 +56,7 @@ def primary_key(self, value: str) -> None: | |
if not isinstance(value, property): | ||
self._primary_key = value | ||
|
||
@property | ||
@property # type: ignore | ||
def name(self) -> str: | ||
""" | ||
:return: Stream name. By default this is the implementing class name, but it can be overridden as needed. | ||
|
@@ -67,14 +70,16 @@ def name(self, value: str) -> None: | |
|
||
@property | ||
def state(self) -> MutableMapping[str, Any]: | ||
return self.retriever.state | ||
return self.retriever.state # type: ignore | ||
|
||
@state.setter | ||
def state(self, value: MutableMapping[str, Any]): | ||
def state(self, value: MutableMapping[str, Any]) -> None: | ||
"""State setter, accept state serialized by state getter.""" | ||
self.retriever.state = value | ||
|
||
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): | ||
def get_updated_state( | ||
self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any] | ||
) -> MutableMapping[str, Any]: | ||
return self.state | ||
|
||
@property | ||
|
@@ -83,22 +88,22 @@ def cursor_field(self) -> Union[str, List[str]]: | |
Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. | ||
:return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor. | ||
""" | ||
cursor = self.stream_cursor_field.eval(self.config) | ||
cursor = self._stream_cursor_field.eval(self.config) | ||
return cursor if cursor else [] | ||
|
||
def read_records( | ||
self, | ||
sync_mode: SyncMode, | ||
cursor_field: List[str] = None, | ||
stream_slice: Mapping[str, Any] = None, | ||
stream_state: Mapping[str, Any] = None, | ||
cursor_field: Optional[List[str]] = None, | ||
stream_slice: Optional[Mapping[str, Any]] = None, | ||
stream_state: Optional[Mapping[str, Any]] = None, | ||
) -> Iterable[Mapping[str, Any]]: | ||
""" | ||
:param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state. | ||
""" | ||
yield from self.retriever.read_records(sync_mode, cursor_field, stream_slice) | ||
yield from self.retriever.read_records(stream_slice) | ||
|
||
def get_json_schema(self) -> Mapping[str, Any]: | ||
def get_json_schema(self) -> Mapping[str, Any]: # type: ignore | ||
""" | ||
:return: A dict of the JSON schema representing this stream. | ||
|
||
|
@@ -108,7 +113,7 @@ def get_json_schema(self) -> Mapping[str, Any]: | |
return self._schema_loader.get_json_schema() | ||
|
||
def stream_slices( | ||
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None | ||
self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None | ||
) -> Iterable[Optional[Mapping[str, Any]]]: | ||
""" | ||
Override to define the slices for this stream. See the stream slicing section of the docs for more information. | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -3,7 +3,7 @@ | |||
# | ||||
|
||||
from dataclasses import InitVar, dataclass | ||||
from typing import Any, List, Mapping, Optional, Union | ||||
from typing import Any, List, Mapping, MutableMapping, Optional, Union | ||||
|
||||
import requests | ||||
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator | ||||
|
@@ -27,7 +27,7 @@ def get_request_params( | |||
stream_state: Optional[StreamState] = None, | ||||
stream_slice: Optional[StreamSlice] = None, | ||||
next_page_token: Optional[Mapping[str, Any]] = None, | ||||
) -> Mapping[str, Any]: | ||||
) -> MutableMapping[str, Any]: | ||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why are we returning this as something Mutable? Does anything mutate the request params? Immutable things are often preferred as they ensure that we don't update something in memory that someone else relies on. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is just fixing the types (it's coming from here: Line 31 in 1c482ee).
I agree that we should make this non-mutable, but I would like to split it out of this PR. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It should be safe to change the RequestOptionsProvider's return type to |
||||
return {} | ||||
|
||||
def get_request_headers( | ||||
|
@@ -60,6 +60,6 @@ def get_request_body_json( | |||
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Mapping[str, Any]: | ||||
return {} | ||||
|
||||
def reset(self): | ||||
def reset(self) -> None: | ||||
# No state to reset | ||||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How long until we can remove the call to list streams? It breaks my heart every time I see us maintaining something that we want to remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @lmossman is about to get started on the frontend work that will allow us to not rely on the list API anymore. So a few more weeks at most.