Skip to content

Commit 0c5d495

Browse files
authored
Merge branch 'main' into vz/add_test_case_for_different_partition_formats
2 parents 74c8cb8 + b7b804d commit 0c5d495

File tree

4 files changed

+26
-5
lines changed

4 files changed

+26
-5
lines changed

api/python/ai/chronon/airflow_helpers.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def create_airflow_dependency(table, partition_column, additional_partitions=Non
1212
Args:
1313
table: The table name (with namespace)
1414
partition_column: The partition column to use (defaults to 'ds')
15+
additional_partitions: Additional partitions to include in the dependency
1516
1617
Returns:
1718
A dictionary with name and spec for the Airflow dependency
@@ -47,6 +48,12 @@ def _get_partition_col_from_query(query):
4748
return query.partitionColumn
4849
return None
4950

51+
def _get_additional_subPartitionsToWaitFor_from_query(query):
52+
"""Gets additional subPartitionsToWaitFor from query if available"""
53+
if query:
54+
return query.subPartitionsToWaitFor
55+
return None
56+
5057

5158
def _get_airflow_deps_from_source(source, partition_column=None):
5259
"""
@@ -60,20 +67,22 @@ def _get_airflow_deps_from_source(source, partition_column=None):
6067
A list of Airflow dependency objects
6168
"""
6269
tables = []
70+
additional_partitions = None
6371
# Assumes source has already been normalized
6472
if source.events:
6573
tables = [source.events.table]
6674
# Use partition column from query if available, otherwise use the provided one
67-
source_partition_column = (
68-
_get_partition_col_from_query(source.events.query) or partition_column
75+
source_partition_column, additional_partitions = (
76+
_get_partition_col_from_query(source.events.query) or partition_column, _get_additional_subPartitionsToWaitFor_from_query(source.events.query)
6977
)
78+
7079
elif source.entities:
7180
# Given the setup of Query, we currently mandate the same partition column for snapshot and mutations tables
7281
tables = [source.entities.snapshotTable]
7382
if source.entities.mutationTable:
7483
tables.append(source.entities.mutationTable)
75-
source_partition_column = (
76-
_get_partition_col_from_query(source.entities.query) or partition_column
84+
source_partition_column, additional_partitions = (
85+
_get_partition_col_from_query(source.entities.query) or partition_column, _get_additional_subPartitionsToWaitFor_from_query(source.entities.query)
7786
)
7887
elif source.joinSource:
7988
# TODO: Handle joinSource -- it doesn't work right now because the metadata isn't set on joinSource at this point
@@ -83,7 +92,7 @@ def _get_airflow_deps_from_source(source, partition_column=None):
8392
return []
8493

8594
return [
86-
create_airflow_dependency(table, source_partition_column) for table in tables
95+
create_airflow_dependency(table, source_partition_column, additional_partitions) for table in tables
8796
]
8897

8998

api/python/ai/chronon/group_by.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,7 @@ def _normalize_source(source):
648648
for output_col in get_output_col_names(agg):
649649
column_tags[output_col] = agg.tags
650650

651+
651652
metadata = ttypes.MetaData(
652653
online=online,
653654
production=production,

api/python/ai/chronon/query.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def Query(
2929
reversal_column: str = None,
3030
partition_column: str = None,
3131
partition_format: str = None,
32+
sub_partitions_to_wait_for: List[str] = None,
3233
) -> api.Query:
3334
"""
3435
Create a query object that is used to scan data from various data sources.
@@ -76,6 +77,9 @@ def Query(
7677
:param partition_column:
7778
Specify this to override spark.chronon.partition.column set in teams.py for this particular query.
7879
:type partition_column: str, optional
80+
:param sub_partitions_to_wait_for:
81+
Additional partitions to be used in sensing that the source data has landed. Should be a full partition string, such as `hr=23:00'
82+
:type partition_column: List[str], optional
7983
:param partition_format:
8084
Date format string to expect the partition values to be in.
8185
:type partition_format: str, optional
@@ -91,6 +95,7 @@ def Query(
9195
mutationTimeColumn=mutation_time_column,
9296
reversalColumn=reversal_column,
9397
partitionColumn=partition_column,
98+
subPartitionsToWaitFor=sub_partitions_to_wait_for,
9499
partitionFormat=partition_format
95100
)
96101

api/thrift/api.thrift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ struct Query {
4040
**/
4141
23: optional common.Window partitionLag
4242

43+
/**
44+
* Additional partitions to be used in sensing that the source data has landed.
45+
* Should be a full partition string, such as `hr=23:00'
46+
**/
47+
24: optional list<string> subPartitionsToWaitFor
48+
4349
}
4450

4551
/**

0 commit comments

Comments
 (0)