
✨ Source Linkedin Ads: Update API version to 202404 #37573


Merged
@@ -11,7 +11,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: 137ece28-5434-455c-8f34-69dc3782f451
-  dockerImageTag: 2.0.0
+  dockerImageTag: 2.1.0
   dockerRepository: airbyte/source-linkedin-ads
   documentationUrl: https://docs.airbyte.com/integrations/sources/linkedin-ads
   githubIssueLabel: source-linkedin-ads
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "2.0.0"
+version = "2.1.0"
 name = "source-linkedin-ads"
 description = "Source implementation for Linkedin Ads."
 authors = [ "Airbyte <[email protected]>",]
@@ -237,19 +237,38 @@ def get_primary_key_from_slice(self, stream_slice) -> str:

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
-) -> Iterable[List[Mapping[str, Any]]]:
+) -> Iterable[Optional[Mapping[str, List[Mapping[str, Any]]]]]:
"""
LinkedIn allows a maximum of 20 fields per request, so we chunk the fields into groups of 19 to leave room for `dateRange`.
https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting?view=li-lms-2023-05&tabs=http#requesting-specific-metrics-in-the-analytics-finder

:param sync_mode:
:param cursor_field:
:param stream_state:
-:return: Iterable with List of stream slices within the same date range and chunked fields, example
-[{'campaign_id': 123, 'fields': 'field_1,field_2,dateRange', 'dateRange': {'start.day': 1, 'start.month': 1, 'start.year': 2020, 'end.day': 30, 'end.month': 1, 'end.year': 2020}},
- {'campaign_id': 123, 'fields': 'field_2,field_3,dateRange', 'dateRange': {'start.day': 1, 'start.month': 1, 'start.year': 2020, 'end.day': 30, 'end.month': 1, 'end.year': 2020}},
- {'campaign_id': 123, 'fields': 'field_4,field_5,dateRange', 'dateRange': {'start.day': 1, 'start.month': 1, 'start.year': 2020, 'end.day': 30, 'end.month': 1, 'end.year': 2020}}]

+:return: An iterable of dictionaries, each containing a single key 'field_date_chunks'. The value under 'field_date_chunks' is
+         a list of dictionaries where each dictionary represents a slice of data defined by a specific date range and chunked fields.
+
+Example of returned data:
+    {
+        'field_date_chunks': [
+            {
+                'campaign_id': 123,
+                'fields': 'field_1,field_2,dateRange',
+                'dateRange': {
+                    'start.day': 1, 'start.month': 1, 'start.year': 2020,
+                    'end.day': 30, 'end.month': 1, 'end.year': 2020
+                }
+            },
+            {
+                'campaign_id': 123,
+                'fields': 'field_3,field_4,dateRange',
+                'dateRange': {
+                    'start.day': 1, 'start.month': 1, 'start.year': 2020,
+                    'end.day': 30, 'end.month': 1, 'end.year': 2020
+                }
+            }
+        ]
+    }
"""
parent_stream = self.parent_stream(config=self.config)
stream_state = stream_state or {self.cursor_field: self.config.get("start_date")}
@@ -260,7 +279,7 @@ def stream_slices(
for fields_set in self.chunk_analytics_fields():
base_slice["fields"] = ",".join(fields_set)
date_slice_with_fields.append(base_slice | date_slice)
-yield date_slice_with_fields
+yield {"field_date_chunks": date_slice_with_fields}

@staticmethod
def get_date_slices(start_date: str, end_date: str = None, window_in_days: int = WINDOW_IN_DAYS) -> Iterable[Mapping[str, Any]]:
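For clarity, here is a minimal standalone sketch of what the new slicing produces: fields are chunked into groups of 19 (leaving the 20th slot for `dateRange`), and all chunks for one date window are wrapped under a single `field_date_chunks` key. The field names and the chunking helper below are hypothetical, not the stream's actual `chunk_analytics_fields` implementation:

from typing import Iterable, List

# Hypothetical analytics fields; the real list comes from the stream's schema.
ANALYTICS_FIELDS = [f"field_{i}" for i in range(1, 40)]
FIELDS_CHUNK_SIZE = 19  # 20-field API limit minus one slot for `dateRange`

def chunk_fields(fields: List[str], size: int = FIELDS_CHUNK_SIZE) -> Iterable[List[str]]:
    # Split the field list into chunks and append `dateRange` to each chunk
    # so every request still carries the date columns.
    for i in range(0, len(fields), size):
        yield fields[i : i + size] + ["dateRange"]

date_slice = {"dateRange": {"start.day": 1, "start.month": 1, "start.year": 2020,
                            "end.day": 30, "end.month": 1, "end.year": 2020}}
base_slice = {"campaign_id": 123}

# One slice per date window, wrapping every field chunk for that window:
slice_ = {
    "field_date_chunks": [
        base_slice | {"fields": ",".join(chunk)} | date_slice
        for chunk in chunk_fields(ANALYTICS_FIELDS)
    ]
}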
@@ -307,7 +326,7 @@ def read_records(
self, stream_state: Mapping[str, Any] = None, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs
) -> Iterable[Mapping[str, Any]]:
merged_records = defaultdict(dict)
-for field_slice in stream_slice:
+for field_slice in stream_slice.get("field_date_chunks", []):
for rec in super().read_records(stream_slice=field_slice, **kwargs):
merged_records[f"{rec[self.cursor_field]}-{rec['pivotValues']}"].update(rec)
yield from merged_records.values()
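Since each field chunk yields partial records for the same date range and pivot, the merge keys records on cursor value plus `pivotValues` and updates one dictionary per key. A minimal standalone sketch of that merge, with an illustrative cursor field name (`end_date`) and toy records:

from collections import defaultdict
from typing import Dict, Iterable, List

def merge_chunked_records(chunks: Iterable[List[Dict]], cursor_field: str = "end_date") -> List[Dict]:
    # Records sharing the cursor value and pivotValues are partial views of
    # the same row, so their field sets are merged into one record.
    merged = defaultdict(dict)
    for records in chunks:
        for rec in records:
            merged[f"{rec[cursor_field]}-{rec['pivotValues']}"].update(rec)
    return list(merged.values())

chunk_a = [{"end_date": "2020-01-30", "pivotValues": ["urn:li:campaign:123"], "field_1": 1}]
chunk_b = [{"end_date": "2020-01-30", "pivotValues": ["urn:li:campaign:123"], "field_3": 3}]
print(merge_chunked_records([chunk_a, chunk_b]))
# -> [{'end_date': '2020-01-30', 'pivotValues': ['urn:li:campaign:123'], 'field_1': 1, 'field_3': 3}]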
@@ -17,7 +17,7 @@

logger = logging.getLogger("airbyte")

LINKEDIN_VERSION_API = "202305"
LINKEDIN_VERSION_API = "202404"


class LinkedinAdsStream(HttpStream, ABC):
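The bumped constant selects which monthly LinkedIn Marketing API version each request targets. As a hedged sketch of how such a constant is typically applied: LinkedIn's versioned APIs read the version from the `LinkedIn-Version` request header (per LinkedIn's versioning docs; this diff does not show the connector's actual header-building code, and the endpoint below is illustrative):

import requests

LINKEDIN_VERSION_API = "202404"

def build_headers(access_token: str) -> dict:
    # Assumption: the connector sends the version via the LinkedIn-Version
    # header, as LinkedIn's versioned Marketing APIs expect.
    return {
        "Authorization": f"Bearer {access_token}",
        "LinkedIn-Version": LINKEDIN_VERSION_API,
    }

response = requests.get(
    "https://api.linkedin.com/rest/adAccounts",  # illustrative endpoint
    headers=build_headers("my-token"),
    params={"q": "search", "pageSize": 500},
)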
@@ -64,26 +64,13 @@ def path(

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
-To paginate through results, begin with a start value of 0 and a count value of N.
-To get the next page, set start value to N, while the count value stays the same.
-We have reached the end of the dataset when the response contains fewer elements than the `count` parameter request.
-https://docs.microsoft.com/en-us/linkedin/shared/api-guide/concepts/pagination?context=linkedin/marketing/context
+Cursor-based pagination using the pageSize and pageToken parameters.
"""
parsed_response = response.json()
-is_elements_less_than_limit = len(parsed_response.get("elements")) < self.records_limit
-
-# Note: The API might return fewer records than requested within the limits during pagination.
-# This behavior is documented at: https://github.com/airbytehq/airbyte/issues/34164
-paging_params = parsed_response.get("paging", {})
-is_end_of_records = (
-    paging_params["total"] - paging_params["start"] <= self.records_limit
-    if all(param in paging_params for param in ("total", "start"))
-    else True
-)
-
-if is_elements_less_than_limit and is_end_of_records:
+if parsed_response.get("metadata", {}).get("nextPageToken"):
+    return {"pageToken": parsed_response["metadata"]["nextPageToken"]}
+else:
     return None
-return {"start": paging_params.get("start") + self.records_limit}

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
@@ -96,7 +83,7 @@ def request_params(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = {"count": self.records_limit, "q": "search"}
params = {"pageSize": self.records_limit, "q": "search"}
if next_page_token:
params.update(**next_page_token)
return params
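Together, the new `next_page_token` and `request_params` implement cursor-based pagination: the first request sends only `pageSize`, and every subsequent request echoes the `nextPageToken` found in the previous response's `metadata`. A minimal standalone sketch of the resulting request loop, with a hypothetical URL and headers:

import requests

def fetch_all_pages(url: str, headers: dict, page_size: int = 500):
    # First page: pageSize only; later pages add the pageToken cursor.
    params = {"pageSize": page_size, "q": "search"}
    while True:
        payload = requests.get(url, headers=headers, params=params).json()
        yield from payload.get("elements", [])
        next_token = payload.get("metadata", {}).get("nextPageToken")
        if not next_token:
            break  # absent nextPageToken marks the last page
        params["pageToken"] = next_token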
@@ -130,6 +117,44 @@ def should_retry(self, response: requests.Response) -> bool:
return super().should_retry(response)


+class OffsetPaginationMixin:
+    """Mixin providing offset-based pagination for endpoints that don't support cursor-based pagination"""
+
+    def request_params(
+        self,
+        stream_state: Mapping[str, Any],
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+    ) -> MutableMapping[str, Any]:
+        params = {"count": self.records_limit, "q": "search"}
+        if next_page_token:
+            params.update(**next_page_token)
+        return params
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        """
+        To paginate through results, begin with a start value of 0 and a count value of N.
+        To get the next page, set the start value to N while the count value stays the same.
+        We have reached the end of the dataset when the response contains fewer elements than the `count` parameter requested.
+        https://docs.microsoft.com/en-us/linkedin/shared/api-guide/concepts/pagination?context=linkedin/marketing/context
+        """
+        parsed_response = response.json()
+        is_elements_less_than_limit = len(parsed_response.get("elements")) < self.records_limit
+
+        # Note: The API might return fewer records than requested within the limits during pagination.
+        # This behavior is documented at: https://github.com/airbytehq/airbyte/issues/34164
+        paging_params = parsed_response.get("paging", {})
+        is_end_of_records = (
+            paging_params["total"] - paging_params["start"] <= self.records_limit
+            if all(param in paging_params for param in ("total", "start"))
+            else True
+        )
+
+        if is_elements_less_than_limit and is_end_of_records:
+            return None
+        return {"start": paging_params.get("start") + self.records_limit}


class Accounts(LinkedinAdsStream):
"""
Get Accounts data. More info about LinkedIn Ads / Accounts:
@@ -227,7 +252,7 @@ def read_records(
yield from self.filter_records_newer_than_state(stream_state=stream_state, records_slice=child_stream_slice)


-class AccountUsers(LinkedInAdsStreamSlicing):
+class AccountUsers(OffsetPaginationMixin, LinkedInAdsStreamSlicing):
"""
Get AccountUsers data using `account_id` slicing. More info about LinkedIn Ads / AccountUsers:
https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-account-users?tabs=http&view=li-lms-2023-05#find-ad-account-users-by-accounts
@@ -365,7 +390,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return {self.cursor_field: max(latest_record.get(self.cursor_field), int(current_stream_state.get(self.cursor_field)))}


-class Conversions(LinkedInAdsStreamSlicing):
+class Conversions(OffsetPaginationMixin, LinkedInAdsStreamSlicing):
"""
Get Conversions data using `account_id` slicing.
https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/conversion-tracking?view=li-lms-2023-05&tabs=curl#find-conversions-by-ad-account
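`AccountUsers` and `Conversions` keep offset pagination because those endpoints don't support the cursor parameters; listing `OffsetPaginationMixin` first makes Python's method resolution order pick the mixin's `request_params` and `next_page_token` over the cursor-based defaults inherited further right. A minimal sketch of that override mechanic with simplified stand-in classes:

class CursorBase:
    """Stand-in for the cursor-based default in LinkedinAdsStream."""
    def next_page_token(self, response) -> str:
        return "pageToken"

class OffsetMixin:
    """Stand-in for OffsetPaginationMixin."""
    def next_page_token(self, response) -> str:
        return "start"

class AccountUsersLike(OffsetMixin, CursorBase):
    pass

# The mixin is leftmost in the MRO, so its method wins:
assert AccountUsersLike().next_page_token(None) == "start"
print([cls.__name__ for cls in AccountUsersLike.__mro__])
# -> ['AccountUsersLike', 'OffsetMixin', 'CursorBase', 'object']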