Iceberg unit tests, support Iceberg + nonhive catalogs, Iceberg Kryo Serializer #993
base: main
Changes from all commits:
00d4da8
93b7c92
2d4ed6f
878f64a
4739e93
65c81a1
cab2c6a
f4bfb70
ac505d3
ae2088f
dee01ab
69cd50d
5526abe
2b9c246
b423f4b
9e5eaae
558adc3
c1eb8a2
510266c
58a58ff
6ea4121
cb012a6
117c4f7
7d3272f
f009f97
02c1463
b05ef1a
059e16e
907d2f8
0065406
a956c7f
e980fa1
4054892
2ffd32d
50445f0
```diff
@@ -239,9 +239,15 @@ case object Iceberg extends Format {
   override def partitions(tableName: String, partitionColumns: Seq[String])(implicit
       sparkSession: SparkSession): Seq[Map[String, String]] = {
     sparkSession.sqlContext
-      .sql(s"SHOW PARTITIONS $tableName")
+      .sql(s"SELECT partition FROM $tableName" ++ ".partitions")
       .collect()
-      .map(row => parseHivePartition(row.getString(0)))
+      .map { row =>
+        val partitionStruct = row.getStruct(0)
+        partitionStruct.schema.fieldNames.zipWithIndex.map {
+          case (fieldName, idx) =>
+            fieldName -> partitionStruct.get(idx).toString
+        }.toMap
+      }
   }

   private def getIcebergPartitions(tableName: String,
```

Review thread on this hunk:

> Out of curiosity, does this work for regular Hive tables?

> This is for Iceberg; Hive support is here.

> It should work for Hive tables; the internal targets I'm hitting are more or less "regular Hive tables". Iceberg abstracts itself from the catalog implementation, so as long as your Iceberg setup has an interface to your catalog implementation, it will work.
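For context on the new `partitions()` implementation: the `SELECT partition FROM <table>.partitions` query reads Iceberg's partitions metadata table, where each row carries a `partition` struct whose fields are the table's partition columns. Below is a minimal runnable sketch of the same struct-to-Map conversion; it assumes an Iceberg-enabled Spark session, and the table `db.events` with partition columns `ds`/`region` is hypothetical:

```scala
import org.apache.spark.sql.{Row, SparkSession}

object PartitionsSketch {
  // Same conversion as the diff above: turn the `partition` struct into a
  // Map of partition-column name -> value.
  def rowToPartitionMap(row: Row): Map[String, String] = {
    val partitionStruct = row.getStruct(0)
    partitionStruct.schema.fieldNames.zipWithIndex.map {
      case (fieldName, idx) => fieldName -> partitionStruct.get(idx).toString
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // For a table partitioned by (ds, region), a row's struct might be
    // {2024-05-01, us}, which converts to Map(ds -> 2024-05-01, region -> us).
    spark
      .sql("SELECT partition FROM db.events.partitions") // hypothetical table
      .collect()
      .map(rowToPartitionMap)
      .foreach(println)
  }
}
```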
```diff
@@ -395,7 +401,14 @@ case class TableUtils(sparkSession: SparkSession) {
     rdd
   }

-  def tableExists(tableName: String): Boolean = sparkSession.catalog.tableExists(tableName)
+  def tableExists(tableName: String): Boolean = {
+    try {
+      sparkSession.sql(s"DESCRIBE TABLE $tableName")
+      true
+    } catch {
+      case _: AnalysisException => false
+    }
+  }

   def loadEntireTable(tableName: String): DataFrame = sparkSession.table(tableName)
```
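The switch away from `sparkSession.catalog.tableExists` matters for non-Hive catalogs: the `spark.catalog` API resolves names against the session (v1) catalog, so identifiers living in a separately registered v2 catalog (such as an Iceberg catalog) may not be found on the Spark versions where this is an issue, while a SQL `DESCRIBE TABLE` goes through the full analyzer. A sketch under that assumption; the catalog name `iceberg_cat` is made up:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object TableExistsSketch {
  // DESCRIBE-based existence check: name resolution happens in the analyzer,
  // so identifiers in non-session (v2) catalogs resolve as well.
  def tableExists(spark: SparkSession, tableName: String): Boolean =
    try {
      spark.sql(s"DESCRIBE TABLE $tableName")
      true
    } catch {
      case _: AnalysisException => false // table (or namespace) not found
    }
}

// Usage (identifiers hypothetical):
//   TableExistsSketch.tableExists(spark, "iceberg_cat.db.events")   // v2 catalog
//   TableExistsSketch.tableExists(spark, "default.some_hive_table") // session catalog
```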
```diff
@@ -973,17 +986,38 @@ case class TableUtils(sparkSession: SparkSession) {
                      partitions: Seq[String],
                      partitionColumn: String = partitionColumn,
                      subPartitionFilters: Map[String, String] = Map.empty): Unit = {
+    // TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs
+    // notably, the unit test iceberg integration uses hadoop because of
+    // https://github.com/apache/iceberg/issues/7847
     if (partitions.nonEmpty && tableExists(tableName)) {
-      val partitionSpecs = partitions
-        .map { partition =>
-          val mainSpec = s"$partitionColumn='$partition'"
-          val specs = mainSpec +: subPartitionFilters.map {
-            case (key, value) => s"${key}='${value}'"
-          }.toSeq
-          specs.mkString("PARTITION (", ",", ")")
-        }
-        .mkString(",")
-      val dropSql = s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
+      val dropSql = tableFormatProvider.readFormat(tableName) match {
+        // really this is Dsv1 vs Dsv2, not hive vs iceberg,
+        // but we break this way since only Iceberg is migrated to Dsv2
+        case Iceberg =>
+          // Build WHERE clause: (ds='2024-05-01' OR ds='2024-05-02') [AND k='v' AND …]
+          val mainPred = partitions
+            .map(p => s"$partitionColumn='${p}'")
+            .mkString("(", " OR ", ")")
+
+          val extraPred = subPartitionFilters
+            .map { case (k, v) => s"$k='${v}'" }
+            .mkString(" AND ")
+
+          val where = Seq(mainPred, extraPred).filter(_.nonEmpty).mkString(" AND ")
+
+          s"DELETE FROM $tableName WHERE $where"
+        case _ =>
+          val partitionSpecs = partitions
+            .map { partition =>
+              val mainSpec = s"$partitionColumn='$partition'"
+              val specs = mainSpec +: subPartitionFilters.map {
+                case (key, value) => s"${key}='${value}'"
+              }.toSeq
+              specs.mkString("PARTITION (", ",", ")")
+            }
+            .mkString(",")
+          s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
+      }
       sql(dropSql)
     } else {
       logger.info(s"$tableName doesn't exist, please double check before drop partitions")
```
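To make the two branches concrete, here is the SQL each path generates for sample inputs. This is a sketch that mirrors the string-building logic in the diff above; the table name and values are made up:

```scala
object DropPartitionsSqlSketch extends App {
  // Sample inputs (hypothetical)
  val tableName           = "db.events"
  val partitionColumn     = "ds"
  val partitions          = Seq("2024-05-01", "2024-05-02")
  val subPartitionFilters = Map("region" -> "us")

  // Iceberg (DSv2) branch: one row-level DELETE covering all partitions.
  val mainPred  = partitions.map(p => s"$partitionColumn='$p'").mkString("(", " OR ", ")")
  val extraPred = subPartitionFilters.map { case (k, v) => s"$k='$v'" }.mkString(" AND ")
  val where     = Seq(mainPred, extraPred).filter(_.nonEmpty).mkString(" AND ")
  println(s"DELETE FROM $tableName WHERE $where")
  // DELETE FROM db.events WHERE (ds='2024-05-01' OR ds='2024-05-02') AND region='us'

  // Hive (DSv1) branch: ALTER TABLE ... DROP with one PARTITION spec per value.
  val partitionSpecs = partitions
    .map { partition =>
      val specs = s"$partitionColumn='$partition'" +:
        subPartitionFilters.map { case (k, v) => s"$k='$v'" }.toSeq
      specs.mkString("PARTITION (", ",", ")")
    }
    .mkString(",")
  println(s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs")
  // ALTER TABLE db.events DROP IF EXISTS
  //   PARTITION (ds='2024-05-01',region='us'),PARTITION (ds='2024-05-02',region='us')
}
```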
Review thread (general):

> How is working with Metals relative to IntelliJ? Does the debugger work as well?

> It's really good, actually. The debugger worked out of the box, and I found it comparable to IntelliJ overall. I'd recommend it to anyone who has a remote dev box, since VS Code's remote integration is far better in my experience. All the tests run a lot faster and I got in way more dev cycles. That said, I'd probably only recommend it over IntelliJ if you're working on a dev box.