Commit 8de6087

kryo
Co-authored-by: Thomas Chow <[email protected]>
1 parent 5226ea2 commit 8de6087

File tree: 2 files changed (+74, -71 lines)

spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala

Lines changed: 73 additions & 69 deletions
@@ -79,97 +79,101 @@ class ChrononKryoRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo): Unit = {
     //kryo.setWarnUnregisteredClasses(true)
     val names = Seq(
-      "java.time.LocalDateTime",
-      "java.time.LocalDate",
-      "org.apache.hadoop.fs.Path",
-      "org.apache.hadoop.fs.FileStatus",
-      "org.apache.hadoop.fs.LocatedFileStatus",
-      "org.apache.hadoop.fs.BlockLocation",
-      "org.apache.hadoop.fs.StorageType",
-      "org.apache.hadoop.fs.permission.FsPermission",
-      "org.apache.hadoop.fs.permission.FsAction",
-      "org.apache.hadoop.fs.FileUtil$CopyMapper",
-      "org.apache.hadoop.fs.FileUtil$CopyReducer",
-      "org.apache.hadoop.fs.FileUtil$CopyFiles",
-      "org.apache.hadoop.fs.FileUtil$CopyListingFileStatus",
-      "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",
-      "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage",
-      "org.apache.spark.sql.execution.datasources.ExecutedWriteSummary",
-      "org.apache.spark.sql.execution.datasources.BasicWriteTaskStats",
-      "org.apache.spark.sql.execution.datasources.WriteTaskResult",
-      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex",
-      "org.apache.spark.sql.execution.joins.LongHashedRelation",
-      "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
-      "org.apache.spark.sql.execution.streaming.sources.ForeachWriterCommitMessage$",
-      "org.apache.spark.sql.types.Metadata",
-      "ai.chronon.api.Row",
-      "ai.chronon.spark.KeyWithHash",
+      "ai.chronon.aggregator.base.ApproxHistogramIr",
       "ai.chronon.aggregator.base.MomentsIR",
       "ai.chronon.aggregator.windowing.BatchIr",
-      "ai.chronon.aggregator.base.ApproxHistogramIr",
-      "ai.chronon.online.RowWrapper",
-      "ai.chronon.online.Fetcher$Request",
       "ai.chronon.aggregator.windowing.FinalBatchIr",
+      "ai.chronon.api.Row",
+      "ai.chronon.online.Fetcher$Request",
       "ai.chronon.online.LoggableResponse",
       "ai.chronon.online.LoggableResponseBase64",
-      "org.apache.datasketches.kll.KllFloatsSketch",
-      "java.util.HashMap",
+      "ai.chronon.online.RowWrapper",
+      "ai.chronon.spark.KeyWithHash",
+      "java.time.LocalDate",
+      "java.time.LocalDateTime",
       "java.util.ArrayList",
-      "java.util.HashSet",
       "java.util.Collections$EmptySet",
+      "java.util.HashMap",
+      "java.util.HashSet",
+      "java.util.concurrent.ConcurrentHashMap",
+      "java.util.concurrent.atomic.AtomicBoolean",
+      "org.apache.datasketches.kll.KllFloatsSketch",
+      "org.apache.datasketches.kll.KllHeapFloatsSketch",
+      "org.apache.datasketches.kll.KllSketch$SketchStructure",
+      "org.apache.datasketches.kll.KllSketch$SketchType",
+      "org.apache.hadoop.fs.BlockLocation",
+      "org.apache.hadoop.fs.FileStatus",
+      "org.apache.hadoop.fs.FileUtil$CopyFiles",
+      "org.apache.hadoop.fs.FileUtil$CopyListingFileStatus",
+      "org.apache.hadoop.fs.FileUtil$CopyMapper",
+      "org.apache.hadoop.fs.FileUtil$CopyReducer",
+      "org.apache.hadoop.fs.LocatedFileStatus",
+      "org.apache.hadoop.fs.Path",
+      "org.apache.hadoop.fs.StorageType",
+      "org.apache.hadoop.fs.permission.FsAction",
+      "org.apache.hadoop.fs.permission.FsPermission",
+      "org.apache.iceberg.io.ResolvingFileIO",
+      "org.apache.iceberg.util.SerializableMap",
+      "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage",
       "org.apache.spark.sql.Row",
       "org.apache.spark.sql.catalyst.InternalRow",
+      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
+      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8",
+      "org.apache.spark.sql.catalyst.expressions.Ascending$",
+      "org.apache.spark.sql.catalyst.expressions.BoundReference",
+      "org.apache.spark.sql.catalyst.expressions.Descending$",
+      "org.apache.spark.sql.catalyst.expressions.GenericInternalRow",
       "org.apache.spark.sql.catalyst.expressions.GenericRow",
       "org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema",
+      "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
+      "org.apache.spark.sql.catalyst.expressions.NullsLast$",
+      "org.apache.spark.sql.catalyst.expressions.SortOrder",
       "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
-      "org.apache.spark.sql.types.StructField",
-      "org.apache.spark.sql.types.StructType",
-      "org.apache.spark.sql.types.LongType$", // dollar stands for case objects
-      "org.apache.spark.sql.types.StringType",
-      "org.apache.spark.sql.types.StringType$",
-      "org.apache.spark.sql.types.IntegerType$",
+      "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
+      "org.apache.spark.sql.catalyst.trees.Origin",
+      "org.apache.spark.sql.execution.datasources.BasicWriteTaskStats",
+      "org.apache.spark.sql.execution.datasources.ExecutedWriteSummary",
+      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex",
+      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableBlockLocation",
+      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableFileStatus",
+      "org.apache.spark.sql.execution.datasources.WriteTaskResult",
+      "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
+      "org.apache.spark.sql.execution.joins.EmptyHashedRelation",
+      "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
+      "org.apache.spark.sql.execution.joins.LongHashedRelation",
+      "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
+      "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",
+      "org.apache.spark.sql.execution.streaming.sources.ForeachWriterCommitMessage$",
       "org.apache.spark.sql.types.BinaryType",
-      "org.apache.spark.sql.types.DataType",
-      "org.apache.spark.sql.types.NullType$",
-      "org.apache.spark.sql.types.DoubleType$",
-      "org.apache.spark.sql.types.BooleanType$",
       "org.apache.spark.sql.types.BinaryType$",
+      "org.apache.spark.sql.types.BooleanType$",
+      "org.apache.spark.sql.types.DataType",
       "org.apache.spark.sql.types.DateType$",
+      "org.apache.spark.sql.types.DoubleType$",
+      "org.apache.spark.sql.types.IntegerType$",
+      "org.apache.spark.sql.types.LongType$",
+      "org.apache.spark.sql.types.Metadata",
+      "org.apache.spark.sql.types.NullType$",
+      "org.apache.spark.sql.types.StringType",
+      "org.apache.spark.sql.types.StringType$",
+      "org.apache.spark.sql.types.StructField",
+      "org.apache.spark.sql.types.StructType",
       "org.apache.spark.sql.types.TimestampType$",
+      "org.apache.spark.unsafe.types.UTF8String",
+      "org.apache.spark.util.HadoopFSUtils$SerializableBlockLocation",
+      "org.apache.spark.util.HadoopFSUtils$SerializableFileStatus",
+      "org.apache.spark.util.collection.CompactBuffer",
       "org.apache.spark.util.sketch.BitArray",
       "org.apache.spark.util.sketch.BloomFilterImpl",
-      "org.apache.spark.util.collection.CompactBuffer",
-      "scala.reflect.ClassTag$$anon$1",
-      "scala.math.Ordering$$anon$4",
-      "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
-      "org.apache.spark.sql.catalyst.expressions.SortOrder",
-      "org.apache.spark.sql.catalyst.expressions.BoundReference",
-      "org.apache.spark.sql.catalyst.trees.Origin",
-      "org.apache.spark.sql.catalyst.expressions.Ascending$",
-      "org.apache.spark.sql.catalyst.expressions.Descending$",
-      "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
-      "org.apache.spark.sql.catalyst.expressions.NullsLast$",
       "scala.collection.IndexedSeqLike$Elements",
-      "org.apache.spark.unsafe.types.UTF8String",
+      "scala.collection.immutable.ArraySeq$ofRef",
+      "scala.math.Ordering$$anon$4",
+      "scala.reflect.ClassTag$$anon$1",
       "scala.reflect.ClassTag$GenericClassTag",
-      "org.apache.spark.util.HadoopFSUtils$SerializableFileStatus",
-      "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
-      "org.apache.spark.sql.execution.joins.EmptyHashedRelation",
-      "org.apache.spark.util.HadoopFSUtils$SerializableBlockLocation",
-      "scala.reflect.ManifestFactory$LongManifest",
-      "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
-      "scala.reflect.ManifestFactory$$anon$1",
       "scala.reflect.ClassTag$GenericClassTag",
-      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableFileStatus",
-      "org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableBlockLocation",
+      "scala.reflect.ManifestFactory$$anon$1",
       "scala.reflect.ManifestFactory$$anon$10",
-      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8",
-      "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5",
-      "scala.collection.immutable.ArraySeq$ofRef",
-      "org.apache.spark.sql.catalyst.expressions.GenericInternalRow",
-      "org.apache.datasketches.kll.KllHeapFloatsSketch",
-      "org.apache.datasketches.kll.KllSketch$SketchStructure",
-      "org.apache.datasketches.kll.KllSketch$SketchType"
+      "scala.reflect.ManifestFactory$LongManifest"
     )
     names.foreach(name => doRegister(name, kryo))

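This change alphabetizes Chronon's Kryo registration list and adds newly observed classes, notably the Iceberg ResolvingFileIO and SerializableMap, the KLL sketch internals, and several Catalyst expression case objects (the trailing $ in names like Ascending$ is the JVM name of a Scala case object). Registration goes by fully qualified string name because many of these classes, such as anonymous classes and case objects, have no classOf[...] literal. A minimal sketch of what a name-based helper like doRegister might do follows; the body is an assumption for illustration, not the repository's actual implementation:

import com.esotericsoftware.kryo.Kryo

// Hypothetical sketch of doRegister: resolve the class by name and register it,
// tolerating names that only resolve on some Spark/Scala versions.
def doRegister(name: String, kryo: Kryo): Unit =
  try {
    // Class.forName accepts JVM-only names such as case objects
    // ("org.apache.spark.sql.types.LongType$") and anonymous classes
    // ("scala.math.Ordering$$anon$4") that classOf[...] cannot express.
    kryo.register(Class.forName(name))
  } catch {
    case _: ClassNotFoundException => () // absent on this classpath; skip
  }

Keeping the list sorted makes duplicates and merge conflicts easier to spot as more classes accumulate.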
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Lines changed: 1 addition & 2 deletions
@@ -141,8 +141,6 @@ object SparkSessionBuilder {
         .config("spark.kryo.referenceTracking", "false")
     }

-    mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
-
     if (SPARK_VERSION.startsWith("2")) {
       // Otherwise files left from deleting the table with the same name result in test failures
       baseBuilder.config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")
@@ -165,6 +163,7 @@ object SparkSessionBuilder {
       // hive jars need to be available on classpath - not needed for local testing
       baseBuilder
     }
+    mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }
     val spark = builder.getOrCreate()
     // disable log spam
     spark.sparkContext.setLogLevel("ERROR")

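The SparkSessionBuilder change moves the mergedConfigs loop from before the version-specific defaults to after them. SparkSession.Builder.config keeps options in a key-value map where the last write for a key wins, so applying caller-supplied configs last lets them override the builder's defaults instead of being overwritten by them. A small illustration, using a hypothetical key and values:

import org.apache.spark.sql.SparkSession

// Illustrative only: the key and values below are hypothetical.
var baseBuilder = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "200") // builder default

// Caller overrides applied last, so they take precedence.
val mergedConfigs = Map("spark.sql.shuffle.partitions" -> "8")
mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) }

val spark = baseBuilder.getOrCreate()
// spark.conf.get("spark.sql.shuffle.partitions") now returns "8", not "200".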