
Commit f865faa

rebase
Co-authored-by: Thomas Chow <[email protected]>
1 parent 610c8fe commit f865faa

File tree

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
spark/src/main/scala/ai/chronon/spark/Driver.scala
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala

5 files changed: +14 −6 lines changed

5 files changed

+14
-6
lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ case object BigQueryNative extends Format {
       dfw.load(bqFriendlyName)
     } else {
       dfw
-        .option("filter", partitionFilters.trim.stripPrefix("(").stripSuffix(")"))
+        .option("filter", partitionFilters)
         .load(bqFriendlyName)
     }
   }
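
The dropped stripPrefix/stripSuffix chain only behaves for a single parenthesized predicate; applied to a compound filter it yields unbalanced parentheses. A minimal sketch of that hazard, using illustrative filter strings (plain Scala, runnable in a REPL):

// Stripping only the outermost parens is fine for one predicate but
// corrupts a compound filter assembled as "(a) AND (b)".
def stripOuter(f: String): String = f.trim.stripPrefix("(").stripSuffix(")")

stripOuter("(ds = '2023-11-30')")
// "ds = '2023-11-30'"  -- still a valid predicate
stripOuter("(ds = '2023-11-30') AND (hr = '00')")
// "ds = '2023-11-30') AND (hr = '00'"  -- unbalanced, no longer valid SQL

Passing partitionFilters through unchanged sidesteps that class of breakage.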

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala

Lines changed: 11 additions & 0 deletions
@@ -114,6 +114,17 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
     println(allParts)
   }

+  it should "integration testing bigquery partition pushdown" ignore {
+    import spark.implicits._
+    val iceberg = "data.checkouts_native"
+
+    val singleFilter = tableUtils.loadTable(iceberg, List("ds = '2023-11-30'"))
+    val multiFilter = tableUtils.loadTable(iceberg, List("ds = '2023-11-30'", "ds = '2023-11-30'"))
+    assertEquals(
+      singleFilter.select("user_id", "ds").as[(String, String)].collect.toList,
+      multiFilter.select("user_id", "ds").as[(String, String)].collect.toList)
+  }
+
   it should "integration testing formats" ignore {
     val externalTable = "default_iceberg.data.checkouts_parquet"
     val externalFormat = FormatProvider.from(spark).readFormat(externalTable)
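
The new test reads the same table twice, once with a single ds predicate and once with that predicate duplicated, and asserts both reads return identical rows; under AND semantics a repeated conjunct is a no-op, so any divergence would point at the filter plumbing. This diff does not show how the List of filters is collapsed into the single string handed to .option("filter", ...); a plausible sketch, with andAll as a hypothetical stand-in for Chronon's real helper:

// Hypothetical combiner: wrap each predicate in parens and AND them
// together. Chronon's actual helper may differ.
def andAll(filters: Seq[String]): String =
  filters.map(f => s"($f)").mkString(" AND ")

andAll(List("ds = '2023-11-30'"))
// "(ds = '2023-11-30')"
andAll(List("ds = '2023-11-30'", "ds = '2023-11-30'"))
// "(ds = '2023-11-30') AND (ds = '2023-11-30')"

A combined string like the second one also illustrates why the paren-stripping removed in BigQueryNative.scala above was unsafe.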

spark/src/main/scala/ai/chronon/spark/Driver.scala

Lines changed: 1 addition & 1 deletion
@@ -901,7 +901,7 @@ object Driver {
     // Create a JoinPartNode from the join part
     val joinPartNode = new JoinPartNode()
       .setJoinPart(joinPart)
-      .setLeftSourceTable(JoinUtils.computeLeftSourceTableName(join))
+      .setLeftSourceTable(JoinUtils.computeFullLeftSourceTableName(join))
       .setLeftDataModel(join.left.dataModel)
       .setSkewKeys(join.skewKeys)
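
Both this change and the MergeJob.scala change below swap computeLeftSourceTableName for computeFullLeftSourceTableName when wiring up the left source table. Neither helper's body appears in this diff; the following sketch of the presumed distinction is hypothetical, inferred from the names alone and built on stand-in types:

// Stand-in types; Chronon's real api.Join and MetaData are richer.
case class MetaData(name: String, outputNamespace: String)
case class Join(metaData: MetaData)

// Assumed shapes: the "Full" variant presumably prefixes the output
// namespace so downstream jobs resolve the table unambiguously.
def computeLeftSourceTableName(join: Join): String =
  s"${join.metaData.name.replace(".", "_")}_left_source"
def computeFullLeftSourceTableName(join: Join): String =
  s"${join.metaData.outputNamespace}.${computeLeftSourceTableName(join)}"

If that reading is right, the fix ensures the left source table is referenced with its namespace rather than as a bare name.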

spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala

Lines changed: 0 additions & 3 deletions
@@ -1,8 +1,5 @@
 package ai.chronon.spark.batch

-import ai.chronon.api
-import ai.chronon.api.{Accuracy, Constants, DateRange, JoinPart, PartitionRange, PartitionSpec}
-import ai.chronon.api.DataModel
 import ai.chronon.api.DataModel.{ENTITIES, EVENTS}
 import ai.chronon.api.Extensions.{DateRangeOps, DerivationOps, GroupByOps, JoinPartOps, MetadataOps}
 import ai.chronon.api.PartitionRange.toTimeRange

spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ class MergeJob(node: JoinMergeNode, range: DateRange, joinParts: Seq[JoinPart])(
   private val leftInputTable = if (join.bootstrapParts != null || join.onlineExternalParts != null) {
     join.metaData.bootstrapTable
   } else {
-    JoinUtils.computeLeftSourceTableName(join)
+    JoinUtils.computeFullLeftSourceTableName(join)
   }
   // Use the node's Join's metadata for output table
   private val outputTable = node.metaData.outputTable
