
Commit 71c07dc

kryo
Co-authored-by: Thomas Chow <[email protected]>
1 parent c05d68f commit 71c07dc

2 files changed (+74, -68 lines)

spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala

Lines changed: 73 additions & 66 deletions
@@ -79,97 +79,104 @@ class ChrononKryoRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo): Unit = {
     // kryo.setWarnUnregisteredClasses(true)
     val names = Seq(
-      "java.time.LocalDateTime",
-      "java.time.LocalDate",
-      "org.apache.hadoop.fs.Path",
-      "org.apache.hadoop.fs.FileStatus",
-      "org.apache.hadoop.fs.LocatedFileStatus",
-      "org.apache.hadoop.fs.BlockLocation",
-      "org.apache.hadoop.fs.StorageType",
-      "org.apache.hadoop.fs.permission.FsPermission",
-      "org.apache.hadoop.fs.permission.FsAction",
-      "org.apache.hadoop.fs.FileUtil$CopyMapper",
-      "org.apache.hadoop.fs.FileUtil$CopyReducer",
-      "org.apache.hadoop.fs.FileUtil$CopyFiles",
-      "org.apache.hadoop.fs.FileUtil$CopyListingFileStatus",
-      "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",
-      "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage",
-      "org.apache.spark.sql.execution.datasources.ExecutedWriteSummary",
-      "org.apache.spark.sql.execution.datasources.BasicWriteTaskStats",
-      "org.apache.spark.sql.execution.datasources.WriteTaskResult",
-      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex",
-      "org.apache.spark.sql.execution.joins.LongHashedRelation",
-      "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
-      "org.apache.spark.sql.execution.streaming.sources.ForeachWriterCommitMessage$",
-      "org.apache.spark.sql.types.Metadata",
-      "ai.chronon.api.Row",
-      "ai.chronon.spark.KeyWithHash",
+      "ai.chronon.aggregator.base.ApproxHistogramIr",
       "ai.chronon.aggregator.base.MomentsIR",
       "ai.chronon.aggregator.windowing.BatchIr",
       "ai.chronon.aggregator.base.ApproxHistogramIr",
       "ai.chronon.online.RowWrapper",
       "ai.chronon.online.fetcher.Fetcher$Request",
       "ai.chronon.aggregator.windowing.FinalBatchIr",
+      "ai.chronon.api.Row",
+      "ai.chronon.online.Fetcher$Request",
       "ai.chronon.online.LoggableResponse",
       "ai.chronon.online.LoggableResponseBase64",
-      "org.apache.datasketches.kll.KllFloatsSketch",
-      "java.util.HashMap",
+      "ai.chronon.online.RowWrapper",
+      "ai.chronon.spark.KeyWithHash",
+      "java.time.LocalDate",
+      "java.time.LocalDateTime",
       "java.util.ArrayList",
-      "java.util.HashSet",
       "java.util.Collections$EmptySet",
+      "java.util.HashMap",
+      "java.util.HashSet",
+      "java.util.concurrent.ConcurrentHashMap",
+      "java.util.concurrent.atomic.AtomicBoolean",
+      "org.apache.datasketches.kll.KllFloatsSketch",
+      "org.apache.datasketches.kll.KllHeapFloatsSketch",
+      "org.apache.datasketches.kll.KllSketch$SketchStructure",
+      "org.apache.datasketches.kll.KllSketch$SketchType",
+      "org.apache.hadoop.fs.BlockLocation",
+      "org.apache.hadoop.fs.FileStatus",
+      "org.apache.hadoop.fs.FileUtil$CopyFiles",
+      "org.apache.hadoop.fs.FileUtil$CopyListingFileStatus",
+      "org.apache.hadoop.fs.FileUtil$CopyMapper",
+      "org.apache.hadoop.fs.FileUtil$CopyReducer",
+      "org.apache.hadoop.fs.LocatedFileStatus",
+      "org.apache.hadoop.fs.Path",
+      "org.apache.hadoop.fs.StorageType",
+      "org.apache.hadoop.fs.permission.FsAction",
+      "org.apache.hadoop.fs.permission.FsPermission",
+      "org.apache.iceberg.io.ResolvingFileIO",
+      "org.apache.iceberg.util.SerializableMap",
+      "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage",
       "org.apache.spark.sql.Row",
       "org.apache.spark.sql.catalyst.InternalRow",
+      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
+      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8",
+      "org.apache.spark.sql.catalyst.expressions.Ascending$",
+      "org.apache.spark.sql.catalyst.expressions.BoundReference",
+      "org.apache.spark.sql.catalyst.expressions.Descending$",
+      "org.apache.spark.sql.catalyst.expressions.GenericInternalRow",
       "org.apache.spark.sql.catalyst.expressions.GenericRow",
       "org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema",
+      "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
+      "org.apache.spark.sql.catalyst.expressions.NullsLast$",
+      "org.apache.spark.sql.catalyst.expressions.SortOrder",
       "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
-      "org.apache.spark.sql.types.StructField",
-      "org.apache.spark.sql.types.StructType",
-      "org.apache.spark.sql.types.LongType$", // dollar stands for case objects
-      "org.apache.spark.sql.types.StringType",
-      "org.apache.spark.sql.types.StringType$",
-      "org.apache.spark.sql.types.IntegerType$",
+      "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
+      "org.apache.spark.sql.catalyst.trees.Origin",
+      "org.apache.spark.sql.execution.datasources.BasicWriteTaskStats",
+      "org.apache.spark.sql.execution.datasources.ExecutedWriteSummary",
+      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex",
+      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableBlockLocation",
+      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableFileStatus",
+      "org.apache.spark.sql.execution.datasources.WriteTaskResult",
+      "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
+      "org.apache.spark.sql.execution.joins.EmptyHashedRelation",
+      "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
+      "org.apache.spark.sql.execution.joins.LongHashedRelation",
+      "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
+      "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",
+      "org.apache.spark.sql.execution.streaming.sources.ForeachWriterCommitMessage$",
       "org.apache.spark.sql.types.BinaryType",
-      "org.apache.spark.sql.types.DataType",
-      "org.apache.spark.sql.types.NullType$",
-      "org.apache.spark.sql.types.DoubleType$",
-      "org.apache.spark.sql.types.BooleanType$",
       "org.apache.spark.sql.types.BinaryType$",
+      "org.apache.spark.sql.types.BooleanType$",
+      "org.apache.spark.sql.types.DataType",
       "org.apache.spark.sql.types.DateType$",
+      "org.apache.spark.sql.types.DoubleType$",
+      "org.apache.spark.sql.types.IntegerType$",
+      "org.apache.spark.sql.types.LongType$", // dollar stands for case objects
+      "org.apache.spark.sql.types.Metadata",
+      "org.apache.spark.sql.types.NullType$",
+      "org.apache.spark.sql.types.StringType",
+      "org.apache.spark.sql.types.StringType$",
+      "org.apache.spark.sql.types.StructField",
+      "org.apache.spark.sql.types.StructType",
       "org.apache.spark.sql.types.TimestampType$",
+      "org.apache.spark.unsafe.types.UTF8String",
+      "org.apache.spark.util.HadoopFSUtils$SerializableBlockLocation",
+      "org.apache.spark.util.HadoopFSUtils$SerializableFileStatus",
+      "org.apache.spark.util.collection.CompactBuffer",
       "org.apache.spark.util.sketch.BitArray",
       "org.apache.spark.util.sketch.BloomFilterImpl",
-      "org.apache.spark.util.collection.CompactBuffer",
-      "scala.reflect.ClassTag$$anon$1",
-      "scala.math.Ordering$$anon$4",
-      "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
-      "org.apache.spark.sql.catalyst.expressions.SortOrder",
-      "org.apache.spark.sql.catalyst.expressions.BoundReference",
-      "org.apache.spark.sql.catalyst.trees.Origin",
-      "org.apache.spark.sql.catalyst.expressions.Ascending$",
-      "org.apache.spark.sql.catalyst.expressions.Descending$",
-      "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
-      "org.apache.spark.sql.catalyst.expressions.NullsLast$",
       "scala.collection.IndexedSeqLike$Elements",
-      "org.apache.spark.unsafe.types.UTF8String",
+      "scala.collection.immutable.ArraySeq$ofRef",
+      "scala.math.Ordering$$anon$4",
+      "scala.reflect.ClassTag$$anon$1",
       "scala.reflect.ClassTag$GenericClassTag",
-      "org.apache.spark.util.HadoopFSUtils$SerializableFileStatus",
-      "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
-      "org.apache.spark.sql.execution.joins.EmptyHashedRelation",
-      "org.apache.spark.util.HadoopFSUtils$SerializableBlockLocation",
-      "scala.reflect.ManifestFactory$LongManifest",
-      "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
-      "scala.reflect.ManifestFactory$$anon$1",
       "scala.reflect.ClassTag$GenericClassTag",
-      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableFileStatus",
-      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableBlockLocation",
+      "scala.reflect.ManifestFactory$$anon$1",
       "scala.reflect.ManifestFactory$$anon$10",
-      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8",
-      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
-      "scala.collection.immutable.ArraySeq$ofRef",
-      "org.apache.spark.sql.catalyst.expressions.GenericInternalRow",
-      "org.apache.datasketches.kll.KllHeapFloatsSketch",
-      "org.apache.datasketches.kll.KllSketch$SketchStructure",
-      "org.apache.datasketches.kll.KllSketch$SketchType"
+      "scala.reflect.ManifestFactory$LongManifest"
     )
     names.foreach(name => doRegister(name, kryo))
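
The commit reorders the registration list into near-alphabetical order, which keeps future additions easy to diff, and adds new entries for Iceberg I/O, DataSketches KLL internals, and several Spark catalyst classes. Names ending in "$" (e.g. org.apache.spark.sql.types.LongType$) refer to the classes that back Scala case objects, as the retained comment notes. The body of doRegister is not part of this diff; below is a minimal sketch of how name-based Kryo registration is commonly written, assuming the usual Class.forName approach (the class name NameListRegistrator and this helper body are illustrative, not Chronon's actual code):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class NameListRegistrator extends KryoRegistrator {

  // Register a class by its fully qualified name. Classes missing from the
  // classpath (e.g. version-specific internals) are skipped rather than
  // failing the job. Sketch of a common pattern, not Chronon's exact code.
  private def doRegister(name: String, kryo: Kryo): Unit =
    try kryo.register(Class.forName(name))
    catch { case _: ClassNotFoundException => () }

  override def registerClasses(kryo: Kryo): Unit = {
    val names = Seq("java.util.ArrayList", "java.util.HashMap")
    names.foreach(name => doRegister(name, kryo))
  }
}

A registrator like this only takes effect when Spark is pointed at it, typically via the standard spark.serializer and spark.kryo.registrator settings on the session builder.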

spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Lines changed: 1 addition & 2 deletions
@@ -141,8 +141,6 @@ object SparkSessionBuilder {
         .config("spark.kryo.referenceTracking", "false")
     }
 
-    mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
-
     if (SPARK_VERSION.startsWith("2")) {
       // Otherwise files left from deleting the table with the same name result in test failures
       baseBuilder.config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")
@@ -165,6 +163,7 @@ object SparkSessionBuilder {
       // hive jars need to be available on classpath - not needed for local testing
       baseBuilder
     }
+    mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
     val spark = builder.getOrCreate()
     // disable log spam
     spark.sparkContext.setLogLevel("ERROR")
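
Moving the mergedConfigs loop to the end of the builder setup means caller-supplied configs are applied after the built-in defaults. SparkSession.Builder keeps the last value set for a given key, so this ordering lets entries in mergedConfigs override the defaults instead of being clobbered by them. A minimal, self-contained sketch of that behavior (ConfigOrderSketch and its config values are illustrative, not Chronon's builder):

import org.apache.spark.sql.SparkSession

object ConfigOrderSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical caller-supplied overrides.
    val mergedConfigs = Map("spark.sql.shuffle.partitions" -> "8")

    var baseBuilder = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "200") // built-in default

    // Applied last, as in this commit, so the caller's value wins.
    mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }

    val spark = baseBuilder.getOrCreate()
    println(spark.conf.get("spark.sql.shuffle.partitions")) // prints 8
    spark.stop()
  }
}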
