Skip to content

Commit bfd67d0

Browse files
committed
fix
1 parent 46b6ccb commit bfd67d0

File tree

5 files changed

+171
-1
lines changed

5 files changed

+171
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
504504
ResolveEncodersInUDF),
505505
Batch("Subquery", Once,
506506
UpdateOuterReferences),
507+
Batch("Strip __is_duplicate metadata", Once, StripIsDuplicateMetadata),
507508
Batch("Cleanup", fixedPoint,
508509
CleanupAliases),
509510
Batch("HandleSpecialCommand", Once,
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.SQLConfHelper
21+
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
22+
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan}
23+
import org.apache.spark.sql.catalyst.rules.Rule
24+
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
25+
import org.apache.spark.sql.internal.SQLConf
26+
import org.apache.spark.sql.types.MetadataBuilder
27+
28+
/**
29+
* Strips is duplicate metadata from all attributes and aliases as it is no longer needed after
30+
* resolution.
31+
*/
32+
object StripIsDuplicateMetadata extends Rule[LogicalPlan] with SQLConfHelper {
33+
def apply(plan: LogicalPlan): LogicalPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer {
34+
if (!conf.getConf(SQLConf.STRIP_IS_DUPLICATE_METADATA)) {
35+
plan
36+
} else {
37+
plan transformAllExpressions {
38+
case alias: Alias if alias.metadata.contains("__is_duplicate") =>
39+
val newMetadata = new MetadataBuilder()
40+
.withMetadata(alias.metadata)
41+
.remove("__is_duplicate")
42+
.build()
43+
44+
val newAlias = CurrentOrigin.withOrigin(alias.origin) {
45+
Alias(child = alias.child, name = alias.name)(
46+
exprId = alias.exprId,
47+
qualifier = alias.qualifier,
48+
explicitMetadata = Some(newMetadata),
49+
nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
50+
)
51+
}
52+
newAlias.copyTagsFrom(alias)
53+
54+
newAlias
55+
case attribute: Attribute if attribute.metadata.contains("__is_duplicate") =>
56+
val newMetadata = new MetadataBuilder()
57+
.withMetadata(attribute.metadata)
58+
.remove("__is_duplicate")
59+
.build()
60+
61+
val newAttribute = CurrentOrigin.withOrigin(attribute.origin) {
62+
attribute.withMetadata(newMetadata = newMetadata)
63+
}
64+
newAttribute.copyTagsFrom(attribute)
65+
66+
newAttribute
67+
}
68+
}
69+
}
70+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverRunner.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.catalyst.analysis.resolver
1919

2020
import org.apache.spark.sql.catalyst.{QueryPlanningTracker, SQLConfHelper}
21-
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, CleanupAliases}
21+
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, CleanupAliases, StripIsDuplicateMetadata}
2222
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2323
import org.apache.spark.sql.catalyst.rules.Rule
2424
import org.apache.spark.sql.internal.SQLConf
@@ -41,6 +41,7 @@ class ResolverRunner(
4141
*/
4242
private val planRewriteRules: Seq[Rule[LogicalPlan]] = Seq(
4343
PruneMetadataColumns,
44+
StripIsDuplicateMetadata,
4445
CleanupAliases
4546
)
4647

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,14 @@ object SQLConf {
241241
}
242242
}
243243

244+
val STRIP_IS_DUPLICATE_METADATA =
245+
buildConf("spark.sql.analyzer.stripIsDuplicateMetadata")
246+
.internal()
247+
.doc("When true, strip __is_duplicate metadata after resolution batch in analysis since " +
248+
"it is no longer needed.")
249+
.booleanConf
250+
.createWithDefault(true)
251+
244252
val ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS =
245253
buildConf("spark.sql.analyzer.uniqueNecessaryMetadataColumns")
246254
.internal()
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import java.util.ArrayList
21+
22+
import org.apache.spark.sql.catalyst.expressions.NamedExpression
23+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
24+
import org.apache.spark.sql.internal.SQLConf
25+
import org.apache.spark.sql.test.SharedSparkSession
26+
27+
class StripIsDuplicateMetadataSuite extends QueryTest with SharedSparkSession {
28+
29+
test("Strip __is_duplicate from project list") {
30+
withTable("t1") {
31+
sql("CREATE TABLE t1(col1 INT, col2 INT)")
32+
val query =
33+
"""SELECT * FROM (
34+
| SELECT col1, col1 FROM t1
35+
| UNION
36+
| SELECT col1, col2 FROM t1
37+
|)""".stripMargin
38+
39+
checkMetadata(query)
40+
}
41+
}
42+
43+
test("Strip __is_duplicate from Union output") {
44+
withTable("t1") {
45+
sql("CREATE TABLE t1(col1 INT, col2 INT)")
46+
val query =
47+
"""SELECT col1, col1 FROM t1
48+
|UNION
49+
|SELECT col1, col2 FROM t1""".stripMargin
50+
51+
checkMetadata(query)
52+
}
53+
}
54+
55+
test("Strip __is_duplicate from CTEs") {
56+
withTable("t1") {
57+
sql("CREATE TABLE t1(col1 INT, col2 INT)")
58+
val query =
59+
"""WITH cte1 AS (
60+
| SELECT col1, col1 FROM t1
61+
|),
62+
|cte2 AS (
63+
| SELECT col1, col2 FROM t1
64+
|)
65+
|SELECT * FROM cte1
66+
|UNION
67+
|SELECT * FROM cte2""".stripMargin
68+
69+
checkMetadata(query)
70+
}
71+
}
72+
73+
private def checkMetadata(query: String): Unit = {
74+
for (stripMetadata <- Seq(true, false)) {
75+
withSQLConf(SQLConf.STRIP_IS_DUPLICATE_METADATA.key -> stripMetadata.toString) {
76+
val analyzedPlan = sql(query).queryExecution.analyzed
77+
val duplicateAttributes = new ArrayList[NamedExpression]
78+
analyzedPlan.foreach {
79+
case plan: LogicalPlan => plan.expressions.foreach {
80+
case namedExpression: NamedExpression
81+
if namedExpression.metadata.contains("__is_duplicate") =>
82+
duplicateAttributes.add(namedExpression)
83+
case _ =>
84+
}
85+
}
86+
assert(duplicateAttributes.isEmpty == stripMetadata)
87+
}
88+
}
89+
}
90+
}

0 commit comments

Comments
 (0)