Commit f67d285

feat: support unpartitioned tables (#724)
## Summary

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **Refactor**
  - Improved handling of tables without partition columns to ensure smoother data loading.
  - The system now gracefully loads unpartitioned tables instead of raising errors.
- **New Features**
  - Added new data sources and group-by configurations for enhanced purchase data aggregation.
  - Introduced environment-specific upload and deletion of additional BigQuery tables to support new group-by views.
- **Bug Fixes**
  - Resolved issues where missing partition columns would previously cause exceptions, enhancing reliability for various table types.

Co-authored-by: thomaschow <[email protected]>
1 parent (3f87290) · commit f67d285
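
The core change makes the BigQuery partition column optional instead of required. As a rough sketch of that control flow (illustrative only, not the project's actual API: the `loadBqTable` helper, the `partValues` parameter, and the default `bqFormat` value are assumptions made for brevity), the Some/None branching looks like this:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Minimal sketch of the Some/None handling added in this commit (hypothetical helper,
// not Chronon's real method). `partValues` stands in for the distinct partition values
// that the real code lists from BigQuery before loading each partition.
def loadBqTable(spark: SparkSession,
                bqFriendlyName: String,
                partColOption: Option[String],
                partValues: Seq[String],
                partitionWheres: String = "",
                bqFormat: String = "bigquery"): DataFrame =
  partColOption match {
    case Some(partCol) =>
      // Partitioned table: load each selected partition value and union the slices.
      partValues
        .map { partValue =>
          spark.read
            .format(bqFormat)
            .option("filter", s"$partCol = '$partValue'")
            .load(bqFriendlyName)
            .withColumn(partCol, lit(partValue))
        }
        .reduce(_ unionByName _) // assumes at least one matching partition value
    case None =>
      // Unpartitioned table or view: issue a single query instead of throwing.
      spark.read
        .format(bqFormat)
        .option("viewsEnabled", true)
        .load(s"SELECT * FROM $bqFriendlyName $partitionWheres")
  }
```

This mirrors the diff to `BigQueryNative.scala` below: the `Some` branch keeps the existing per-partition loads, while the new `None` branch falls back to a single `SELECT *` query.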

File tree: 3 files changed, +120 −28 lines changed

api/python/test/canary/group_bys/gcp/purchases.py

Lines changed: 63 additions & 0 deletions
```diff
@@ -16,8 +16,71 @@
         time_column="ts") # The event time
     ))

+view_source = Source(
+    events=EventSource(
+        table="data.purchases_native_view", # This points to the log table in the warehouse with historical purchase events, updated in batch daily
+        topic=None, # See the 'returns' GroupBy for an example that has a streaming source configured. In this case, this would be the streaming source topic that can be listened to for realtime events
+        query=Query(
+            selects=selects("user_id","purchase_price"), # Select the fields we care about
+            time_column="ts") # The event time
+    ))
+
 window_sizes = [Window(length=day, time_unit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below

+v1_view_dev = GroupBy(
+    backfill_start_date="2023-11-01",
+    sources=[view_source],
+    keys=["user_id"], # We are aggregating by user
+    online=True,
+    aggregations=[Aggregation(
+        input_column="purchase_price",
+        operation=Operation.SUM,
+        windows=window_sizes
+    ), # The sum of purchases prices in various windows
+        Aggregation(
+            input_column="purchase_price",
+            operation=Operation.COUNT,
+            windows=window_sizes
+        ), # The count of purchases in various windows
+        Aggregation(
+            input_column="purchase_price",
+            operation=Operation.AVERAGE,
+            windows=window_sizes
+        ), # The average purchases by user in various windows
+        Aggregation(
+            input_column="purchase_price",
+            operation=Operation.LAST_K(10),
+        ),
+    ],
+)
+
+v1_view_test = GroupBy(
+    backfill_start_date="2023-11-01",
+    sources=[view_source],
+    keys=["user_id"], # We are aggregating by user
+    online=True,
+    aggregations=[Aggregation(
+        input_column="purchase_price",
+        operation=Operation.SUM,
+        windows=window_sizes
+    ), # The sum of purchases prices in various windows
+        Aggregation(
+            input_column="purchase_price",
+            operation=Operation.COUNT,
+            windows=window_sizes
+        ), # The count of purchases in various windows
+        Aggregation(
+            input_column="purchase_price",
+            operation=Operation.AVERAGE,
+            windows=window_sizes
+        ), # The average purchases by user in various windows
+        Aggregation(
+            input_column="purchase_price",
+            operation=Operation.LAST_K(10),
+        ),
+    ],
+)
+
 v1_dev = GroupBy(
     backfill_start_date="2023-11-01",
     sources=[source],
```

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala

Lines changed: 38 additions & 28 deletions
```diff
@@ -34,7 +34,7 @@ case object BigQueryNative extends Format {
          |
          |""".stripMargin

-    val partColName = sparkSession.read
+    val pColOption = sparkSession.read
       .format(bqFormat)
       .option("project", providedProject)
       // See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
@@ -45,37 +45,47 @@
       .as[String]
       .collect
       .headOption
-      .getOrElse(
-        throw new UnsupportedOperationException(s"No partition column for table ${tableName} found.")
-      ) // TODO: support unpartitioned tables (uncommon case).

-    // Next, we query the BQ table using the requested partitionFilter to grab all the distinct partition values that match the filter.
     val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
-    val partitionFormat = TableUtils(sparkSession).partitionFormat
-    val select =
-      s"SELECT distinct(${partColName}) AS ${internalBQPartitionCol} FROM ${bqFriendlyName} ${partitionWheres}"
-    val selectedParts = sparkSession.read
-      .format(bqFormat)
-      .option("viewsEnabled", true)
-      .option("materializationDataset", bqTableId.getDataset)
-      .load(select)
-      .select(date_format(col(internalBQPartitionCol), partitionFormat))
-      .as[String]
-      .collect
-      .toList
-    logger.info(s"Part values: ${selectedParts}")
-
-    // Finally, we query the BQ table for each of the selected partition values and union them together.
-    selectedParts
-      .map((partValue) => {
-        val pFilter = f"${partColName} = '${partValue}'"
+    pColOption match {
+      case Some(partColName) => {
+        // Next, we query the BQ table using the requested partitionFilter to grab all the distinct partition values that match the filter.
+        val partitionFormat = TableUtils(sparkSession).partitionFormat
+        val select =
+          s"SELECT distinct(${partColName}) AS ${internalBQPartitionCol} FROM ${bqFriendlyName} ${partitionWheres}"
+        logger.info(s"Listing in scope BQ native table partitions: ${select}")
+        val selectedParts = sparkSession.read
+          .format(bqFormat)
+          .option("viewsEnabled", true)
+          .option("materializationDataset", bqTableId.getDataset)
+          .load(select)
+          .select(date_format(col(internalBQPartitionCol), partitionFormat))
+          .as[String]
+          .collect
+          .toList
+        logger.info(s"Part values: ${selectedParts}")
+
+        // Finally, we query the BQ table for each of the selected partition values and union them together.
+        selectedParts
+          .map((partValue) => {
+            val pFilter = f"${partColName} = '${partValue}'"
+            sparkSession.read
+              .format(bqFormat)
+              .option("filter", pFilter)
+              .load(bqFriendlyName)
+              .withColumn(partColName, lit(partValue))
+          }) // todo: make it nullable
+          .reduce(_ unionByName _)
+      }
+      case None =>
+        val select = s"SELECT * FROM ${bqFriendlyName} ${partitionWheres}"
+        logger.info(s"BQ Query: ${select}")
         sparkSession.read
+          .option("viewsEnabled", true)
+          .option("materializationDataset", bqTableId.getDataset)
           .format(bqFormat)
-          .option("filter", pFilter)
-          .load(bqFriendlyName)
-          .withColumn(partColName, lit(partValue))
-      }) // todo: make it nullable
-      .reduce(_ unionByName _)
+          .load(select)
+    }
   }

   override def primaryPartitions(tableName: String,
```

scripts/distribution/run_gcp_quickstart.sh

Lines changed: 19 additions & 0 deletions
```diff
@@ -69,9 +69,11 @@ set -xo pipefail
 # Delete gcp tables to start from scratch
 if [[ "$ENVIRONMENT" == "canary" ]]; then
   bq rm -f -t canary-443022:data.gcp_purchases_v1_test
+  bq rm -f -t canary-443022:data.gcp_purchases_v1_view_test
   bq rm -f -t canary-443022:data.gcp_purchases_v1_test_upload
 else
   bq rm -f -t canary-443022:data.gcp_purchases_v1_dev
+  bq rm -f -t canary-443022:data.gcp_purchases_v1_view_dev
   bq rm -f -t canary-443022:data.gcp_purchases_v1_dev_upload
 fi
 #TODO: delete bigtable rows
@@ -132,6 +134,15 @@

 fail_if_bash_failed $?

+echo -e "${GREEN}<<<<<.....................................BACKFILL-VIEW.....................................>>>>>\033[0m"
+if [[ "$ENVIRONMENT" == "canary" ]]; then
+  zipline run --repo=$CHRONON_ROOT --version $VERSION --mode backfill --conf compiled/group_bys/gcp/purchases.v1_view_test
+else
+  zipline run --repo=$CHRONON_ROOT --version $VERSION --mode backfill --conf compiled/group_bys/gcp/purchases.v1_view_dev
+fi
+
+fail_if_bash_failed $?
+
 echo -e "${GREEN}<<<<<.....................................CHECK-PARTITIONS.....................................>>>>>\033[0m"
 EXPECTED_PARTITION="2023-11-30"
 if [[ "$ENVIRONMENT" == "canary" ]]; then
@@ -149,6 +160,14 @@
 fi
 fail_if_bash_failed

+echo -e "${GREEN}<<<<<.....................................GROUP-BY-UPLOAD.....................................>>>>>\033[0m"
+if [[ "$ENVIRONMENT" == "canary" ]]; then
+  zipline run --repo=$CHRONON_ROOT --version $VERSION --mode upload --conf compiled/group_bys/gcp/purchases.v1_view_test --ds 2023-12-01
+else
+  zipline run --repo=$CHRONON_ROOT --version $VERSION --mode upload --conf compiled/group_bys/gcp/purchases.v1_view_dev --ds 2023-12-01
+fi
+fail_if_bash_failed
+
 # Need to wait for upload to finish
 echo -e "${GREEN}<<<<<.....................................UPLOAD-TO-KV.....................................>>>>>\033[0m"
 if [[ "$ENVIRONMENT" == "canary" ]]; then
```
