Skip to content

Commit df4f9cc

Browse files
committed
Shuffle around kryo registrator
1 parent 6c17647 commit df4f9cc

File tree

2 files changed

+27
-15
lines changed

2 files changed

+27
-15
lines changed

spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,23 +147,34 @@ class ChrononKryoRegistrator extends KryoRegistrator {
147147
"org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8",
148148
"org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
149149
"scala.collection.immutable.ArraySeq$ofRef",
150-
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow",
151-
"org.apache.spark.sql.delta.stats.DeltaFileStatistics",
152-
"org.apache.spark.sql.delta.actions.AddFile"
150+
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
153151
)
154-
names.foreach { name =>
155-
try {
156-
kryo.register(Class.forName(name))
157-
kryo.register(Class.forName(s"[L$name;")) // represents array of a type to jvm
158-
} catch {
159-
case _: ClassNotFoundException => // do nothing
160-
}
161-
}
152+
names.foreach(name => doRegister(name, kryo))
162153

163154
kryo.register(classOf[Array[Array[Array[AnyRef]]]])
164155
kryo.register(classOf[Array[Array[AnyRef]]])
165156
kryo.register(classOf[CpcSketch], new CpcSketchKryoSerializer())
166157
kryo.register(classOf[Array[ItemSketchSerializable]])
167158
kryo.register(classOf[ItemsSketchIR[AnyRef]], new ItemsSketchKryoSerializer[AnyRef])
168159
}
160+
161+
def doRegister(name: String, kryo: Kryo): Unit = {
162+
try {
163+
kryo.register(Class.forName(name))
164+
kryo.register(Class.forName(s"[L$name;")) // represents array of a type to jvm
165+
} catch {
166+
case _: ClassNotFoundException => // do nothing
167+
}
168+
}
169+
}
170+
171+
class ChrononDeltaLakeKryoRegistrator extends ChrononKryoRegistrator {
172+
override def registerClasses(kryo: Kryo): Unit = {
173+
super.registerClasses(kryo)
174+
val additionalDeltaNames = Seq(
175+
"org.apache.spark.sql.delta.stats.DeltaFileStatistics",
176+
"org.apache.spark.sql.delta.actions.AddFile"
177+
)
178+
additionalDeltaNames.foreach(name => doRegister(name, kryo))
179+
}
169180
}

spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,15 @@ object SparkSessionBuilder {
4242
// allow us to override the format by specifying env vars. This allows us to not have to worry about interference
4343
// between Spark sessions created in existing chronon tests that need the hive format and some specific tests
4444
// that require a format override like delta lake.
45-
val formatConfigs = sys.env.get(FormatTestEnvVar) match {
45+
val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match {
4646
case Some("deltalake") =>
47-
Map(
47+
val configMap = Map(
4848
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
4949
"spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
5050
"spark.chronon.table_write.format" -> "delta"
5151
)
52-
case _ => Map.empty
52+
(configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
53+
case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator")
5354
}
5455

5556
// tack on format configs with additional configs
@@ -78,7 +79,7 @@ object SparkSessionBuilder {
7879
if (enforceKryoSerializer) {
7980
baseBuilder
8081
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
81-
.config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
82+
.config("spark.kryo.registrator", kryoRegistrator)
8283
.config("spark.kryoserializer.buffer.max", "2000m")
8384
.config("spark.kryo.referenceTracking", "false")
8485
}

0 commit comments

Comments (0)