Commit 851a817

Add back implementation for OpenLineage extractor for BigQueryInsertJobAsync Operator (#335)
* Revert "Revert `BigQueryAsyncExtractor` to release 1.3.0 (#332)". This reverts commit 39102d0.
* Address Kaxil's comments on PR #290.
* Delay fetching of the hook's client.
* Use the operator hook's client for the extractor; remove the fallback to the default Client().
* List the openlineage extra in README.rst.
1 parent 22cbb83 commit 851a817

File tree: 6 files changed, +267 −2 lines changed


README.rst

Lines changed: 4 additions & 0 deletions
@@ -91,6 +91,10 @@ Extras
      - ``pip install 'astronomer-providers[microsoft.azure]'``
      - Microsoft Azure

+   * - ``openlineage``
+     - ``pip install 'astronomer-providers[openlineage]'``
+     - Openlineage
+
    * - ``snowflake``
      - ``pip install 'astronomer-providers[snowflake]'``
      - Snowflake

astronomer/providers/google/cloud/extractors/__init__.py

Whitespace-only changes.

astronomer/providers/google/cloud/extractors/bigquery_async_extractor.py

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
from typing import Any, List, Optional

from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance
from airflow.utils.log.logging_mixin import LoggingMixin
from google.cloud.bigquery import Client
from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
from openlineage.airflow.utils import get_job_name
from openlineage.common.provider.bigquery import BigQueryDatasetsProvider

from astronomer.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperatorAsync,
)


class BigQueryAsyncExtractor(BaseExtractor, LoggingMixin):
    """
    This extractor provides visibility on the metadata of a BigQuery insert job
    submitted from a ``BigQueryInsertJobOperatorAsync`` operator, including
    ``billedBytes``, ``rowCount``, ``size``, etc.
    """

    def __init__(self, operator: BigQueryInsertJobOperatorAsync):
        super().__init__(operator)

    def _get_big_query_client(self) -> Client:
        """
        Gets the BigQuery client used to fetch job metadata.

        The client is obtained from the operator's hook so that the extractor reuses the hook's
        connection settings (project ID and location) instead of falling back to a default client.
        """
        hook = self.operator.hook
        return hook.get_client(project_id=hook.project_id, location=hook.location)

    def _get_xcom_bigquery_job_id(self, task_instance: TaskInstance) -> Any:
        """
        Pulls the BigQuery job ID from XCom for the task instance whose metadata needs to be extracted.

        :param task_instance: Instance of the Airflow task whose BigQuery ``job_id`` needs to be pulled from XCom.
        """
        bigquery_job_id = task_instance.xcom_pull(task_ids=task_instance.task_id, key="job_id")
        if not bigquery_job_id:
            raise AirflowException("Could not pull relevant BigQuery job ID from XCOM")
        self.log.debug("BigQuery job ID %s", bigquery_job_id)
        return bigquery_job_id

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        """Returns the list of operators this extractor works on."""
        return ["BigQueryInsertJobOperatorAsync"]

    def extract(self) -> Optional[TaskMetadata]:
        """Empty extract implementation for the abstract method of the ``BaseExtractor`` class."""
        return None

    def extract_on_complete(self, task_instance: TaskInstance) -> Optional[TaskMetadata]:
        """
        Callback on task completion to fetch metadata extraction details that are to be pushed to the Lineage server.

        :param task_instance: Instance of the Airflow task whose metadata needs to be extracted.
        """
        try:
            bigquery_job_id = self._get_xcom_bigquery_job_id(task_instance)
        except AirflowException as ae:
            exception_message = str(ae)
            self.log.exception("%s", exception_message)
            return TaskMetadata(name=get_job_name(task=self.operator))

        # We want to use the operator hook's client to fetch metadata details from remote Google Cloud
        # services. The hook is attached to the operator during its initialization for execution. Hence,
        # to reuse the hook's client, we delay referencing it until here instead of doing so in the
        # constructor, which is called while the operator is still executing and the hook might not
        # have been attached yet.
        self._big_query_client = self._get_big_query_client()

        stats = BigQueryDatasetsProvider(client=self._big_query_client).get_facets(bigquery_job_id)
        inputs = stats.inputs
        output = stats.output
        run_facets = stats.run_facets

        return TaskMetadata(
            name=get_job_name(task=self.operator),
            inputs=[ds.to_openlineage_dataset() for ds in inputs],
            outputs=[output.to_openlineage_dataset()] if output else [],
            run_facets=run_facets,
        )
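
For orientation, here is a minimal sketch (not part of the commit) of how the extractor gets exercised. OpenLineage instantiates the extractor for any operator class named by ``get_operator_classnames()``, calls ``extract()`` while the task runs (a deliberate no-op here), and calls ``extract_on_complete()`` once the task finishes. The ``task_id`` and query below are hypothetical.

    from astronomer.providers.google.cloud.extractors.bigquery_async_extractor import (
        BigQueryAsyncExtractor,
    )
    from astronomer.providers.google.cloud.operators.bigquery import (
        BigQueryInsertJobOperatorAsync,
    )

    # Hypothetical operator; any valid BigQuery job configuration would do.
    operator = BigQueryInsertJobOperatorAsync(
        task_id="insert_query_job",
        configuration={"query": {"query": "SELECT 1", "useLegacySql": False}},
    )
    extractor = BigQueryAsyncExtractor(operator)

    # extract() is intentionally empty; the real work happens after completion,
    # when extract_on_complete(task_instance) pulls the BigQuery job_id from
    # XCom and fetches job facets through the hook's client.
    assert extractor.extract() is None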

dev/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ x-airflow-common:
     OPENLINEAGE_API_KEY: <YOUR_OPENLINEAGE_API_KEY>
     OPENLINEAGE_NAMESPACE: dev
     # yamllint disable-line rule:line-length
-    OPENLINEAGE_EXTRACTOR_BigQueryAsync: astronomer.providers.google.cloud.extractors.bigquery_async_extractor.BigQueryAsyncExtractor
+    OPENLINEAGE_EXTRACTORS: astronomer.providers.google.cloud.extractors.bigquery_async_extractor.BigQueryAsyncExtractor
   volumes:
     - ./dags:/usr/local/airflow/dags
     - ./logs:/usr/local/airflow/logs
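
The key change here is the registration variable: openlineage-airflow 0.8+ replaces the per-operator ``OPENLINEAGE_EXTRACTOR_<Operator>`` variables with a single ``OPENLINEAGE_EXTRACTORS`` variable holding the fully qualified class path of each extractor (semicolon-separated when there are several, as far as I can tell from the library's documentation). A sketch of the equivalent in-process setup, assuming that format:

    import os

    # Register the custom extractor with the OpenLineage Airflow integration.
    # Assumption: OPENLINEAGE_EXTRACTORS takes fully qualified class paths,
    # semicolon-separated when more than one extractor is listed.
    os.environ["OPENLINEAGE_EXTRACTORS"] = (
        "astronomer.providers.google.cloud.extractors"
        ".bigquery_async_extractor.BigQueryAsyncExtractor"
    )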

setup.cfg

Lines changed: 6 additions & 1 deletion
@@ -34,7 +34,7 @@ packages = find_namespace:
 include_package_data = true
 namespace_packages = astronomer,astronomer.providers
 install_requires =
-    apache-airflow>=2.2.0
+    apache-airflow>=2.3.0
     apache-airflow-providers-http
     aiohttp
     aiofiles
@@ -77,6 +77,10 @@ apache.hive =
     impyla
 microsoft.azure =
     apache-airflow-providers-microsoft-azure
+
+# If in future we move OpenLineage extractors out of the repo, this dependency should be removed
+openlineage =
+    openlineage-airflow>=0.8.1
 docs =
     sphinx
     sphinx-autoapi
@@ -129,6 +133,7 @@ all =
     kubernetes_asyncio
     paramiko
     impyla
+    openlineage-airflow>=0.8.1

 [options.packages.find]
 include =

tests/google/cloud/extractors/test_bigquery_async_extractor.py

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
import json
from unittest import mock
from unittest.mock import MagicMock

import pytest
from airflow.exceptions import TaskDeferred
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
from openlineage.client.facet import OutputStatisticsOutputDatasetFacet
from openlineage.common.dataset import Dataset, Source
from openlineage.common.provider.bigquery import (
    BigQueryFacets,
    BigQueryJobRunFacet,
    BigQueryStatisticsDatasetFacet,
)

from astronomer.providers.google.cloud.extractors.bigquery_async_extractor import (
    BigQueryAsyncExtractor,
)
from astronomer.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperatorAsync,
)

TEST_DATASET_LOCATION = "EU"
TEST_GCP_PROJECT_ID = "test-project"
TEST_DATASET = "test-dataset"
TEST_TABLE = "test-table"
EXECUTION_DATE = datetime(2022, 1, 1, 0, 0, 0)
INSERT_DATE = EXECUTION_DATE.strftime("%Y-%m-%d")
INSERT_ROWS_QUERY = (
    f"INSERT {TEST_DATASET}.{TEST_TABLE} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)

INPUT_STATS = [
    Dataset(
        source=Source(scheme="bigquery"),
        name=f"astronomer-airflow-providers.{TEST_DATASET}.{TEST_TABLE}",
        fields=[],
        custom_facets={},
        input_facets={},
        output_facets={},
    )
]

OUTPUT_STATS = Dataset(
    source=Source(scheme="bigquery"),
    name=f"astronomer-airflow-providers.{TEST_DATASET}.{TEST_TABLE}",
    fields=[],
    custom_facets={"stats": BigQueryStatisticsDatasetFacet(rowCount=2, size=0)},
    input_facets={},
    output_facets={"outputStatistics": OutputStatisticsOutputDatasetFacet(rowCount=2, size=0)},
)

with open("tests/google/cloud/extractors/job_details.json") as jd_json:
    JOB_PROPERTIES = json.load(jd_json)

RUN_FACETS = {
    "bigQuery_job": BigQueryJobRunFacet(billedBytes=0, cached=False, properties=json.dumps(JOB_PROPERTIES))
}


@pytest.fixture
def context():
    """Creates an empty context."""
    context = {}
    yield context


@mock.patch("astronomer.providers.google.cloud.operators.bigquery._BigQueryHook")
@mock.patch("airflow.models.TaskInstance.xcom_pull")
@mock.patch("openlineage.common.provider.bigquery.BigQueryDatasetsProvider.get_facets")
def test_extract_on_complete(mock_bg_dataset_provider, mock_xcom_pull, mock_hook, context):
    """
    Tests that the custom extractor's implementation for the BigQueryInsertJobOperatorAsync is able to process the
    operator's metadata that needs to be extracted as per OpenLineage.
    """
    configuration = {
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
        }
    }
    job_id = "123456"
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=job_id, error_result=False)
    mock_bg_dataset_provider.return_value = BigQueryFacets(
        run_facets=RUN_FACETS, inputs=INPUT_STATS, output=OUTPUT_STATS
    )

    task_id = "insert_query_job"
    operator = BigQueryInsertJobOperatorAsync(
        task_id=task_id,
        configuration=configuration,
        location=TEST_DATASET_LOCATION,
        job_id=job_id,
        project_id=TEST_GCP_PROJECT_ID,
    )

    task_instance = TaskInstance(task=operator)
    with pytest.raises(TaskDeferred):
        operator.execute(context)

    bq_extractor = BigQueryAsyncExtractor(operator)
    task_meta_extract = bq_extractor.extract()
    assert task_meta_extract is None

    task_meta = bq_extractor.extract_on_complete(task_instance)

    mock_xcom_pull.assert_called_once_with(task_ids=task_instance.task_id, key="job_id")

    assert task_meta.name == f"adhoc_airflow.{task_id}"

    assert task_meta.inputs[0].facets["dataSource"].name == INPUT_STATS[0].source.scheme
    assert task_meta.inputs[0].name == INPUT_STATS[0].name

    assert task_meta.outputs[0].name == OUTPUT_STATS.name
    assert task_meta.outputs[0].facets["stats"].rowCount == 2
    assert task_meta.outputs[0].facets["stats"].size == 0

    assert task_meta.run_facets["bigQuery_job"].billedBytes == 0
    run_facet_properties = json.loads(task_meta.run_facets["bigQuery_job"].properties)
    assert run_facet_properties == JOB_PROPERTIES


def test_extractor_works_on_operator():
    """Tests that the custom extractor implementation is available for the BigQueryInsertJobOperatorAsync Operator."""
    task_id = "insert_query_job"
    operator = BigQueryInsertJobOperatorAsync(task_id=task_id, configuration={})
    assert type(operator).__name__ in BigQueryAsyncExtractor.get_operator_classnames()


@mock.patch("astronomer.providers.google.cloud.operators.bigquery._BigQueryHook")
def test_unavailable_xcom_raises_exception(mock_hook, context):
    """
    Tests that an exception is raised when the custom extractor is unable to retrieve the required XCom for the
    BigQueryInsertJobOperatorAsync Operator.
    """
    configuration = {
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
        }
    }
    job_id = "123456"
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=job_id, error_result=False)
    task_id = "insert_query_job"
    operator = BigQueryInsertJobOperatorAsync(
        task_id=task_id,
        configuration=configuration,
        location=TEST_DATASET_LOCATION,
        job_id=job_id,
        project_id=TEST_GCP_PROJECT_ID,
    )

    task_instance = TaskInstance(task=operator)
    execution_date = datetime(2022, 1, 1, 0, 0, 0)
    task_instance.run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)

    with pytest.raises(TaskDeferred):
        operator.execute(context)
    bq_extractor = BigQueryAsyncExtractor(operator)
    with mock.patch.object(bq_extractor.log, "exception") as mock_log_exception:
        task_meta = bq_extractor.extract_on_complete(task_instance)

    mock_log_exception.assert_called_with("%s", "Could not pull relevant BigQuery job ID from XCOM")
    assert task_meta.name == f"adhoc_airflow.{task_id}"
