Skip to content

Commit 06f80eb

Browse files
committed
WIP
1 parent b4582d9 commit 06f80eb

File tree

2 files changed

+22
-9
lines changed

2 files changed

+22
-9
lines changed

api/python/ai/chronon/airflow_helpers.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,21 @@ def create_airflow_dependency(table, partition_column):
1616
Returns:
1717
A dictionary with name and spec for the Airflow dependency
1818
"""
19-
# Default partition column to 'ds' if not specified
20-
partition_col = partition_column or 'ds'
19+
assert partition_column is not None, """Partition column must be provided via the spark.chronon.partition.column
20+
config. This can be set as a default in teams.py, or at the individual config level. For example:
21+
```
22+
Team(
23+
conf=ConfigProperties(
24+
common={
25+
"spark.chronon.partition.column": "_test_column",
26+
}
27+
)
28+
)
29+
```
30+
"""
2131
return {
2232
"name": f"wf_{utils.sanitize(table)}",
23-
"spec": f"{table}/{partition_col}={{{{ ds }}}}",
33+
"spec": f"{table}/{partition_column}={{{{ ds }}}}",
2434
}
2535

2636
def _get_partition_col_from_query(query):
@@ -53,10 +63,8 @@ def _get_airflow_deps_from_source(source, partition_column=None):
5363
tables.append(source.entities.mutationTable)
5464
source_partition_column = _get_partition_col_from_query(source.entities.query) or partition_column
5565
elif source.joinSource:
56-
namespace = source.joinSource.join.metaData.outputNamespace
57-
table = utils.sanitize(source.joinSource.join.metaData.name)
58-
tables = [f"{namespace}.{table}"]
59-
source_partition_column = _get_partition_col_from_query(source.joinSource.query) or partition_column
66+
# TODO: Handle joinSource -- it doesn't work right now because the metadata isn't set on joinSource at this point
67+
return []
6068
else:
6169
# Unknown source type
6270
return []
@@ -65,7 +73,7 @@ def _get_airflow_deps_from_source(source, partition_column=None):
6573

6674

6775
def extract_default_partition_column(obj):
68-
return obj.metaData.executionInfo.env.common.get("partitionColumn")
76+
return obj.metaData.executionInfo.conf.common.get("spark.chronon.partition.column")
6977

7078

7179
def _set_join_deps(join):

api/python/test/sample/teams.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from ai.chronon.api.ttypes import Team
22
from ai.chronon.repo.constants import RunMode
3-
from ai.chronon.types import EnvironmentVariables, ConfigProperties
3+
from ai.chronon.types import ConfigProperties, EnvironmentVariables
44

55
default = Team(
66
description="Default team",
@@ -76,6 +76,11 @@
7676

7777
sample_team = Team(
7878
outputNamespace="test",
79+
conf=ConfigProperties(
80+
common={
81+
"spark.chronon.partition.column": "_test_column_sample",
82+
}
83+
),
7984
env=EnvironmentVariables(
8085
common={
8186
"GCP_BIGTABLE_INSTANCE_ID": "test-instance" # example, custom bigtable instance

0 commit comments

Comments
 (0)