Iceberg unit tests, support Iceberg + nonhive catalogs, Iceberg Kryo Serializer #993

Status: Open. Wants to merge 35 commits into base: main.

Commits (35)
00d4da8
staging changes for testing iceberg
May 13, 2025
93b7c92
timeboxing test changes
May 13, 2025
2d4ed6f
bootstrapping spark test, 1/3 working on FormatTest
May 14, 2025
878f64a
got most tests working except droppartitions and the new dateint test
May 15, 2025
4739e93
cleaning some local changes
May 15, 2025
65c81a1
reverting some local changes
May 15, 2025
cab2c6a
formatting
May 15, 2025
f4bfb70
more silly local hacks
May 15, 2025
ac505d3
Merge branch 'main' into iceberg_unit_tests
abbywh May 15, 2025
ae2088f
fixed the constant derby flake
May 16, 2025
dee01ab
refactored to match deltalake
May 16, 2025
69cd50d
added Iceberg Kryo Serializer
May 16, 2025
5526abe
scalafmt
May 16, 2025
2b9c246
iceberg circleci integration
May 16, 2025
b423f4b
fixing typo
May 16, 2025
9e5eaae
giving circleci a dependency
May 16, 2025
558adc3
removing env file
May 16, 2025
c1eb8a2
moving integration test to spark_embedded
May 16, 2025
510266c
figured out why delta lake was on 2.13, need it for spark 3.2
May 16, 2025
58a58ff
typo
May 16, 2025
6ea4121
scalafmt
May 16, 2025
cb012a6
skipping the flink parts since it doesn't compile to 2.13.6
May 16, 2025
117c4f7
including TableUtilsTest as well in CI
May 16, 2025
7d3272f
sperating table utils and format for seperate jvms
May 16, 2025
f009f97
typo
May 16, 2025
02c1463
corrected behavior for long partitions
May 16, 2025
b05ef1a
Merge branch 'main' into iceberg_unit_tests
abbywh May 16, 2025
059e16e
eventeventlongds test, more kryo registration
May 17, 2025
907d2f8
iceberg drop partitions
May 17, 2025
0065406
long partition testing
May 17, 2025
a956c7f
Merge branch 'main' into iceberg_unit_tests
abbywh May 17, 2025
e980fa1
unskipping fixed tests
May 19, 2025
4054892
changing test schema
May 19, 2025
2ffd32d
updating drop partitions to be schemaless
May 19, 2025
50445f0
found bug during CI testing
May 21, 2025
62 changes: 62 additions & 0 deletions .circleci/config.yml
@@ -138,6 +138,62 @@ jobs:
command: |
conda activate chronon_py
sbt +scalafmtCheck
# run these separately, as we need an isolated JVM so Spark session settings don't interfere with other runs
# long-term goal is to refactor the current testing Spark session builder and avoid adding new single-test jobs to CI
"Scala 13 -- Iceberg Format Tests":
executor: docker_baseimg_executor
steps:
- checkout
- run:
name: Run Scala 13 tests for Iceberg format
environment:
format_test: iceberg
shell: /bin/bash -leuxo pipefail
command: |
conda activate chronon_py
# Increase if we see OOM.
export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G"
sbt ';project spark_embedded; ++ 2.13.6; testOnly ai.chronon.spark.test.TableUtilsFormatTest'
- store_test_results:
path: /chronon/spark/target/test-reports
- store_test_results:
path: /chronon/aggregator/target/test-reports
- run:
name: Compress spark-warehouse
command: |
cd /tmp/ && tar -czvf spark-warehouse.tar.gz chronon/spark-warehouse
when: on_fail
- store_artifacts:
path: /tmp/spark-warehouse.tar.gz
destination: spark_warehouse.tar.gz
when: on_fail
"Scala 13 -- Iceberg Table Utils Tests":
executor: docker_baseimg_executor
steps:
- checkout
- run:
name: Run Scala 13 tests for Iceberg Table Utils
environment:
format_test: iceberg
shell: /bin/bash -leuxo pipefail
command: |
conda activate chronon_py
# Increase if we see OOM.
export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G"
sbt ';project spark_embedded; ++ 2.13.6; testOnly ai.chronon.spark.test.TableUtilsTest'
- store_test_results:
path: /chronon/spark/target/test-reports
- store_test_results:
path: /chronon/aggregator/target/test-reports
- run:
name: Compress spark-warehouse
command: |
cd /tmp/ && tar -czvf spark-warehouse.tar.gz chronon/spark-warehouse
when: on_fail
- store_artifacts:
path: /tmp/spark-warehouse.tar.gz
destination: spark_warehouse.tar.gz
when: on_fail

workflows:
build_test_deploy:
@@ -161,3 +217,9 @@ workflows:
- "Chronon Python Lint":
requires:
- "Pull Docker Image"
- "Scala 13 -- Iceberg Format Tests":
requires:
- "Pull Docker Image"
- "Scala 13 -- Iceberg Table Utils Tests":
requires:
- "Pull Docker Image"
9 changes: 8 additions & 1 deletion .gitignore
@@ -10,6 +10,11 @@
*.logs
*.iml
.idea/
*.jvmopts
.bloop*
.metals*
[Review thread]

Collaborator: How is working with Metals relative to IntelliJ? Does the debugger work as well?

Contributor (Author): It's really good, actually. The debugger worked out of the box, and I found it comparable to IntelliJ overall. I'd recommend it to anyone who has a remote dev box, since VSCode's remote integration is far better in my experience; all the tests run a lot faster and I got in way more dev cycles. I'd probably only recommend it over IntelliJ if you're on a dev box, though.

.venv*
*metals.sbt*
.eclipse
**/.vscode/
**/__pycache__/
@@ -21,7 +26,9 @@ api/py/.coverage
api/py/htmlcov/
**/derby.log
cs

*.bloop
*.metals
*.venv
# Documentation builds
docs/build/

15 changes: 13 additions & 2 deletions build.sbt
@@ -13,6 +13,7 @@ lazy val spark3_1_1 = "3.1.1"
lazy val spark3_2_1 = "3.2.1"
lazy val spark3_5_3 = "3.5.3"
lazy val tmp_warehouse = "/tmp/chronon/"
lazy val icebergVersion = "1.1.0"

ThisBuild / organization := "ai.chronon"
ThisBuild / organizationName := "chronon"
@@ -189,6 +190,16 @@ val VersionMatrix: Map[String, VersionDependency] = Map(
Some("1.0.1"),
Some("2.0.2")
),
// 3.2 is the minimum Spark version for Iceberg,
// due to INSERT INTO support without specifying the iceberg format
"iceberg32" -> VersionDependency(
Seq(
"org.apache.iceberg" %% "iceberg-spark-runtime-3.2",
),
None,
None,
Some(icebergVersion),
),
"jackson" -> VersionDependency(
Seq(
"com.fasterxml.jackson.core" % "jackson-core",
@@ -415,7 +426,7 @@ lazy val spark_uber = (project in file("spark"))
libraryDependencies ++= (if (use_spark_3_5.value)
fromMatrix(scalaVersion.value, "jackson", "spark-all-3-5/provided", "delta-core/provided")
else
fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided"))
fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided", "iceberg32/provided")),
)

lazy val spark_embedded = (project in file("spark"))
@@ -427,7 +438,7 @@ lazy val spark_embedded = (project in file("spark"))
libraryDependencies ++= (if (use_spark_3_5.value)
fromMatrix(scalaVersion.value, "spark-all-3-5", "delta-core")
else
fromMatrix(scalaVersion.value, "spark-all", "delta-core")),
fromMatrix(scalaVersion.value, "spark-all", "delta-core", "iceberg32")),
target := target.value.toPath.resolveSibling("target-embedded").toFile,
Test / test := {}
)
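For a Scala 2.13 / Spark 3.2 build, the new "iceberg32" matrix entry above is expected to resolve to roughly the following raw coordinate (a sketch; the actual resolution goes through fromMatrix and VersionDependency):

// Equivalent raw sbt dependency, using icebergVersion = "1.1.0" from the diff above.
libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.2" % "1.1.0"
// spark_uber requests it as "iceberg32/provided", i.e. the same coordinate marked % Provided.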
41 changes: 41 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
@@ -178,3 +178,44 @@ class ChrononDeltaLakeKryoRegistrator extends ChrononKryoRegistrator {
additionalDeltaNames.foreach(name => doRegister(name, kryo))
}
}

class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
super.registerClasses(kryo)
val additionalIcebergNames = Seq(
"org.apache.iceberg.spark.source.SerializableTableWithSize",
"org.apache.iceberg.encryption.PlaintextEncryptionManager",
"org.apache.iceberg.hadoop.HadoopFileIO",
"org.apache.iceberg.SerializableTable$SerializableConfSupplier",
"org.apache.iceberg.util.SerializableMap",
"org.apache.iceberg.LocationProviders$DefaultLocationProvider",
"org.apache.iceberg.spark.source.SparkWrite$TaskCommit",
"org.apache.iceberg.DataFile",
"org.apache.iceberg.GenericDataFile",
"org.apache.iceberg.FileContent",
"org.apache.iceberg.FileFormat",
"org.apache.iceberg.SerializableByteBufferMap",
"org.apache.iceberg.PartitionData",
// For some reason, registering just Types doesn't work; the nested types must be listed individually
"org.apache.iceberg.types.Types$StructType",
"org.apache.iceberg.types.Types$NestedField",
"org.apache.iceberg.types.Types$StringType",
"org.apache.iceberg.types.Types$IntegerType",
"org.apache.iceberg.types.Types$LongType",
"org.apache.iceberg.types.Types$DoubleType",
"org.apache.iceberg.types.Types$FloatType",
"org.apache.iceberg.types.Types$BooleanType",
"org.apache.iceberg.types.Types$DateType",
"org.apache.iceberg.types.Types$TimestampType",
"org.apache.iceberg.types.Types$TimeType",
"org.apache.iceberg.types.Types$DecimalType",
"org.apache.iceberg.types.Types$NestedField$",
"org.apache.iceberg.SnapshotRef",
"org.apache.iceberg.SnapshotRefType",
"org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize",
"org.apache.iceberg.MetadataTableType",
"org.apache.iceberg.BaseFile$1"
)
additionalIcebergNames.foreach(name => doRegister(name, kryo))
}
}
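
For context, a minimal sketch of how this registrator would be wired into a Spark session. The config keys are standard Spark settings; whether registrationRequired is enabled here is an assumption, and SparkSessionBuilder (below) does the equivalent wiring when format_test=iceberg:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("iceberg-kryo-sketch")
  .master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "ai.chronon.spark.ChrononIcebergKryoRegistrator")
  // assumption: fail fast on unregistered classes so missing entries surface in tests
  .config("spark.kryo.registrationRequired", "true")
  .getOrCreate()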
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/Extensions.scala
@@ -83,7 +83,7 @@ object Extensions {
.groupBy(col(TableUtils(dataFrame.sparkSession).partitionColumn))
.count()
.collect()
.map(row => row.getString(0) -> row.getLong(1))
.map(row => row.get(0).toString -> row.getLong(1))
.toMap
DfWithStats(dataFrame, partitionCounts)
}
19 changes: 17 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SPARK_VERSION
import java.io.File
import java.util.logging.Logger
import scala.util.Properties
import java.util.UUID

object SparkSessionBuilder {
@transient private lazy val logger = LoggerFactory.getLogger(getClass)
@@ -39,6 +40,8 @@ object SparkSessionBuilder {
additionalConfig: Option[Map[String, String]] = None,
enforceKryoSerializer: Boolean = true): SparkSession = {

val userName = Properties.userName
val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath)
// allow us to override the format by specifying env vars. This allows us to not have to worry about interference
// between Spark sessions created in existing chronon tests that need the hive format and some specific tests
// that require a format override like delta lake.
Expand All @@ -50,6 +53,19 @@ object SparkSessionBuilder {
"spark.chronon.table_write.format" -> "delta"
)
(configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
(configMap, "ai.chronon.spark.ChrononKryoRegistrator")
case Some("iceberg") =>
val configMap = Map(
"spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog",
"spark.chronon.table_write.format" -> "iceberg",
"spark.chronon.table_read.format" -> "iceberg",
"spark.sql.catalog.local" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.spark_catalog.type" -> "hadoop",
"spark.sql.catalog.spark_catalog.warehouse" -> s"$warehouseDir/data"
)
// TODO add an iceberg kryo registrator
(configMap, "ai.chronon.spark.ChrononIcebergKryoRegistrator")
case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator")
}

@@ -60,8 +76,7 @@ object SparkSessionBuilder {
//required to run spark locally with hive support enabled - for sbt test
System.setSecurityManager(null)
}
val userName = Properties.userName
val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath)

var baseBuilder = SparkSession
.builder()
.appName(name)
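Putting the pieces together, a hedged sketch of how a test would obtain an Iceberg-backed session through this builder. Parameter names beyond those visible in the diff (additionalConfig, enforceKryoSerializer, localWarehouseLocation) are assumptions, and the table name is illustrative:

// Run with format_test=iceberg exported, as the CI jobs do.
val spark = SparkSessionBuilder.build("TableUtilsFormatTest", local = true)

// With the catalog settings above, plain SQL goes through Iceberg, including
// INSERT INTO without specifying the format (the reason Spark >= 3.2 is required).
spark.sql("CREATE TABLE IF NOT EXISTS db.events (key INT, ds STRING) USING iceberg PARTITIONED BY (ds)")
spark.sql("INSERT INTO db.events VALUES (1, '2024-05-01')")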
60 changes: 47 additions & 13 deletions spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -239,9 +239,15 @@ case object Iceberg extends Format {
override def partitions(tableName: String, partitionColumns: Seq[String])(implicit
sparkSession: SparkSession): Seq[Map[String, String]] = {
sparkSession.sqlContext
.sql(s"SHOW PARTITIONS $tableName")
.sql(s"SELECT partition FROM $tableName" ++ ".partitions")

[Review thread]

Reviewer: Out of curiosity, does this work for regular hive tables?

Reply: This is for Iceberg; Hive support is here.

Contributor (Author): It should work for hive tables, and the internal targets I'm hitting are more or less "regular hive tables". Iceberg abstracts itself from the catalog implementation, so as long as your Iceberg has an interface to your catalog implementation, it will work.

.collect()
.map(row => parseHivePartition(row.getString(0)))
.map { row =>
val partitionStruct = row.getStruct(0)
partitionStruct.schema.fieldNames.zipWithIndex.map {
case (fieldName, idx) =>
fieldName -> partitionStruct.get(idx).toString
}.toMap
}
}
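// Illustrative sketch, not part of this diff: for an Iceberg table partitioned by a single
// `ds` string column (table name hypothetical), the metadata-table query above returns one
// struct row per partition, e.g.
//   spark.sql("SELECT partition FROM db.events.partitions").collect()
//     -> Array(Row(Row("2024-05-01")), Row(Row("2024-05-02")))
// and the mapping above converts each struct into Map("ds" -> "2024-05-01"), etc.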

private def getIcebergPartitions(tableName: String,
@@ -395,7 +401,14 @@ case class TableUtils(sparkSession: SparkSession) {
rdd
}

def tableExists(tableName: String): Boolean = sparkSession.catalog.tableExists(tableName)
def tableExists(tableName: String): Boolean = {
try {
sparkSession.sql(s"DESCRIBE TABLE $tableName")
true
} catch {
case _: AnalysisException => false
}
}

def loadEntireTable(tableName: String): DataFrame = sparkSession.table(tableName)

@@ -973,17 +986,38 @@
partitions: Seq[String],
partitionColumn: String = partitionColumn,
subPartitionFilters: Map[String, String] = Map.empty): Unit = {
// TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs
// notably, the unit test iceberg integration uses hadoop because of
// https://github.com/apache/iceberg/issues/7847
if (partitions.nonEmpty && tableExists(tableName)) {
val partitionSpecs = partitions
.map { partition =>
val mainSpec = s"$partitionColumn='$partition'"
val specs = mainSpec +: subPartitionFilters.map {
case (key, value) => s"${key}='${value}'"
}.toSeq
specs.mkString("PARTITION (", ",", ")")
}
.mkString(",")
val dropSql = s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
val dropSql = tableFormatProvider.readFormat(tableName) match {
// really this is DSv1 vs DSv2, not Hive vs Iceberg,
// but we branch this way since only Iceberg has been migrated to DSv2
case Iceberg =>
// Build WHERE clause: (ds='2024-05-01' OR ds='2024-05-02') [AND k='v' AND …]
val mainPred = partitions
.map(p => s"$partitionColumn='${p}'")
.mkString("(", " OR ", ")")

val extraPred = subPartitionFilters
.map { case (k, v) => s"$k='${v}'" }
.mkString(" AND ")

val where = Seq(mainPred, extraPred).filter(_.nonEmpty).mkString(" AND ")

s"DELETE FROM $tableName WHERE $where"
case _ =>
val partitionSpecs = partitions
.map { partition =>
val mainSpec = s"$partitionColumn='$partition'"
val specs = mainSpec +: subPartitionFilters.map {
case (key, value) => s"${key}='${value}'"
}.toSeq
specs.mkString("PARTITION (", ",", ")")
}
.mkString(",")
s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
}
sql(dropSql)
} else {
logger.info(s"$tableName doesn't exist, please double check before drop partitions")
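To make the two branches concrete, a usage sketch: the table name, dates, and sub-partition values are illustrative, ds is assumed as the default partition column, and spark is a session obtained from SparkSessionBuilder as sketched earlier.

val tableUtils = TableUtils(spark)
tableUtils.dropPartitions(
  tableName = "db.events",
  partitions = Seq("2024-05-01", "2024-05-02"),
  subPartitionFilters = Map("region" -> "us")
)
// Against an Iceberg (DSv2) table this issues a predicate delete:
//   DELETE FROM db.events WHERE (ds='2024-05-01' OR ds='2024-05-02') AND region='us'
// Against a Hive (DSv1) table it keeps the partition-spec drop:
//   ALTER TABLE db.events DROP IF EXISTS PARTITION (ds='2024-05-01',region='us'),PARTITION (ds='2024-05-02',region='us')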
15 changes: 15 additions & 0 deletions spark/src/test/scala/ai/chronon/spark/test/ExtensionsTest.scala
@@ -43,4 +43,19 @@ class ExtensionsTest {
}
assertEquals(0, diff.count())
}

@Test
def testDfWithStatsLongPartition(): Unit = {
val df = Seq(
(1, 20240103L),
(2, 20240104L),
(3, 20240104L)
).toDF("key", "ds")

val dfWithStats: DfWithStats = DfWithStats(df)
val stats = dfWithStats.stats

assertEquals(3L, stats.count)
}

}