Skip to content

Commit a90235f

Browse files
varant-zlaiezvz
andauthored
Add label join flag to custom json for airflow (#775)
## Summary Adding a flag so that airflow integration knows whether to schedule a join or not ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced join metadata to include a flag indicating the presence of label parts. - **Tests** - Updated sample join test to include label part information in join instantiation. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: ezvz <[email protected]>
1 parent 4db7dba commit a90235f

File tree

2 files changed

+15
-1
lines changed

2 files changed

+15
-1
lines changed

api/python/ai/chronon/airflow_helpers.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ def _set_join_deps(join):
149149
# Update the metadata customJson with dependencies
150150
_dedupe_and_set_airflow_deps_json(join, deps)
151151

152+
# Set the t/f flag for label_join
153+
_set_label_join_flag(join)
154+
152155

153156
def _set_group_by_deps(group_by):
154157
if not group_by.sources:
@@ -170,6 +173,16 @@ def _set_group_by_deps(group_by):
170173
_dedupe_and_set_airflow_deps_json(group_by, deps)
171174

172175

176+
def _set_label_join_flag(join):
177+
existing_json = join.metaData.customJson or "{}"
178+
json_map = json.loads(existing_json)
179+
label_join_flag = False
180+
if join.labelParts:
181+
label_join_flag = True
182+
json_map["label_join"] = label_join_flag
183+
join.metaData.customJson = json.dumps(json_map)
184+
185+
173186
def _dedupe_and_set_airflow_deps_json(obj, deps):
174187
sorted_items = [tuple(sorted(d.items())) for d in deps]
175188
# Use OrderedDict for re-producible ordering of dependencies

api/python/test/sample/joins/sample_team/sample_join.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
JoinPart,
2121
)
2222
from ai.chronon.repo.constants import RunMode
23-
from ai.chronon.types import EnvironmentVariables
23+
from ai.chronon.types import EnvironmentVariables, LabelParts
2424

2525
v1 = Join(
2626
left=test_sources.staging_entities,
@@ -33,6 +33,7 @@
3333
}
3434
),
3535
online=True,
36+
label_part=LabelParts([JoinPart(group_by=sample_group_by.v1)], 1, 1)
3637
)
3738

3839
never = Join(

0 commit comments

Comments
 (0)