Commit 263a029

varant-zlai, ezvz, tchow-zlai, and thomaschow authored
[airflow] -- add dependencies for airflow to customJson (#648)
## Summary

Setting airflow dependencies in customJson. This is meant to be temporary, until we can ship orchestrator.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Introduced a new utility for defining staging queries with explicit table dependencies and partition columns.
  - Added support for specifying engine type, scheduling, and advanced metadata when creating staging queries.
  - Added helper functions to automatically generate and set Airflow dependency metadata for Chronon objects.

- **Refactor**
  - Updated sample staging query definitions to use the new dependency and metadata structure, improving clarity and consistency.
  - Replaced nested metadata objects with direct keyword arguments for easier configuration.
  - Integrated automatic setting of Airflow dependencies during configuration parsing.

- **Chores**
  - Enhanced internal handling of Airflow dependencies for relevant objects, ensuring accurate dependency tracking.
  - Updated team configurations to include default partition column settings for improved environment consistency.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
1 parent c61aa3b commit 263a029

File tree

7 files changed: +314 -23 lines changed

api/python/ai/chronon/airflow_helpers.py

Lines changed: 146 additions & 0 deletions
@@ -0,0 +1,146 @@
import json

import ai.chronon.utils as utils
from ai.chronon.api.ttypes import GroupBy, Join


def create_airflow_dependency(table, partition_column):
    """
    Create an Airflow dependency object for a table.

    Args:
        table: The table name (with namespace)
        partition_column: The partition column to use (defaults to 'ds')

    Returns:
        A dictionary with name and spec for the Airflow dependency
    """
    assert partition_column is not None, """Partition column must be provided via the spark.chronon.partition.column
    config. This can be set as a default in teams.py, or at the individual config level. For example:
    ```
    Team(
        conf=ConfigProperties(
            common={
                "spark.chronon.partition.column": "_test_column",
            }
        )
    )
    ```
    """
    return {
        "name": f"wf_{utils.sanitize(table)}",
        "spec": f"{table}/{partition_column}={{{{ ds }}}}",
    }


def _get_partition_col_from_query(query):
    """Gets partition column from query if available"""
    if query:
        return query.partitionColumn
    return None


def _get_airflow_deps_from_source(source, partition_column=None):
    """
    Given a source, return a list of Airflow dependencies.

    Args:
        source: The source object (events, entities, or joinSource)
        partition_column: The partition column to use

    Returns:
        A list of Airflow dependency objects
    """
    tables = []
    # Assumes source has already been normalized
    if source.events:
        tables = [source.events.table]
        # Use partition column from query if available, otherwise use the provided one
        source_partition_column = _get_partition_col_from_query(source.events.query) or partition_column
    elif source.entities:
        # Given the setup of Query, we currently mandate the same partition column for snapshot and mutations tables
        tables = [source.entities.snapshotTable]
        if source.entities.mutationTable:
            tables.append(source.entities.mutationTable)
        source_partition_column = _get_partition_col_from_query(source.entities.query) or partition_column
    elif source.joinSource:
        # TODO: Handle joinSource -- it doesn't work right now because the metadata isn't set on joinSource at this point
        return []
    else:
        # Unknown source type
        return []

    return [create_airflow_dependency(table, source_partition_column) for table in tables]


def extract_default_partition_column(obj):
    return obj.metaData.executionInfo.conf.common.get("spark.chronon.partition.column")


def _set_join_deps(join):
    default_partition_col = extract_default_partition_column(join)

    deps = []

    # Handle left source
    left_query = utils.get_query(join.left)
    left_partition_column = _get_partition_col_from_query(left_query) or default_partition_col
    deps.extend(_get_airflow_deps_from_source(join.left, left_partition_column))

    # Handle right parts (join parts)
    if join.joinParts:
        for join_part in join.joinParts:
            if join_part.groupBy and join_part.groupBy.sources:
                for source in join_part.groupBy.sources:
                    source_query = utils.get_query(source)
                    source_partition_column = _get_partition_col_from_query(source_query) or default_partition_col
                    deps.extend(_get_airflow_deps_from_source(source, source_partition_column))

    # Handle label parts
    if join.labelParts and join.labelParts.labels:
        for label_part in join.labelParts.labels:
            if label_part.groupBy and label_part.groupBy.sources:
                for source in label_part.groupBy.sources:
                    source_query = utils.get_query(source)
                    source_partition_column = _get_partition_col_from_query(source_query) or default_partition_col
                    deps.extend(_get_airflow_deps_from_source(source, source_partition_column))

    # Update the metadata customJson with dependencies
    _set_airflow_deps_json(join, deps)


def _set_group_by_deps(group_by):
    if not group_by.sources:
        return

    default_partition_col = extract_default_partition_column(group_by)

    deps = []

    # Process each source in the group_by
    for source in group_by.sources:
        source_query = utils.get_query(source)
        source_partition_column = _get_partition_col_from_query(source_query) or default_partition_col
        deps.extend(_get_airflow_deps_from_source(source, source_partition_column))

    # Update the metadata customJson with dependencies
    _set_airflow_deps_json(group_by, deps)


def _set_airflow_deps_json(obj, deps):
    existing_json = obj.metaData.customJson or "{}"
    json_map = json.loads(existing_json)
    json_map["airflowDependencies"] = deps
    obj.metaData.customJson = json.dumps(json_map)


def set_airflow_deps(obj):
    """
    Set Airflow dependencies for a Chronon object.

    Args:
        obj: A Join, GroupBy
    """
    # StagingQuery dependency setting is handled directly in object init
    if isinstance(obj, Join):
        _set_join_deps(obj)
    elif isinstance(obj, GroupBy):
        _set_group_by_deps(obj)
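
For orientation, a minimal usage sketch of the helpers above; the module path `ai.chronon.airflow_helpers` is inferred from the imports elsewhere in this commit, and the table name is illustrative:

```python
import ai.chronon.airflow_helpers as airflow_helpers  # assumed path, matching the imports below

# One dependency entry per upstream table: "name" is "wf_" plus the sanitized table name,
# and "spec" pins that table's partition to the Airflow {{ ds }} macro.
dep = airflow_helpers.create_airflow_dependency("kaggle_outbrain.events", "ds")
print(dep["spec"])  # kaggle_outbrain.events/ds={{ ds }}

# set_airflow_deps(obj) walks a compiled Join or GroupBy, collects one such entry per source
# table, and merges the list into metaData.customJson under "airflowDependencies",
# preserving any JSON already stored there (see _set_airflow_deps_json above).
```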

api/python/ai/chronon/cli/compile/parse_configs.py

Lines changed: 4 additions & 0 deletions
@@ -4,6 +4,7 @@
 import os
 from typing import List

+from ai.chronon import airflow_helpers
 from ai.chronon.cli.compile import parse_teams, serializer
 from ai.chronon.cli.compile.compile_context import CompileContext
 from ai.chronon.cli.compile.display.compiled_obj import CompiledObj
@@ -30,6 +31,9 @@ def from_folder(

     for name, obj in results_dict.items():
         parse_teams.update_metadata(obj, compile_context.teams_dict)
+        # Airflow deps must be set AFTER updating metadata
+        airflow_helpers.set_airflow_deps(obj)
+
         obj.metaData.sourceFile = os.path.relpath(f, compile_context.chronon_root)

         tjson = serializer.thrift_simple_json(obj)
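
Why the ordering comment matters: the dependency helpers assert on a partition column whose default lives in metadata that `update_metadata` fills in. A rough sketch of that chain, assuming team-level conf is merged into `obj.metaData` by `parse_teams.update_metadata` (consistent with the teams.py default mentioned in the summary above):

```python
# Sketch only, not the compiler code: after update_metadata has merged team defaults,
# the helper module can read the default partition column from the object's metadata.
def default_partition_column(obj):
    return obj.metaData.executionInfo.conf.common.get("spark.chronon.partition.column")

# If set_airflow_deps ran first, this lookup could return None, and the assertion in
# create_airflow_dependency would fire for sources whose Query sets no partitionColumn.
```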
api/python/ai/chronon/staging_query.py

Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
import inspect
import json
from dataclasses import dataclass
from typing import Dict, List, Optional

import ai.chronon.airflow_helpers as airflow_helpers
import ai.chronon.api.common.ttypes as common
import ai.chronon.api.ttypes as ttypes


# Wrapper for EngineType
class EngineType:
    SPARK = ttypes.EngineType.SPARK
    BIGQUERY = ttypes.EngineType.BIGQUERY


@dataclass
class TableDependency:
    table: str
    partition_column: Optional[str] = None


def StagingQuery(
    name: str,
    query: str,
    output_namespace: Optional[str] = None,
    start_partition: Optional[str] = None,
    table_properties: Optional[Dict[str, str]] = None,
    setups: Optional[List[str]] = None,
    partition_column: Optional[str] = None,
    engine_type: Optional[EngineType] = None,
    dependencies: Optional[List[TableDependency]] = None,
    tags: Optional[Dict[str, str]] = None,
    # execution params
    offline_schedule: str = "@daily",
    conf: Optional[common.ConfigProperties] = None,
    env_vars: Optional[common.EnvironmentVariables] = None,
    step_days: Optional[int] = None,
) -> ttypes.StagingQuery:
    """
    Creates a StagingQuery object for executing arbitrary SQL queries with templated date parameters.

    :param query:
        Arbitrary spark query that should be written with template parameters:
        - `{{ start_date }}`: Initial run uses start_partition, future runs use latest partition + 1 day
        - `{{ end_date }}`: The end partition of the computing range
        - `{{ latest_date }}`: End partition independent of the computing range (for cumulative sources)
        - `{{ max_date(table=namespace.my_table) }}`: Max partition available for a given table
        These parameters can be modified with offset and bounds:
        - `{{ start_date(offset=-10, lower_bound='2023-01-01', upper_bound='2024-01-01') }}`
    :type query: str
    :param start_partition:
        On the first run, `{{ start_date }}` will be set to this user provided start date,
        future incremental runs will set it to the latest existing partition + 1 day.
    :type start_partition: str
    :param setups:
        Spark SQL setup statements. Used typically to register UDFs.
    :type setups: List[str]
    :param partition_column:
        Only needed for `max_date` template
    :type partition_column: str
    :param engine_type:
        By default, spark is the compute engine. You can specify an override (eg. bigquery, etc.)
        Use the EngineType class constants: EngineType.SPARK, EngineType.BIGQUERY, etc.
    :type engine_type: int
    :param tags:
        Additional metadata that does not directly affect computation, but is useful for management.
    :type tags: Dict[str, str]
    :param offline_schedule:
        The offline schedule interval for batch jobs. Format examples:
        '@hourly': '0 * * * *',
        '@daily': '0 0 * * *',
        '@weekly': '0 0 * * 0',
        '@monthly': '0 0 1 * *',
        '@yearly': '0 0 1 1 *'
    :type offline_schedule: str
    :param conf:
        Configuration properties for the StagingQuery.
    :type conf: common.ConfigProperties
    :param env_vars:
        Environment variables for the StagingQuery.
    :type env_vars: common.EnvironmentVariables
    :param step_days:
        The maximum number of days to process at once
    :type step_days: int
    :return:
        A StagingQuery object
    """
    # Get caller's filename to assign team
    team = inspect.stack()[1].filename.split("/")[-2]

    # Create execution info
    exec_info = common.ExecutionInfo(
        scheduleCron=offline_schedule,
        conf=conf,
        env=env_vars,
        stepDays=step_days,
    )

    airflow_dependencies = (
        [airflow_helpers.create_airflow_dependency(t.table, t.partition_column) for t in dependencies]
        if dependencies
        else []
    )
    custom_json = json.dumps({"airflow_dependencies": airflow_dependencies})

    # Create metadata
    meta_data = ttypes.MetaData(
        name=name,
        outputNamespace=output_namespace,
        team=team,
        executionInfo=exec_info,
        tags=tags,
        customJson=custom_json,
        tableProperties=table_properties,
    )

    # Create and return the StagingQuery object with camelCase parameter names
    staging_query = ttypes.StagingQuery(
        metaData=meta_data,
        query=query,
        startPartition=start_partition,
        setups=setups,
        partitionColumn=partition_column,
        engineType=engine_type,
    )

    return staging_query
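
A minimal sketch of how this wrapper is meant to be called and where the dependencies end up; the module path `ai.chronon.staging_query` matches the updated sample imports below, while the query, table, and names here are illustrative:

```python
import json

from ai.chronon.staging_query import StagingQuery, TableDependency

sq = StagingQuery(
    name="events_rollup",  # illustrative name
    query="SELECT * FROM data.events WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'",
    output_namespace="data",
    dependencies=[TableDependency(table="data.events", partition_column="ds")],
)

# The declared dependencies are serialized into metaData.customJson under "airflow_dependencies".
print(json.loads(sq.metaData.customJson)["airflow_dependencies"])
# roughly: [{"name": "wf_<sanitized table>", "spec": "data.events/ds={{ ds }}"}]
```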

api/python/test/sample/staging_queries/kaggle/outbrain.py

Lines changed: 7 additions & 5 deletions
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.api.ttypes import MetaData, StagingQuery
+from ai.chronon.staging_query import StagingQuery, TableDependency

 base_table = StagingQuery(
+    name='outbrain_left',
     query="""
         SELECT
             clicks_train.display_id,
@@ -35,8 +36,9 @@
         AND ABS(HASH(clicks_train.display_id)) % 100 < 5
         AND ABS(HASH(events.display_id)) % 100 < 5
     """,
-    metaData=MetaData(
-        name='outbrain_left',
-        outputNamespace="default",
-    )
+    output_namespace="default",
+    dependencies=[
+        TableDependency(table="kaggle_outbrain.clicks_train", partition_column="ds"),
+        TableDependency(table="kaggle_outbrain.events", partition_column="ds")
+    ],
 )

api/python/test/sample/staging_queries/quickstart/checkouts_external.py

Lines changed: 9 additions & 7 deletions
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.api.ttypes import MetaData, StagingQuery
+from ai.chronon.staging_query import StagingQuery, TableDependency

 query = """
 SELECT
@@ -30,11 +30,13 @@
 WHERE purchases.ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'
 """

-staging_query = StagingQuery(
+checkouts_query = StagingQuery(
     query=query,
-    startPartition="2023-10-31",
-    metaData=MetaData(
-        name='checkouts_staging_query',
-        outputNamespace="data"
-    ),
+    start_partition="2023-10-31",
+    name='checkouts_staging_query',
+    output_namespace="data",
+    dependencies=[
+        TableDependency(table="data.purchases", partition_column="ds"),
+        TableDependency(table="data.checkouts_external", partition_column="ds")
+    ],
 )

api/python/test/sample/staging_queries/sample_team/sample_staging_query.py

Lines changed: 9 additions & 10 deletions
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.types import MetaData, StagingQuery
+from ai.chronon.staging_query import StagingQuery, TableDependency

 query = """
 SELECT
@@ -28,15 +28,14 @@

 v1 = StagingQuery(
     query=query,
-    startPartition="2020-03-01",
+    start_partition="2020-03-01",
     setups=[
         "CREATE TEMPORARY FUNCTION S2_CELL AS 'com.sample.hive.udf.S2CellId'",
     ],
-    metaData=MetaData(
-        name="sample_staging_query",
-        outputNamespace="sample_namespace",
-        tableProperties={
-            "sample_config_json": """{"sample_key": "sample value}""",
-        },
-    ),
-)
+    name="sample_staging_query",
+    output_namespace="sample_namespace",
+    table_properties={"sample_config_json": """{"sample_key": "sample value}"""},
+    dependencies=[
+        TableDependency(table="sample_namespace.sample_table", partition_column="ds")
+    ],
+)
