
Commit 6c17647

Switch to env var driven approach
1 parent fdd910e commit 6c17647

File tree

4 files changed

+159
-138
lines changed


.circleci/config.yml

Lines changed: 29 additions & 0 deletions
@@ -78,6 +78,35 @@ jobs:
           destination: spark_warehouse.tar.gz
           when: on_fail
 
+  # run these separately as we need an isolated JVM to not have Spark session settings interfere with other runs
+  "Scala 13 -- Delta Lake Format Tests":
+    executor: docker_baseimg_executor
+    steps:
+      - checkout
+      - run:
+          name: Run Scala 13 tests for Delta Lake format
+          environment:
+            format_test: deltalake
+          shell: /bin/bash -leuxo pipefail
+          command: |
+            conda activate chronon_py
+            # Increase if we see OOM.
+            export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G"
+            sbt '++ 2.13.6' "testOnly ai.chronon.spark.test.TableUtilsFormatTest"
+      - store_test_results:
+          path: /chronon/spark/target/test-reports
+      - store_test_results:
+          path: /chronon/aggregator/target/test-reports
+      - run:
+          name: Compress spark-warehouse
+          command: |
+            cd /tmp/ && tar -czvf spark-warehouse.tar.gz chronon/spark-warehouse
+          when: on_fail
+      - store_artifacts:
+          path: /tmp/spark-warehouse.tar.gz
+          destination: spark_warehouse.tar.gz
+          when: on_fail
+
   "Scala 11 -- Compile":
     executor: docker_baseimg_executor
     steps:
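
The job above isolates the Delta Lake run in its own JVM and communicates the desired format purely through the `format_test` environment variable. As a rough sketch of that contract (the object name here is illustrative, not part of the commit), any code running in that JVM can resolve the format like this, falling back to Hive when the variable is unset:

object FormatEnvCheck {
  def main(args: Array[String]): Unit = {
    // The CI job exports format_test=deltalake; local runs without it default to "hive".
    val format = sys.env.getOrElse("format_test", "hive")
    println(s"TableUtilsFormatTest will exercise the '$format' table format")
  }
}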

spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala

Lines changed: 3 additions & 3 deletions
@@ -147,9 +147,9 @@ class ChrononKryoRegistrator extends KryoRegistrator {
       "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8",
       "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
       "scala.collection.immutable.ArraySeq$ofRef",
-      "org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
-      // "org.apache.spark.sql.delta.stats.DeltaFileStatistics",
-      // "org.apache.spark.sql.delta.actions.AddFile"
+      "org.apache.spark.sql.catalyst.expressions.GenericInternalRow",
+      "org.apache.spark.sql.delta.stats.DeltaFileStatistics",
+      "org.apache.spark.sql.delta.actions.AddFile"
     )
     names.foreach { name =>
       try {
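
For context, the surrounding registrator walks this list of class names inside a try block, so entries that are absent from the classpath (for example the Delta classes in a build without delta-spark) are simply skipped. A minimal sketch of that pattern, assuming resolution via Class.forName and Kryo's standard register(Class) API (the helper name is illustrative):

import com.esotericsoftware.kryo.Kryo

// Sketch: register classes by name, silently skipping any that are missing
// from the current classpath (e.g. Delta classes in a Hive-only build).
def registerByName(kryo: Kryo, names: Seq[String]): Unit =
  names.foreach { name =>
    try kryo.register(Class.forName(name))
    catch { case _: ClassNotFoundException => () }
  }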

spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Lines changed: 20 additions & 4 deletions
@@ -22,14 +22,14 @@ import org.apache.spark.SPARK_VERSION
 
 import java.io.File
 import java.util.logging.Logger
-import scala.reflect.io.Path
 import scala.util.Properties
 
 object SparkSessionBuilder {
   @transient private lazy val logger = LoggerFactory.getLogger(getClass)
 
   private val warehouseId = java.util.UUID.randomUUID().toString.takeRight(6)
   private val DefaultWarehouseDir = new File("/tmp/chronon/spark-warehouse_" + warehouseId)
+  val FormatTestEnvVar: String = "format_test"
 
   def expandUser(path: String): String = path.replaceFirst("~", System.getProperty("user.home"))
   // we would want to share locally generated warehouse during CI testing
@@ -38,6 +38,23 @@
             localWarehouseLocation: Option[String] = None,
             additionalConfig: Option[Map[String, String]] = None,
             enforceKryoSerializer: Boolean = true): SparkSession = {
+
+    // allow us to override the format by specifying env vars. This allows us to not have to worry about interference
+    // between Spark sessions created in existing chronon tests that need the hive format and some specific tests
+    // that require a format override like delta lake.
+    val formatConfigs = sys.env.get(FormatTestEnvVar) match {
+      case Some("deltalake") =>
+        Map(
+          "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
+          "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+          "spark.chronon.table_write.format" -> "delta"
+        )
+      case _ => Map.empty
+    }
+
+    // tack on format configs with additional configs
+    val mergedConfigs = additionalConfig.getOrElse(Map.empty) ++ formatConfigs
+
     if (local) {
       //required to run spark locally with hive support enabled - for sbt test
       System.setSecurityManager(null)
@@ -65,9 +82,8 @@
         .config("spark.kryoserializer.buffer.max", "2000m")
         .config("spark.kryo.referenceTracking", "false")
     }
-    additionalConfig.foreach { configMap =>
-      configMap.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
-    }
+
+    mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
 
     if (SPARK_VERSION.startsWith("2")) {
       // Otherwise files left from deleting the table with the same name result in test failures
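
One detail worth noting: because `formatConfigs` sits on the right-hand side of `++`, the env-var-driven settings take precedence over any overlapping keys passed in via `additionalConfig`. A small standalone illustration of that merge behaviour (the literal values are made up for the example):

// Scala's Map ++ keeps the right-hand value on key collisions, so the
// env-driven format configs win over caller-supplied settings.
val additionalConfig = Map("spark.chronon.table_write.format" -> "hive",
                           "spark.ui.enabled"                 -> "false")
val formatConfigs    = Map("spark.chronon.table_write.format" -> "delta")

val mergedConfigs = additionalConfig ++ formatConfigs
assert(mergedConfigs("spark.chronon.table_write.format") == "delta")
assert(mergedConfigs("spark.ui.enabled") == "false")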

spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala

Lines changed: 107 additions & 131 deletions
@@ -1,169 +1,144 @@
 package ai.chronon.spark.test
 
 import ai.chronon.api.{DoubleType, IntType, LongType, StringType, StructField, StructType}
+import ai.chronon.spark.SparkSessionBuilder.FormatTestEnvVar
 import ai.chronon.spark.test.TestUtils.makeDf
-import ai.chronon.spark.{DeltaLake, Format, Hive, IncompatibleSchemaException, SparkSessionBuilder, TableUtils}
-import org.apache.spark.SparkContext
+import ai.chronon.spark.{IncompatibleSchemaException, SparkSessionBuilder, TableUtils}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
-import org.scalatest.prop.TableDrivenPropertyChecks._
 
 import scala.util.Try
 
-class TestTableUtils(sparkSession: SparkSession, format: Format) extends TableUtils(sparkSession) {
-  override def getWriteFormat: Format = format
-}
-
-class TableUtilsFormatTest extends AnyFunSuite with BeforeAndAfterEach {
+class TableUtilsFormatTest extends AnyFunSuite {
 
   import TableUtilsFormatTest._
 
-  val deltaConfigMap = Map(
-    "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
-    "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
-  )
-  val hiveConfigMap = Map.empty[String, String]
-
-  // TODO: include Hive + Iceberg support in these tests
-  val formats =
-    Table(
-      ("format", "configs"),
-      (DeltaLake, deltaConfigMap),
-      (Hive, hiveConfigMap)
-    )
-
-  private def withSparkSession[T](configs: Map[String, String])(test: SparkSession => T): T = {
-    val spark = SparkSessionBuilder.build("TableUtilsFormatTest", local = true, additionalConfig = Some(configs))
-    val sc = SparkContext.getOrCreate()
+  // Read the format we want this instantiation of the test to run via environment vars
+  val format: String = sys.env.getOrElse(FormatTestEnvVar, "hive")
+
+  private def withSparkSession[T](test: SparkSession => T): T = {
+    val spark = SparkSessionBuilder.build("TableUtilsFormatTest", local = true)
     try {
       test(spark)
     } finally {
-      configs.keys.foreach(cfg => sc.getConf.remove(cfg))
       spark.stop()
     }
   }
 
   ignore("test insertion of partitioned data and adding of columns") {
-    forAll(formats) { (format, configs) =>
-      withSparkSession(configs) { spark =>
-        val tableUtils = new TestTableUtils(spark, format)
-
-        val tableName = s"db.test_table_1_$format"
-        spark.sql("CREATE DATABASE IF NOT EXISTS db")
-        val columns1 = Array(
-          StructField("long_field", LongType),
-          StructField("int_field", IntType),
-          StructField("string_field", StringType)
+    withSparkSession { spark =>
+      val tableUtils = TableUtils(spark)
+
+      val tableName = s"db.test_table_1_$format"
+      spark.sql("CREATE DATABASE IF NOT EXISTS db")
+      val columns1 = Array(
+        StructField("long_field", LongType),
+        StructField("int_field", IntType),
+        StructField("string_field", StringType)
+      )
+      val df1 = makeDf(
+        spark,
+        StructType(
+          tableName,
+          columns1 :+ StructField("ds", StringType)
+        ),
+        List(
+          Row(1L, 2, "3", "2022-10-01")
        )
-        val df1 = makeDf(
-          spark,
-          StructType(
-            tableName,
-            columns1 :+ StructField("ds", StringType)
-          ),
-          List(
-            Row(1L, 2, "3", "2022-10-01")
-          )
+      )
+
+      val df2 = makeDf(
+        spark,
+        StructType(
+          tableName,
+          columns1
+            :+ StructField("double_field", DoubleType)
+            :+ StructField("ds", StringType)
+        ),
+        List(
+          Row(4L, 5, "6", 7.0, "2022-10-02")
        )
-
-        val df2 = makeDf(
-          spark,
-          StructType(
-            tableName,
-            columns1
-              :+ StructField("double_field", DoubleType)
-              :+ StructField("ds", StringType)
-          ),
-          List(
-            Row(4L, 5, "6", 7.0, "2022-10-02")
-          )
-        )
-        testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02")
-      }
+      )
+      testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02")
     }
   }
 
   ignore("test insertion of partitioned data and removal of columns") {
-    forAll(formats) { (format, configs) =>
-      withSparkSession(configs) { spark =>
-        val tableUtils = TableUtils(spark)
-        val tableName = s"db.test_table_2_$format"
-        spark.sql("CREATE DATABASE IF NOT EXISTS db")
-        val columns1 = Array(
-          StructField("long_field", LongType),
-          StructField("int_field", IntType),
-          StructField("string_field", StringType)
+    withSparkSession { spark =>
+      val tableUtils = TableUtils(spark)
+      val tableName = s"db.test_table_2_$format"
+      spark.sql("CREATE DATABASE IF NOT EXISTS db")
+      val columns1 = Array(
+        StructField("long_field", LongType),
+        StructField("int_field", IntType),
+        StructField("string_field", StringType)
+      )
+      val df1 = makeDf(
+        spark,
+        StructType(
+          tableName,
+          columns1
+            :+ StructField("double_field", DoubleType)
+            :+ StructField("ds", StringType)
+        ),
+        List(
+          Row(1L, 2, "3", 4.0, "2022-10-01")
        )
-        val df1 = makeDf(
-          spark,
-          StructType(
-            tableName,
-            columns1
-              :+ StructField("double_field", DoubleType)
-              :+ StructField("ds", StringType)
-          ),
-          List(
-            Row(1L, 2, "3", 4.0, "2022-10-01")
-          )
+      )
+
+      val df2 = makeDf(
+        spark,
+        StructType(
+          tableName,
+          columns1 :+ StructField("ds", StringType)
+        ),
+        List(
+          Row(5L, 6, "7", "2022-10-02")
        )
-
-        val df2 = makeDf(
-          spark,
-          StructType(
-            tableName,
-            columns1 :+ StructField("ds", StringType)
-          ),
-          List(
-            Row(5L, 6, "7", "2022-10-02")
-          )
-        )
-        testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02")
-      }
+      )
+      testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02")
    }
  }
 
   ignore("test insertion of partitioned data and modification of columns") {
-    forAll(formats) { (format, configs) =>
-      withSparkSession(configs) { spark =>
-        val tableUtils = TableUtils(spark)
-
-        val tableName = s"db.test_table_3_$format"
-        spark.sql("CREATE DATABASE IF NOT EXISTS db")
-        val columns1 = Array(
-          StructField("long_field", LongType),
-          StructField("int_field", IntType)
+    withSparkSession { spark =>
+      val tableUtils = TableUtils(spark)
+
+      val tableName = s"db.test_table_3_$format"
+      spark.sql("CREATE DATABASE IF NOT EXISTS db")
+      val columns1 = Array(
+        StructField("long_field", LongType),
+        StructField("int_field", IntType)
+      )
+      val df1 = makeDf(
+        spark,
+        StructType(
+          tableName,
+          columns1
+            :+ StructField("string_field", StringType)
+            :+ StructField("ds", StringType)
+        ),
+        List(
+          Row(1L, 2, "3", "2022-10-01")
        )
-        val df1 = makeDf(
-          spark,
-          StructType(
-            tableName,
-            columns1
-              :+ StructField("string_field", StringType)
-              :+ StructField("ds", StringType)
-          ),
-          List(
-            Row(1L, 2, "3", "2022-10-01")
-          )
+      )
+
+      val df2 = makeDf(
+        spark,
+        StructType(
+          tableName,
+          columns1
+            :+ StructField("string_field", DoubleType) // modified column data type
+            :+ StructField("ds", StringType)
+        ),
+        List(
+          Row(1L, 2, 3.0, "2022-10-02")
        )
+      )
 
-        val df2 = makeDf(
-          spark,
-          StructType(
-            tableName,
-            columns1
-              :+ StructField("string_field", DoubleType) // modified column data type
-              :+ StructField("ds", StringType)
-          ),
-          List(
-            Row(1L, 2, 3.0, "2022-10-02")
-          )
-        )
-
-        testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02")
-      }
+      testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02")
    }
  }
 }
@@ -172,7 +147,7 @@ object TableUtilsFormatTest {
   private def testInsertPartitions(spark: SparkSession,
                                    tableUtils: TableUtils,
                                    tableName: String,
-                                   format: Format,
+                                   format: String,
                                    df1: DataFrame,
                                    df2: DataFrame,
                                    ds1: String,
@@ -204,7 +179,8 @@
     tableUtils.insertPartitions(df2, tableName, autoExpand = true)
 
     // check that we wrote out a table in the right format
-    assertTrue(tableUtils.tableFormat(tableName) == format)
+    val readTableFormat = tableUtils.tableFormat(tableName).toString
+    assertTrue(s"Mismatch in table format: $readTableFormat; expected: $format", readTableFormat.toLowerCase == format)
 
     // check we have all the partitions written
     val returnedPartitions = tableUtils.partitions(tableName)
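
The lower-cased `toString` comparison is what lets the env var values line up with the format objects returned by `tableUtils.tableFormat`. Assuming the formats are case objects named `Hive` and `DeltaLake`, as the removed imports suggest, a tiny illustration of why that normalization works:

// Case objects stringify to their name, so lower-casing the read format
// aligns it with the env var values used by CI ("hive" or "deltalake").
sealed trait Format
case object Hive extends Format
case object DeltaLake extends Format

assert(Hive.toString.toLowerCase == "hive")
assert(DeltaLake.toString.toLowerCase == "deltalake")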
