Commit 6ee5dce

Add delta lake integration (#51)
## Summary

Port of our OSS delta lake PR - airbnb/chronon#869. Largely the same aside from the Delta Lake versions. We don't need this immediately, but we will if other users come along that need Delta Lake (or if we need to add support for formats like Hudi).

## Checklist

- [X] Added Unit Tests
- [X] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added support for Delta Lake operations with new dependencies and configurations.
  - Introduced new traits and case objects for handling different table formats, enhancing data management capabilities.
  - Added a new job in the CI workflow for testing Delta Lake format functionality.
- **Bug Fixes**
  - Improved error handling in class registration processes.
- **Tests**
  - Implemented a suite of unit tests for the `TableUtils` class to validate partitioned data insertions with schema modifications.
1 parent 8cd16dd commit 6ee5dce
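
The summary above mentions new traits and case objects for handling different table formats; that code lives in one of the changed files not rendered on this page. As orientation only, a table-format abstraction of that kind might look like the sketch below — the `Format`, `Hive`, and `DeltaLake` names and the `fromSession` helper are assumptions, not the committed code; only the `spark.chronon.table_write.format` key is taken from this commit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch of a table-format abstraction; the real trait/case objects
// are in a changed file that is not shown in the diffs below.
trait Format {
  def name: String
}

case object Hive extends Format { val name = "hive" }

case object DeltaLake extends Format { val name = "delta" }

object Format {
  // "spark.chronon.table_write.format" is the session conf this commit sets to "delta"
  // in SparkSessionBuilder; anything else falls back to Hive in this sketch.
  def fromSession(spark: SparkSession): Format =
    spark.conf.getOption("spark.chronon.table_write.format") match {
      case Some("delta") => DeltaLake
      case _             => Hive
    }
}
```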

File tree

6 files changed: +541 additions, −107 deletions

.github/workflows/test_scala_spark.yaml

Lines changed: 21 additions & 0 deletions

@@ -59,6 +59,27 @@ jobs:
           export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
           sbt "spark/testOnly -- -n jointest"
 
+  table_utils_delta_format_spark_tests:
+    runs-on: ubuntu-latest
+    container:
+      image: ghcr.io/${{ github.repository }}-ci:latest
+      credentials:
+        username: ${{ github.actor }}
+        password: ${{ secrets.GITHUB_TOKEN }}
+    defaults:
+      run:
+        working-directory: ${{ github.workspace }}
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Run table utils format test for Delta Lake
+        run: |
+          export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
+          sbt "spark/testOnly ai.chronon.spark.test.TableUtilsFormatTest"
+        env:
+          format_test: deltalake
+
   mutation_spark_tests:
     runs-on: ubuntu-latest
     container:
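
The new job exercises the Delta path purely through the `format_test: deltalake` environment variable, which `SparkSessionBuilder` (diff further down) reads when constructing the test session. A heavily simplified sketch of the kind of suite the job targets is below, assuming ScalaTest and a `build(name, local = ...)` entry point whose leading parameters are not visible in this commit; the real suite is `ai.chronon.spark.test.TableUtilsFormatTest`.

```scala
package ai.chronon.spark.test

import ai.chronon.spark.SparkSessionBuilder
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

// Sketch only: the shape of a format-agnostic TableUtils suite, not the committed test code.
class TableUtilsFormatSketch extends AnyFunSuite {

  // With format_test=deltalake exported by CI, the builder swaps in the Delta catalog,
  // extension and ChrononDeltaLakeKryoRegistrator; no per-test wiring is needed.
  lazy val spark: SparkSession = SparkSessionBuilder.build("TableUtilsFormatSketch", local = true)

  test("partitioned inserts survive adding a column") {
    // 1. insert a partitioned DataFrame
    // 2. insert a second partition with an extra column
    // 3. read both partitions back and assert on the union schema
    // (assertions omitted; see TableUtilsFormatTest for the real checks)
  }
}
```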

build.sbt

Lines changed: 7 additions & 1 deletion

@@ -29,6 +29,7 @@ lazy val flink_1_17 = "1.17.0"
 lazy val jackson_2_15 = "2.15.2"
 lazy val avro_1_11 = "1.11.2"
 lazy val circeVersion = "0.14.9"
+lazy val deltaVersion = "3.2.0"
 
 // skip tests on assembly - uncomment if builds become slow
 // ThisBuild / assembly / test := {}
@@ -61,6 +62,10 @@ val spark_sql = Seq(
 ).map(_ % spark_3_5)
 val spark_sql_provided = spark_sql.map(_ % "provided")
 
+val delta = Seq(
+  "io.delta" %% "delta-spark"
+).map(_ % deltaVersion)
+
 val spark_all = Seq(
   "org.apache.spark" %% "spark-sql",
   "org.apache.spark" %% "spark-hive",
@@ -172,7 +177,8 @@ lazy val spark = project
     libraryDependencies ++= spark_all.map(_ % "test"),
     libraryDependencies += "jakarta.servlet" % "jakarta.servlet-api" % "4.0.3",
     libraryDependencies += "com.google.guava" % "guava" % "33.3.1-jre",
-    libraryDependencies ++= log4j2
+    libraryDependencies ++= log4j2,
+    libraryDependencies ++= delta.map(_ % "provided")
   )
 
 lazy val flink = project
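
Note that `delta` is wired into the spark module as `"provided"`: delta-spark is available at compile and test time but is not bundled into the assembly. A downstream build that actually runs Chronon jobs against Delta tables has to bring the jar itself; a minimal sbt sketch follows (the project name is illustrative, the coordinates match the version pinned above).

```scala
// Hypothetical downstream build.sbt that supplies delta-spark at runtime
// (alternatively: spark-submit --packages io.delta:delta-spark_<scalaVersion>:3.2.0).
lazy val deltaVersion = "3.2.0" // keep in sync with Chronon's pin above

lazy val myChrononJobs = (project in file("."))
  .settings(
    libraryDependencies += "io.delta" %% "delta-spark" % deltaVersion
  )
```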

spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala

Lines changed: 21 additions & 8 deletions

@@ -170,19 +170,32 @@ class ChrononKryoRegistrator extends KryoRegistrator {
       "org.apache.datasketches.kll.KllSketch$SketchStructure",
       "org.apache.datasketches.kll.KllSketch$SketchType"
     )
-    names.foreach { name =>
-      try {
-        kryo.register(Class.forName(name))
-        kryo.register(Class.forName(s"[L$name;")) // represents array of a type to jvm
-      } catch {
-        case _: ClassNotFoundException => // do nothing
-      }
-    }
+    names.foreach(name => doRegister(name, kryo))
 
     kryo.register(classOf[Array[Array[Array[AnyRef]]]])
     kryo.register(classOf[Array[Array[AnyRef]]])
     kryo.register(classOf[CpcSketch], new CpcSketchKryoSerializer())
     kryo.register(classOf[Array[ItemSketchSerializable]])
     kryo.register(classOf[ItemsSketchIR[AnyRef]], new ItemsSketchKryoSerializer[AnyRef])
   }
+
+  def doRegister(name: String, kryo: Kryo): Unit = {
+    try {
+      kryo.register(Class.forName(name))
+      kryo.register(Class.forName(s"[L$name;")) // represents array of a type to jvm
+    } catch {
+      case _: ClassNotFoundException => // do nothing
+    }
+  }
+}
+
+class ChrononDeltaLakeKryoRegistrator extends ChrononKryoRegistrator {
+  override def registerClasses(kryo: Kryo): Unit = {
+    super.registerClasses(kryo)
+    val additionalDeltaNames = Seq(
+      "org.apache.spark.sql.delta.stats.DeltaFileStatistics",
+      "org.apache.spark.sql.delta.actions.AddFile"
+    )
+    additionalDeltaNames.foreach(name => doRegister(name, kryo))
+  }
 }
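
The two extra registrations cover classes Delta moves between executors and the driver during writes (per-file statistics and `AddFile` commit actions), presumably so they serialize efficiently under Kryo. One way to surface a missing registration early is sketched below; `spark.kryo.registrationRequired` is a standard Spark setting but is not set anywhere in this commit.

```scala
import org.apache.spark.SparkConf

// Illustrative only: make Kryo fail fast on any unregistered class so gaps in the
// Delta registrator show up immediately (registrationRequired is not part of this commit).
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true")
```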

spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Lines changed: 23 additions & 4 deletions

@@ -29,6 +29,7 @@ object SparkSessionBuilder {
 
   private val warehouseId = java.util.UUID.randomUUID().toString.takeRight(6)
   private val DefaultWarehouseDir = new File("/tmp/chronon/spark-warehouse_" + warehouseId)
+  val FormatTestEnvVar: String = "format_test"
 
   def expandUser(path: String): String = path.replaceFirst("~", System.getProperty("user.home"))
   // we would want to share locally generated warehouse during CI testing
@@ -38,6 +39,25 @@
             localWarehouseLocation: Option[String] = None,
             additionalConfig: Option[Map[String, String]] = None,
             enforceKryoSerializer: Boolean = true): SparkSession = {
+
+    // allow us to override the format by specifying env vars. This allows us to not have to worry about interference
+    // between Spark sessions created in existing chronon tests that need the hive format and some specific tests
+    // that require a format override like delta lake.
+    val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match {
+      case Some("deltalake") =>
+        logger.info("Using the delta lake table format + kryo registrators")
+        val configMap = Map(
+          "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
+          "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+          "spark.chronon.table_write.format" -> "delta"
+        )
+        (configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
+      case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator")
+    }
+
+    // tack on format configs with additional configs
+    val mergedConfigs = additionalConfig.getOrElse(Map.empty) ++ formatConfigs
+
     val userName = Properties.userName
     val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath)
     println(s"Using warehouse dir: $warehouseDir")
@@ -62,13 +82,12 @@
     if (enforceKryoSerializer) {
       baseBuilder
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-        .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
+        .config("spark.kryo.registrator", kryoRegistrator)
         .config("spark.kryoserializer.buffer.max", "2000m")
         .config("spark.kryo.referenceTracking", "false")
     }
-    additionalConfig.foreach { configMap =>
-      configMap.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
-    }
+
+    mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
 
     if (SPARK_VERSION.startsWith("2")) {
       // Otherwise files left from deleting the table with the same name result in test failures
