Skip to content

Update DV360 operators to use API v2 #30326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions airflow/providers/google/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
Changelog
---------

8.12.1
......
9.0.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if our tooling will break when we get to 10 here. Let's see 🤔

.....

Breaking changes
~~~~~~~~~~~~~~~~

Google announced sunset of Bid manager API v1 and v1.1 by April 27, 2023 for more information
please check: `docs <https://developers.google.com/bid-manager/v1.1>`_ As a result default value of api_version in GoogleDisplayVideo360Hook and related operators updated to v2

This version of provider contains a temporary workaround to issue with ``v11`` version of
google-ads API being discontinued, while the google provider dependencies preventing installing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,22 @@
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateQueryOperator,
GoogleDisplayVideo360CreateReportOperator,
GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadLineItemsOperator,
GoogleDisplayVideo360DownloadReportOperator,
GoogleDisplayVideo360DownloadReportV2Operator,
GoogleDisplayVideo360RunQueryOperator,
GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360SDFtoGCSOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360GetSDFDownloadOperationSensor,
GoogleDisplayVideo360ReportSensor,
GoogleDisplayVideo360RunQuerySensor,
)

# [START howto_display_video_env_variables]
Expand All @@ -50,7 +54,7 @@
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_5")
BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123)
ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
Expand All @@ -74,7 +78,25 @@
"schedule": {"frequency": "ONE_TIME"},
}

PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
REPORT_V2 = {
"metadata": {
"title": "Airflow Test Report",
"dataRange": {"range": "LAST_7_DAYS"},
"format": "CSV",
"sendNotification": False,
},
"params": {
"type": "STANDARD",
"groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
"filters": [{"type": "FILTER_PARTNER", "value": ADVERTISER_ID}],
"metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
},
"schedule": {"frequency": "ONE_TIME"},
}

PARAMETERS = {
"dataRange": {"range": "LAST_7_DAYS"},
}

CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: dict = {
"version": SDF_VERSION,
Expand Down Expand Up @@ -209,3 +231,46 @@

# Task dependency created via `XComArgs`:
# save_sdf_in_gcs >> upload_sdf_to_big_query

with models.DAG(
"example_display_video_v2",
start_date=START_DATE,
catchup=False,
) as dag:
# [START howto_google_display_video_create_query_operator]
create_query_v2 = GoogleDisplayVideo360CreateQueryOperator(body=REPORT_V2, task_id="create_query")

query_id = cast(str, XComArg(create_query_v2, key="query_id"))
# [END howto_google_display_video_create_query_operator]

# [START howto_google_display_video_run_query_report_operator]
run_query_v2 = GoogleDisplayVideo360RunQueryOperator(
query_id=query_id, parameters=PARAMETERS, task_id="run_report"
)

query_id = cast(str, XComArg(run_query_v2, key="query_id"))
report_id = cast(str, XComArg(run_query_v2, key="report_id"))
# [END howto_google_display_video_run_query_report_operator]

# [START howto_google_display_video_wait_run_query_sensor]
wait_for_query = GoogleDisplayVideo360RunQuerySensor(
task_id="wait_for_query",
query_id=query_id,
report_id=report_id,
)
# [END howto_google_display_video_wait_run_query_sensor]

# [START howto_google_display_video_get_report_operator]
get_report_v2 = GoogleDisplayVideo360DownloadReportV2Operator(
query_id=query_id,
report_id=report_id,
task_id="get_report",
bucket_name=BUCKET,
report_name="test1.csv",
)
# # [END howto_google_display_video_get_report_operator]
# # [START howto_google_display_video_delete_query_report_operator]
delete_report_v2 = GoogleDisplayVideo360DeleteReportOperator(report_id=report_id, task_id="delete_report")
# # [END howto_google_display_video_delete_query_report_operator]

create_query_v2 >> run_query_v2 >> wait_for_query >> get_report_v2 >> delete_report_v2
59 changes: 51 additions & 8 deletions airflow/providers/google/marketing_platform/hooks/display_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""This module contains Google DisplayVideo hook."""
from __future__ import annotations

import warnings
from typing import Any, Sequence

from googleapiclient.discovery import Resource, build
Expand All @@ -32,7 +33,7 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):

def __init__(
self,
api_version: str = "v1",
api_version: str = "v2",
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
Expand All @@ -42,6 +43,11 @@ def __init__(
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
if api_version in ["v1", "v1.1"]:
warnings.warn(
f"API {api_version} is deprecated and shortly will be removed please use v2",
DeprecationWarning,
)
self.api_version = api_version

def get_conn(self) -> Resource:
Expand Down Expand Up @@ -93,7 +99,10 @@ def create_query(self, query: dict[str, Any]) -> dict:

:param query: Query object to be passed to request body.
"""
response = self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
if self.api_version in ["v1", "v1.1"]:
response = self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
else:
response = self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
return response

def delete_query(self, query_id: str) -> None:
Expand All @@ -102,33 +111,67 @@ def delete_query(self, query_id: str) -> None:

:param query_id: Query ID to delete.
"""
(self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries))
if self.api_version in ["v1", "v1.1"]:
self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries)
else:
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)

def get_query(self, query_id: str) -> dict:
"""
Retrieves a stored query.

:param query_id: Query ID to retrieve.
"""
response = self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
if self.api_version in ["v1", "v1.1"]:
response = (
self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
)
else:
response = self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
return response

def list_queries(self) -> list[dict]:
"""Retrieves stored queries."""
response = self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
if self.api_version in ["v1", "v1.1"]:
response = self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
else:
response = self.get_conn().queries().list().execute(num_retries=self.num_retries)
return response.get("queries", [])

def run_query(self, query_id: str, params: dict[str, Any] | None) -> None:
def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
"""
Runs a stored query to generate a report.

:param query_id: Query ID to run.
:param params: Parameters for the report.
"""
(
if self.api_version in ["v1", "v1.1"]:
return (
self.get_conn()
.queries()
.runquery(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)
else:
return (
self.get_conn()
.queries()
.run(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)

def get_report(self, query_id: str, report_id: str) -> dict:
"""
Retrieves a report.

:param query_id: Query ID for which report was generated.
:param report_id: Report ID to retrieve.
"""
return (
self.get_conn()
.queries()
.runquery(queryId=query_id, body=params)
.reports()
.get(queryId=query_id, reportId=report_id)
.execute(num_retries=self.num_retries)
)

Expand Down
Loading