-
Notifications
You must be signed in to change notification settings - Fork 70
Add support for Delta lake tables in TableUtils #869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c3aae9b
1a64baa
5e134ed
cd801ca
51ae067
6a326a5
c72ef70
02dda37
fe4cc83
d22525d
81dd918
2ff3ed5
f3a26ba
b7b084a
ad14e67
f15cc5b
7a9396e
f93d83e
48a7604
837c943
2dd5a33
e24a9d6
1d36d63
5c8ed17
73577af
46748b6
b3cb5a5
aa76d05
3066719
2b381c2
6affb63
e3d6632
48cccf1
5673ac4
1af4545
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,14 +22,14 @@ import org.apache.spark.SPARK_VERSION | |
|
||
import java.io.File | ||
import java.util.logging.Logger | ||
import scala.reflect.io.Path | ||
import scala.util.Properties | ||
|
||
object SparkSessionBuilder { | ||
@transient private lazy val logger = LoggerFactory.getLogger(getClass) | ||
|
||
private val warehouseId = java.util.UUID.randomUUID().toString.takeRight(6) | ||
private val DefaultWarehouseDir = new File("/tmp/chronon/spark-warehouse_" + warehouseId) | ||
val FormatTestEnvVar: String = "format_test" | ||
|
||
def expandUser(path: String): String = path.replaceFirst("~", System.getProperty("user.home")) | ||
// we would want to share locally generated warehouse during CI testing | ||
|
@@ -38,6 +38,24 @@ object SparkSessionBuilder { | |
localWarehouseLocation: Option[String] = None, | ||
additionalConfig: Option[Map[String, String]] = None, | ||
enforceKryoSerializer: Boolean = true): SparkSession = { | ||
|
||
// allow us to override the format by specifying env vars. This allows us to not have to worry about interference | ||
// between Spark sessions created in existing chronon tests that need the hive format and some specific tests | ||
// that require a format override like delta lake. | ||
val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Reviewer]: Looks like this is only used for local testing. Could we move it within the [comment truncated]? [Author]: We could — I actually thought the code is cleaner with the match up front; this allows us to skip the use of vars and a lot of if/else-type checks in a couple of places. |
||
case Some("deltalake") => | ||
val configMap = Map( | ||
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension", | ||
"spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog", | ||
"spark.chronon.table_write.format" -> "delta" | ||
) | ||
(configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator") | ||
case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator") | ||
} | ||
|
||
// tack on format configs with additional configs | ||
val mergedConfigs = additionalConfig.getOrElse(Map.empty) ++ formatConfigs | ||
|
||
if (local) { | ||
//required to run spark locally with hive support enabled - for sbt test | ||
System.setSecurityManager(null) | ||
|
@@ -61,13 +79,12 @@ object SparkSessionBuilder { | |
if (enforceKryoSerializer) { | ||
baseBuilder | ||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") | ||
.config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator") | ||
.config("spark.kryo.registrator", kryoRegistrator) | ||
.config("spark.kryoserializer.buffer.max", "2000m") | ||
.config("spark.kryo.referenceTracking", "false") | ||
} | ||
additionalConfig.foreach { configMap => | ||
configMap.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) } | ||
} | ||
|
||
mergedConfigs.foreach { config => baseBuilder = baseBuilder.config(config._1, config._2) } | ||
|
||
if (SPARK_VERSION.startsWith("2")) { | ||
// Otherwise files left from deleting the table with the same name result in test failures | ||
|
Uh oh!
There was an error while loading. Please reload this page.