Add support for Delta lake tables in TableUtils #869

Merged: 35 commits, Nov 19, 2024

Commits
c3aae9b  First stab at adding support for Delta lake tables in TableUtils (piyush-zlai, Oct 20, 2024)
1a64baa  Scala fmt (piyush-zlai, Oct 21, 2024)
5e134ed  Update versions and code to work across older delta versions (piyush-zlai, Oct 23, 2024)
cd801ca  Add working delta tests (piyush-zlai, Oct 23, 2024)
51ae067  Add logging (piyush-zlai, Oct 23, 2024)
6a326a5  Unset confs (piyush-zlai, Oct 23, 2024)
c72ef70  Scala fmt (piyush-zlai, Oct 23, 2024)
02dda37  Tweak logs (piyush-zlai, Oct 23, 2024)
fe4cc83  Try unsetting spark context (piyush-zlai, Oct 23, 2024)
d22525d  Swap things around (piyush-zlai, Oct 23, 2024)
81dd918  Tweak how we infer write format (piyush-zlai, Oct 23, 2024)
2ff3ed5  Play with test configs (piyush-zlai, Oct 23, 2024)
f3a26ba  Revert circle ci sbt and ignore tests for now (piyush-zlai, Oct 24, 2024)
b7b084a  Try commenting kryo code to see if that makes tests happy (piyush-zlai, Oct 24, 2024)
ad14e67  Trigger Build (piyush-zlai, Oct 24, 2024)
f15cc5b  Switch to env var driven approach (piyush-zlai, Oct 27, 2024)
7a9396e  Shuffle around kryo registrator (piyush-zlai, Oct 27, 2024)
f93d83e  Add missing circle ci workflows (piyush-zlai, Oct 28, 2024)
48a7604  Drop the sealed and enable tests (piyush-zlai, Oct 31, 2024)
837c943  Trigger Build (piyush-zlai, Oct 31, 2024)
2dd5a33  Switch table utils format test to junit tests (piyush-zlai, Oct 31, 2024)
e24a9d6  Enable one test (piyush-zlai, Oct 31, 2024)
1d36d63  Enable more tests (piyush-zlai, Oct 31, 2024)
5c8ed17  support format provider (mickjermsurawong-openai, Oct 31, 2024)
73577af  Scala fmt (piyush-zlai, Nov 7, 2024)
46748b6  Add default format provider (piyush-zlai, Nov 7, 2024)
b3cb5a5  Add read and write format providers (piyush-zlai, Nov 7, 2024)
aa76d05  Unify table format trait (piyush-zlai, Nov 7, 2024)
3066719  Combine format provider config setting (piyush-zlai, Nov 7, 2024)
2b381c2  Add serializable (piyush-zlai, Nov 7, 2024)
6affb63  Address comments (piyush-zlai, Nov 8, 2024)
e3d6632  Refactor on comments (piyush-zlai, Nov 12, 2024)
48cccf1  scala fmt fixes (piyush-zlai, Nov 12, 2024)
5673ac4  Change db name (piyush-zlai, Nov 14, 2024)
1af4545  Trigger Build (piyush-zlai, Nov 15, 2024)
33 changes: 33 additions & 0 deletions .circleci/config.yml
@@ -78,6 +78,36 @@ jobs:
destination: spark_warehouse.tar.gz
when: on_fail

# run these separately as we need an isolated JVM so that Spark session settings don't interfere with other runs
# long term goal is to refactor the current testing spark session builder and avoid adding new single test to CI
"Scala 13 -- Delta Lake Format Tests":
executor: docker_baseimg_executor
steps:
- checkout
- run:
name: Run Scala 13 tests for Delta Lake format
environment:
format_test: deltalake
shell: /bin/bash -leuxo pipefail
command: |
conda activate chronon_py
# Increase if we see OOM.
export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G"
sbt '++ 2.13.6' "testOnly ai.chronon.spark.test.TableUtilsFormatTest"
- store_test_results:
path: /chronon/spark/target/test-reports
- store_test_results:
path: /chronon/aggregator/target/test-reports
- run:
name: Compress spark-warehouse
command: |
cd /tmp/ && tar -czvf spark-warehouse.tar.gz chronon/spark-warehouse
when: on_fail
- store_artifacts:
path: /tmp/spark-warehouse.tar.gz
destination: spark_warehouse.tar.gz
when: on_fail

"Scala 11 -- Compile":
executor: docker_baseimg_executor
steps:
@@ -147,6 +177,9 @@ workflows:
- "Scala 13 -- Tests":
requires:
- "Pull Docker Image"
- "Scala 13 -- Delta Lake Format Tests":
requires:
- "Pull Docker Image"
- "Scalafmt Check":
requires:
- "Pull Docker Image"
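The new CI job selects the Delta Lake code path purely through the format_test environment variable rather than through a separate test suite; the session builder (diffed further below) branches on that variable. A minimal, self-contained Scala sketch of the same selection pattern, with an illustrative object name that is not part of this PR:

object FormatSwitchDemo extends App {
  // Mirrors the env-var switch the CI job relies on: format_test=deltalake
  // selects the Delta write format, anything else keeps the default (hive).
  val resolvedFormat = sys.env.get("format_test") match {
    case Some("deltalake") => "delta"
    case _                 => "hive"
  }
  println(s"table write format resolved to: $resolvedFormat")
}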
12 changes: 10 additions & 2 deletions build.sbt
@@ -156,6 +156,14 @@ val VersionMatrix: Map[String, VersionDependency] = Map(
None,
Some("1.0.4")
),
"delta-core" -> VersionDependency(
Seq(
"io.delta" %% "delta-core"
),
Some("0.6.1"),
Some("1.0.1"),
Some("2.0.2")
),
"jackson" -> VersionDependency(
Seq(
"com.fasterxml.jackson.core" % "jackson-core",
@@ -365,7 +373,7 @@ lazy val spark_uber = (project in file("spark"))
sparkBaseSettings,
version := git.versionProperty.value,
crossScalaVersions := supportedVersions,
libraryDependencies ++= fromMatrix(scalaVersion.value, "jackson", "spark-all/provided")
libraryDependencies ++= fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided")
)

lazy val spark_embedded = (project in file("spark"))
@@ -374,7 +382,7 @@ lazy val spark_embedded = (project in file("spark"))
sparkBaseSettings,
version := git.versionProperty.value,
crossScalaVersions := supportedVersions,
libraryDependencies ++= fromMatrix(scalaVersion.value, "spark-all"),
libraryDependencies ++= fromMatrix(scalaVersion.value, "spark-all", "delta-core"),
target := target.value.toPath.resolveSibling("target-embedded").toFile,
Test / test := {}
)
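Note that spark_uber takes delta-core as provided while spark_embedded bundles it, so a deployment that runs the uber jar needs to put a matching Delta artifact on its own classpath. A hedged downstream build.sbt fragment showing one way to do that; the 2.0.2 version simply mirrors the Spark 3 entry in the version matrix above and should be matched to the Spark distribution actually in use:

// Supply delta-core yourself when consuming spark_uber, since it is not bundled there.
libraryDependencies += "io.delta" %% "delta-core" % "2.0.2"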
29 changes: 21 additions & 8 deletions spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
@@ -149,19 +149,32 @@ class ChrononKryoRegistrator extends KryoRegistrator {
"scala.collection.immutable.ArraySeq$ofRef",
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
)
names.foreach { name =>
try {
kryo.register(Class.forName(name))
kryo.register(Class.forName(s"[L$name;")) // represents array of a type to jvm
} catch {
case _: ClassNotFoundException => // do nothing
}
}
names.foreach(name => doRegister(name, kryo))

kryo.register(classOf[Array[Array[Array[AnyRef]]]])
kryo.register(classOf[Array[Array[AnyRef]]])
kryo.register(classOf[CpcSketch], new CpcSketchKryoSerializer())
kryo.register(classOf[Array[ItemSketchSerializable]])
kryo.register(classOf[ItemsSketchIR[AnyRef]], new ItemsSketchKryoSerializer[AnyRef])
}

def doRegister(name: String, kryo: Kryo): Unit = {
try {
kryo.register(Class.forName(name))
kryo.register(Class.forName(s"[L$name;")) // represents array of a type to jvm
} catch {
case _: ClassNotFoundException => // do nothing
}
}
}

class ChrononDeltaLakeKryoRegistrator extends ChrononKryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
super.registerClasses(kryo)
val additionalDeltaNames = Seq(
"org.apache.spark.sql.delta.stats.DeltaFileStatistics",
"org.apache.spark.sql.delta.actions.AddFile"
)
additionalDeltaNames.foreach(name => doRegister(name, kryo))
}
}
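ChrononDeltaLakeKryoRegistrator only layers the two Delta write-path classes on top of the base registrations; it takes effect when it is named in spark.kryo.registrator. A minimal sketch of that wiring, assuming the chronon spark module and a Delta-enabled Spark are on the classpath (in this PR the real wiring happens inside SparkSessionBuilder, shown further below):

import org.apache.spark.sql.SparkSession

object DeltaKryoWiringExample extends App {
  // Point Spark's Kryo serializer at the Delta-aware registrator.
  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("delta-kryo-wiring-example")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
    .config("spark.kryo.referenceTracking", "false")
    .getOrCreate()

  spark.stop()
}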
26 changes: 23 additions & 3 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -81,6 +81,9 @@ object Driver {
default = Some(false),
descr = "Skip the first unfilled partition range if some future partitions have been populated.")

val useDeltaCatalog: ScallopOption[Boolean] =
opt[Boolean](required = false, default = Some(false), descr = "Enable the use of the delta lake catalog")

val stepDays: ScallopOption[Int] =
opt[Int](required = false,
descr = "Runs offline backfill in steps, step-days at a time. Default is 30 days",
@@ -136,8 +139,22 @@ object Driver {
def isLocal: Boolean = localTableMapping.nonEmpty || localDataPath.isDefined

protected def buildSparkSession(): SparkSession = {
// use of the delta lake catalog requires a couple of additional spark config options
val extraDeltaConfigs = useDeltaCatalog.toOption match {
case Some(true) =>
Some(
Map(
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog"
))
case _ => None
}

if (localTableMapping.nonEmpty) {
val localSession = SparkSessionBuilder.build(subcommandName(), local = true, localWarehouseLocation.toOption)
val localSession = SparkSessionBuilder.build(subcommandName(),
local = true,
localWarehouseLocation.toOption,
additionalConfig = extraDeltaConfigs)
localTableMapping.foreach {
case (table, filePath) =>
val file = new File(filePath)
@@ -150,13 +167,16 @@
val localSession =
SparkSessionBuilder.build(subcommandName(),
local = true,
localWarehouseLocation = localWarehouseLocation.toOption)
localWarehouseLocation = localWarehouseLocation.toOption,
additionalConfig = extraDeltaConfigs)
LocalDataLoader.loadDataRecursively(dir, localSession)
localSession
} else {
// We use the KryoSerializer for group bys and joins since we serialize the IRs.
// But since staging query is fairly freeform, it's better to stick to the java serializer.
SparkSessionBuilder.build(subcommandName(), enforceKryoSerializer = !subcommandName().contains("staging_query"))
SparkSessionBuilder.build(subcommandName(),
enforceKryoSerializer = !subcommandName().contains("staging_query"),
additionalConfig = extraDeltaConfigs)
}
}

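The catalog settings stay optional: buildSparkSession only folds the two Delta entries into whatever additionalConfig it already passes to SparkSessionBuilder. A small self-contained sketch of that merging pattern; the object, method, and parameter names here are illustrative rather than taken from the PR:

object DeltaConfigMerge {
  // Fold optional Delta catalog settings into caller-supplied Spark configs.
  def merge(useDeltaCatalog: Boolean, additional: Option[Map[String, String]]): Map[String, String] = {
    val deltaConfigs =
      if (useDeltaCatalog)
        Map(
          "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
          "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
      else Map.empty[String, String]
    additional.getOrElse(Map.empty) ++ deltaConfigs
  }

  def main(args: Array[String]): Unit =
    println(merge(useDeltaCatalog = true, additional = Some(Map("spark.app.name" -> "demo"))))
}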
27 changes: 22 additions & 5 deletions spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
@@ -22,14 +22,14 @@ import org.apache.spark.SPARK_VERSION

import java.io.File
import java.util.logging.Logger
import scala.reflect.io.Path
import scala.util.Properties

object SparkSessionBuilder {
@transient private lazy val logger = LoggerFactory.getLogger(getClass)

private val warehouseId = java.util.UUID.randomUUID().toString.takeRight(6)
private val DefaultWarehouseDir = new File("/tmp/chronon/spark-warehouse_" + warehouseId)
val FormatTestEnvVar: String = "format_test"

def expandUser(path: String): String = path.replaceFirst("~", System.getProperty("user.home"))
// we would want to share locally generated warehouse during CI testing
@@ -38,6 +38,24 @@
localWarehouseLocation: Option[String] = None,
additionalConfig: Option[Map[String, String]] = None,
enforceKryoSerializer: Boolean = true): SparkSession = {

// allow us to override the format by specifying env vars. This allows us to not have to worry about interference
// between Spark sessions created in existing chronon tests that need the hive format and some specific tests
// that require a format override like delta lake.
val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match {
[Review comment, Collaborator]: looks like this is only used for local testing. Could we move it within the if (local) block?

[Author reply, piyush-zlai]: We could - I actually thought the code is cleaner with the match up front - this allows us to skip the use of vars and a lot of if/else type of checks in a couple of places. Let me know if you feel strongly and I can add there.

case Some("deltalake") =>
val configMap = Map(
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.chronon.table_write.format" -> "delta"
)
(configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator")
}

// tack on format configs with additional configs
val mergedConfigs = additionalConfig.getOrElse(Map.empty) ++ formatConfigs

if (local) {
//required to run spark locally with hive support enabled - for sbt test
System.setSecurityManager(null)
@@ -61,13 +79,12 @@
if (enforceKryoSerializer) {
baseBuilder
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
.config("spark.kryo.registrator", kryoRegistrator)
.config("spark.kryoserializer.buffer.max", "2000m")
.config("spark.kryo.referenceTracking", "false")
}
additionalConfig.foreach { configMap =>
configMap.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
}

mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }

if (SPARK_VERSION.startsWith("2")) {
// Otherwise files left from deleting the table with the same name result in test failures
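With the env-var override in place, a local session built while format_test=deltalake is exported should come up with the Delta extension, the Delta catalog, and the delta write format already configured. A hedged check of that behavior; it assumes the chronon spark module (and, under the override, a Delta-enabled classpath) and is unrelated to the real TableUtilsFormatTest, whose body is not shown in this diff:

import ai.chronon.spark.SparkSessionBuilder

object FormatOverrideCheck extends App {
  // Expect Some("delta") when run with format_test=deltalake in the environment,
  // and None (or a caller-provided value) otherwise.
  val spark = SparkSessionBuilder.build("format_override_check", local = true)
  println(spark.conf.getOption("spark.chronon.table_write.format"))
  println(spark.conf.getOption("spark.sql.catalog.spark_catalog"))
  spark.stop()
}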