Commit e17ab6e

[SPARK-37779][SQL] Make ColumnarToRowExec plan canonicalizable after (de)serialization
This PR proposes to add a driver-side sanity check on `supportsColumnar` in `ColumnarToRowExec`. SPARK-23731 made the plans serializable by leveraging lazy vals, but SPARK-28213 happened to refer to the lazy variable at:

https://github.com/apache/spark/blob/77b164aac9764049a4820064421ef82ec0bc14fb/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L68

This can fail during canonicalization, for example during common subexpression elimination on the executor side:

```
java.lang.NullPointerException
  at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar$lzycompute(DataSourceScanExec.scala:280)
  at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar(DataSourceScanExec.scala:279)
  at org.apache.spark.sql.execution.InputAdapter.supportsColumnar(WholeStageCodegenExec.scala:509)
  at org.apache.spark.sql.execution.ColumnarToRowExec.<init>(Columnar.scala:67)
  ...
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:581)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:580)
  at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:110)
  ...
  at org.apache.spark.sql.catalyst.expressions.ExpressionEquals.hashCode(EquivalentExpressions.scala:275)
  ...
  at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
  at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
  at scala.collection.mutable.HashMap.get(HashMap.scala:74)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:46)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTreeHelper$1(EquivalentExpressions.scala:147)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:170)
  at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1(SubExprEvaluationRuntime.scala:89)
  at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1$adapted(SubExprEvaluationRuntime.scala:89)
  at scala.collection.immutable.List.foreach(List.scala:392)
```

This is still a band-aid fix, but it at least addresses the issue with a minimal change; it should ideally be backported too.

A user-facing impact is pretty unlikely, since it only matters when `ColumnarToRowExec` has to be canonicalized on the executor side (see the stack trace), but yes, it fixes a bug.

A unit test was added.

Closes #35058 from HyukjinKwon/SPARK-37779.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 195f1aa)
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 5e0f0da commit e17ab6e
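For context on the failure mode: a Scala `lazy val` computed from non-serialized state (for example an `@transient` field) works fine on the driver, where that state is populated, but throws a `NullPointerException` the first time it is forced on an executor after deserialization. The snippet below is a minimal standalone sketch of that pattern, not Spark code; `Plan`, `relation`, and `roundTrip` are illustrative names only.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustrative class, not Spark code: `relation` is transient, so it is dropped
// during Java serialization, while `supportsColumnar` is lazily computed from it.
class Plan(@transient val relation: AnyRef) extends Serializable {
  lazy val supportsColumnar: Boolean = relation.hashCode() % 2 == 0
}

object LazyValAfterDeserialization {
  // Serialize and deserialize an object, simulating shipping a plan to an executor.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)).readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val plan = new Plan(new Object)
    val shipped = roundTrip(plan)     // lazy val not yet forced, so no cached value travels along
    println(plan.supportsColumnar)    // fine on the "driver" copy: relation is non-null
    println(shipped.supportsColumnar) // NullPointerException: relation was dropped by @transient
  }
}
```

In the stack trace above, the analogous thing happens when `ColumnarToRowExec`'s constructor forces `child.supportsColumnar` while the plan is being canonicalized on an executor.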

2 files changed: 21 additions & 1 deletion

sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -64,7 +64,8 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  assert(child.supportsColumnar)
+  // supportsColumnar requires to be only called on driver side, see also SPARK-37779.
+  assert(TaskContext.get != null || child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
```
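The one-line fix relies on `TaskContext.get` returning `null` on the driver and a non-null context inside a running task, so the assertion only evaluates `child.supportsColumnar` on the driver. A minimal sketch of that guard is below; `DriverOnlyChecks` and its methods are hypothetical helpers for illustration, not part of Spark.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.SparkPlan

object DriverOnlyChecks {
  // Evaluate a driver-only predicate, and skip it entirely when running inside a
  // task on an executor, where TaskContext.get is non-null and the predicate may
  // not be safe to compute (e.g. it touches state that was not serialized).
  def driverOnlySanityCheck(predicate: => Boolean): Unit =
    assert(TaskContext.get != null || predicate)

  // Usage mirroring the fix: on the driver this forces child.supportsColumnar;
  // on an executor the short-circuit never touches that lazy val.
  def checkColumnarChild(child: SparkPlan): Unit =
    driverOnlySanityCheck(child.supportsColumnar)
}
```

With the short-circuit in place, executor-side canonicalization can construct `ColumnarToRowExec` without ever touching the lazy val that previously caused the `NullPointerException`.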

sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala

Lines changed: 19 additions & 0 deletions
```diff
@@ -88,4 +88,23 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   test("SPARK-30780 empty LocalTableScan should use RDD without partitions") {
     assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
   }
+
+  test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath { path =>
+        spark.range(1).write.parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath)
+        val columnarToRowExec =
+          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+        try {
+          spark.range(1).foreach { _ =>
+            columnarToRowExec.canonicalized
+            ()
+          }
+        } catch {
+          case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
+        }
+      }
+    }
+  }
 }
```
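For reference, the added test appears to reproduce the executor-side path by capturing `columnarToRowExec` in the closure passed to `spark.range(1).foreach { ... }`: the closure is serialized, shipped to an executor, and deserialized there, so calling `.canonicalized` inside it exercises canonicalization of a (de)serialized plan, which is the scenario this commit fixes. A failure on the executor propagates back to the driver action and is turned into a test failure by the surrounding try/catch.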
