Skip to content

Commit 0c78f6d

Browse files
committed
Renamed list method and removed _filter_dynamic_fields
1 parent 2bf52d5 commit 0c78f6d

File tree

2 files changed

+11
-22
lines changed
  • airbyte-integrations/connectors/source-hubspot/source_hubspot

2 files changed

+11
-22
lines changed

airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py

+10-21
Original file line numberDiff line numberDiff line change
@@ -216,19 +216,8 @@ def name(self) -> str:
216216
stream_name = stream_name[: -len("Stream")]
217217
return stream_name
218218

219-
def list(self, fields) -> Iterable:
219+
def list_records(self, fields) -> Iterable:
220220
yield from self.read(partial(self._api.get, url=self.url))
221-
222-
def _filter_dynamic_fields(self, records: Iterable) -> Iterable:
223-
"""Skip certain fields because they are too dynamic and change every call (timers, etc),
224-
see https://github.com/airbytehq/airbyte/issues/2397
225-
"""
226-
for record in records:
227-
if isinstance(record, Mapping) and "properties" in record:
228-
for key in list(record["properties"].keys()):
229-
if key.startswith("hs_time_in"):
230-
record["properties"].pop(key)
231-
yield record
232221

233222
@staticmethod
234223
def _cast_value(declared_field_types: List, field_name: str, field_value: Any, declared_format: str = None) -> Any:
@@ -575,7 +564,7 @@ def search(
575564
# As per their docs: `These search endpoints are rate limited to four requests per second per authentication token`.
576565
return self._api.post(url=url, data=data, params=params)
577566

578-
def list(self, fields) -> Iterable:
567+
def list_records(self, fields) -> Iterable:
579568
params = {
580569
"archived": str(self._include_archived_only).lower(),
581570
"associations": self.associations,
@@ -584,7 +573,7 @@ def list(self, fields) -> Iterable:
584573
generator = self.read(partial(self.search, url=self.url), params)
585574
else:
586575
generator = self.read(partial(self._api.get, url=self.url), params)
587-
yield from self._flat_associations(self._filter_dynamic_fields(self._filter_old_records(generator)))
576+
yield from self._flat_associations(self._filter_old_records(generator))
588577

589578
def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
590579
"""Apply state filter to set of records, update cursor(state) if necessary in the end"""
@@ -652,7 +641,7 @@ def __init__(
652641
if not self.entity:
653642
raise ValueError("Entity must be set either on class or instance level")
654643

655-
def list(self, fields) -> Iterable:
644+
def list_records(self, fields) -> Iterable:
656645
params = {
657646
"archived": str(self._include_archived_only).lower(),
658647
"associations": self.associations,
@@ -679,7 +668,7 @@ class CampaignStream(Stream):
679668
limit = 500
680669
updated_at_field = "lastUpdatedTime"
681670

682-
def list(self, fields) -> Iterable:
671+
def list_records(self, fields) -> Iterable:
683672
for row in self.read(getter=partial(self._api.get, url=self.url)):
684673
record = self._api.get(f"/email/public/v1/campaigns/{row['id']}")
685674
yield {**row, **record}
@@ -752,7 +741,7 @@ def _transform(self, records: Iterable) -> Iterable:
752741
if updated_at:
753742
yield {"id": record.get("dealId"), "dealstage": dealstage, self.updated_at_field: updated_at}
754743

755-
def list(self, fields) -> Iterable:
744+
def list_records(self, fields) -> Iterable:
756745
params = {"propertiesWithHistory": "dealstage"}
757746
yield from self.read(partial(self._api.get, url=self.url), params)
758747

@@ -764,12 +753,12 @@ def __init__(self, **kwargs):
764753
super().__init__(entity="deal", last_modified_field="hs_lastmodifieddate", **kwargs)
765754
self._stage_history = DealStageHistoryStream(**kwargs)
766755

767-
def list(self, fields) -> Iterable:
756+
def list_records(self, fields) -> Iterable:
768757
history_by_id = {}
769-
for record in self._stage_history.list(fields):
758+
for record in self._stage_history.list_records(fields):
770759
if all(field in record for field in ("id", "dealstage")):
771760
history_by_id[record["id"]] = record["dealstage"]
772-
for record in super().list(fields):
761+
for record in super().list_records(fields):
773762
if record.get("id") and int(record["id"]) in history_by_id:
774763
record["dealstage"] = history_by_id[int(record["id"])]
775764
yield record
@@ -849,7 +838,7 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
849838
if self.state:
850839
params['since'] = self._state
851840
count = 0
852-
for record in self._filter_dynamic_fields(self._filter_old_records(self._read(getter, params))):
841+
for record in self._filter_old_records(self._read(getter, params)):
853842
yield record
854843
count += 1
855844
cursor = record[self.updated_at_field]

airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def __init__(self, start_date, credentials, **kwargs):
6363
super().__init__(**kwargs)
6464

6565
def _enumerate_methods(self) -> Mapping[str, Callable]:
66-
return {name: api.list for name, api in self._apis.items()}
66+
return {name: api.list_records for name, api in self._apis.items()}
6767

6868
@property
6969
def streams(self) -> Iterator[AirbyteStream]:

0 commit comments

Comments
 (0)