Skip to content

Commit 231bc52

Browse files
authored
✨ Source Linkedin Ads: Update API version to 202404 (#37573)
1 parent 5574b07 commit 231bc52

File tree

7 files changed

+234
-182
lines changed

7 files changed

+234
-182
lines changed

airbyte-integrations/connectors/source-linkedin-ads/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: api
1212
connectorType: source
1313
definitionId: 137ece28-5434-455c-8f34-69dc3782f451
14-
dockerImageTag: 2.0.0
14+
dockerImageTag: 2.1.0
1515
dockerRepository: airbyte/source-linkedin-ads
1616
documentationUrl: https://docs.airbyte.com/integrations/sources/linkedin-ads
1717
githubIssueLabel: source-linkedin-ads

airbyte-integrations/connectors/source-linkedin-ads/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.0.0"
6+
version = "2.1.0"
77
name = "source-linkedin-ads"
88
description = "Source implementation for Linkedin Ads."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/analytics_streams.py

+27-8
Original file line numberDiff line numberDiff line change
@@ -237,19 +237,38 @@ def get_primary_key_from_slice(self, stream_slice) -> str:
237237

238238
def stream_slices(
239239
self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
240-
) -> Iterable[List[Mapping[str, Any]]]:
240+
) -> Iterable[Optional[Mapping[str, List[Mapping[str, Any]]]]]:
241241
"""
242242
LinkedIn has a max of 20 fields per request. We make chunks by size of 19 fields to have the `dateRange` be included as well.
243243
https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting?view=li-lms-2023-05&tabs=http#requesting-specific-metrics-in-the-analytics-finder
244244
245245
:param sync_mode:
246246
:param cursor_field:
247247
:param stream_state:
248-
:return: Iterable with List of stream slices within the same date range and chunked fields, example
249-
[{'campaign_id': 123, 'fields': 'field_1,field_2,dateRange', 'dateRange': {'start.day': 1, 'start.month': 1, 'start.year': 2020, 'end.day': 30, 'end.month': 1, 'end.year': 2020}},
250-
{'campaign_id': 123, 'fields': 'field_2,field_3,dateRange', 'dateRange': {'start.day': 1, 'start.month': 1, 'start.year': 2020, 'end.day': 30, 'end.month': 1, 'end.year': 2020}},
251-
{'campaign_id': 123, 'fields': 'field_4,field_5,dateRange', 'dateRange': {'start.day': 1, 'start.month': 1, 'start.year': 2020, 'end.day': 30, 'end.month': 1, 'end.year': 2020}}]
252-
248+
:return: An iterable of dictionaries, each containing a single key 'field_date_chunks'. The value under 'field_date_chunks' is
249+
a list of dictionaries where each dictionary represents a slice of data defined by a specific date range and chunked fields.
250+
251+
Example of returned data:
252+
{
253+
'field_date_chunks': [
254+
{
255+
'campaign_id': 123,
256+
'fields': 'field_1,field_2,dateRange',
257+
'dateRange': {
258+
'start.day': 1, 'start.month': 1, 'start.year': 2020,
259+
'end.day': 30, 'end.month': 1, 'end.year': 2020
260+
}
261+
},
262+
{
263+
'campaign_id': 123,
264+
'fields': 'field_3,field_4,dateRange',
265+
'dateRange': {
266+
'start.day': 1, 'start.month': 1, 'start.year': 2020,
267+
'end.day': 30, 'end.month': 1, 'end.year': 2020
268+
}
269+
}
270+
]
271+
}
253272
"""
254273
parent_stream = self.parent_stream(config=self.config)
255274
stream_state = stream_state or {self.cursor_field: self.config.get("start_date")}
@@ -260,7 +279,7 @@ def stream_slices(
260279
for fields_set in self.chunk_analytics_fields():
261280
base_slice["fields"] = ",".join(fields_set)
262281
date_slice_with_fields.append(base_slice | date_slice)
263-
yield date_slice_with_fields
282+
yield {"field_date_chunks": date_slice_with_fields}
264283

265284
@staticmethod
266285
def get_date_slices(start_date: str, end_date: str = None, window_in_days: int = WINDOW_IN_DAYS) -> Iterable[Mapping[str, Any]]:
@@ -307,7 +326,7 @@ def read_records(
307326
self, stream_state: Mapping[str, Any] = None, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs
308327
) -> Iterable[Mapping[str, Any]]:
309328
merged_records = defaultdict(dict)
310-
for field_slice in stream_slice:
329+
for field_slice in stream_slice.get("field_date_chunks", []):
311330
for rec in super().read_records(stream_slice=field_slice, **kwargs):
312331
merged_records[f"{rec[self.cursor_field]}-{rec['pivotValues']}"].update(rec)
313332
yield from merged_records.values()

airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/streams.py

+46-21
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
logger = logging.getLogger("airbyte")
1919

20-
LINKEDIN_VERSION_API = "202305"
20+
LINKEDIN_VERSION_API = "202404"
2121

2222

2323
class LinkedinAdsStream(HttpStream, ABC):
@@ -64,26 +64,13 @@ def path(
6464

6565
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
6666
"""
67-
To paginate through results, begin with a start value of 0 and a count value of N.
68-
To get the next page, set start value to N, while the count value stays the same.
69-
We have reached the end of the dataset when the response contains fewer elements than the `count` parameter request.
70-
https://docs.microsoft.com/en-us/linkedin/shared/api-guide/concepts/pagination?context=linkedin/marketing/context
67+
Cursor based pagination using the pageSize and pageToken parameters.
7168
"""
7269
parsed_response = response.json()
73-
is_elements_less_than_limit = len(parsed_response.get("elements")) < self.records_limit
74-
75-
# Note: The API might return fewer records than requested within the limits during pagination.
76-
# This behavior is documented at: https://github.com/airbytehq/airbyte/issues/34164
77-
paging_params = parsed_response.get("paging", {})
78-
is_end_of_records = (
79-
paging_params["total"] - paging_params["start"] <= self.records_limit
80-
if all(param in paging_params for param in ("total", "start"))
81-
else True
82-
)
83-
84-
if is_elements_less_than_limit and is_end_of_records:
70+
if parsed_response.get("metadata", {}).get("nextPageToken"):
71+
return {"pageToken": parsed_response["metadata"]["nextPageToken"]}
72+
else:
8573
return None
86-
return {"start": paging_params.get("start") + self.records_limit}
8774

8875
def request_headers(
8976
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
@@ -96,7 +83,7 @@ def request_params(
9683
stream_slice: Mapping[str, Any] = None,
9784
next_page_token: Mapping[str, Any] = None,
9885
) -> MutableMapping[str, Any]:
99-
params = {"count": self.records_limit, "q": "search"}
86+
params = {"pageSize": self.records_limit, "q": "search"}
10087
if next_page_token:
10188
params.update(**next_page_token)
10289
return params
@@ -130,6 +117,44 @@ def should_retry(self, response: requests.Response) -> bool:
130117
return super().should_retry(response)
131118

132119

120+
class OffsetPaginationMixin:
121+
"""Mixin for offset based pagination for endpoints tha tdoesnt support cursor based pagination"""
122+
123+
def request_params(
124+
self,
125+
stream_state: Mapping[str, Any],
126+
stream_slice: Mapping[str, Any] = None,
127+
next_page_token: Mapping[str, Any] = None,
128+
) -> MutableMapping[str, Any]:
129+
params = {"count": self.records_limit, "q": "search"}
130+
if next_page_token:
131+
params.update(**next_page_token)
132+
return params
133+
134+
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
135+
"""
136+
To paginate through results, begin with a start value of 0 and a count value of N.
137+
To get the next page, set start value to N, while the count value stays the same.
138+
We have reached the end of the dataset when the response contains fewer elements than the `count` parameter request.
139+
https://docs.microsoft.com/en-us/linkedin/shared/api-guide/concepts/pagination?context=linkedin/marketing/context
140+
"""
141+
parsed_response = response.json()
142+
is_elements_less_than_limit = len(parsed_response.get("elements")) < self.records_limit
143+
144+
# Note: The API might return fewer records than requested within the limits during pagination.
145+
# This behavior is documented at: https://github.com/airbytehq/airbyte/issues/34164
146+
paging_params = parsed_response.get("paging", {})
147+
is_end_of_records = (
148+
paging_params["total"] - paging_params["start"] <= self.records_limit
149+
if all(param in paging_params for param in ("total", "start"))
150+
else True
151+
)
152+
153+
if is_elements_less_than_limit and is_end_of_records:
154+
return None
155+
return {"start": paging_params.get("start") + self.records_limit}
156+
157+
133158
class Accounts(LinkedinAdsStream):
134159
"""
135160
Get Accounts data. More info about LinkedIn Ads / Accounts:
@@ -227,7 +252,7 @@ def read_records(
227252
yield from self.filter_records_newer_than_state(stream_state=stream_state, records_slice=child_stream_slice)
228253

229254

230-
class AccountUsers(LinkedInAdsStreamSlicing):
255+
class AccountUsers(OffsetPaginationMixin, LinkedInAdsStreamSlicing):
231256
"""
232257
Get AccountUsers data using `account_id` slicing. More info about LinkedIn Ads / AccountUsers:
233258
https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-account-users?tabs=http&view=li-lms-2023-05#find-ad-account-users-by-accounts
@@ -365,7 +390,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
365390
return {self.cursor_field: max(latest_record.get(self.cursor_field), int(current_stream_state.get(self.cursor_field)))}
366391

367392

368-
class Conversions(LinkedInAdsStreamSlicing):
393+
class Conversions(OffsetPaginationMixin, LinkedInAdsStreamSlicing):
369394
"""
370395
Get Conversions data using `account_id` slicing.
371396
https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/conversion-tracking?view=li-lms-2023-05&tabs=curl#find-conversions-by-ad-account

0 commit comments

Comments
 (0)