Skip to content

Commit f53283e

Browse files
authored
Replace Spark SQL isNull check with Spark Scala-based DSL (#493)
- This ensures that columns with spaces in their names are escaped correctly in the where condition.
- Added a test to verify.
1 parent 2eaeaed commit f53283e

File tree

5 files changed

+56
-13
lines changed

5 files changed

+56
-13
lines changed

src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -465,20 +465,26 @@ private[deequ] object Analyzers {
465465
conditionalSelection(col(selection), where)
466466
}
467467

468-
def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
469-
val conditionColumn = where.map(expr)
470-
conditionColumn
468+
def conditionSelectionGivenColumn(selection: Column, where: Option[Column], replaceWith: Double): Column = {
469+
where
471470
.map { condition => when(condition, replaceWith).otherwise(selection) }
472471
.getOrElse(selection)
473472
}
474473

475-
def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
476-
val conditionColumn = where.map(expr)
477-
conditionColumn
474+
def conditionSelectionGivenColumn(selection: Column, where: Option[Column], replaceWith: String): Column = {
475+
where
478476
.map { condition => when(condition, replaceWith).otherwise(selection) }
479477
.getOrElse(selection)
480478
}
481479

480+
def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
481+
conditionSelectionGivenColumn(selection, where.map(expr), replaceWith)
482+
}
483+
484+
def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
485+
conditionSelectionGivenColumn(selection, where.map(expr), replaceWith)
486+
}
487+
482488
def conditionalSelection(selection: Column, condition: Option[String]): Column = {
483489
val conditionColumn = condition.map { expression => expr(expression) }
484490
conditionalSelectionFromColumns(selection, conditionColumn)

src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,13 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio
4949
override def filterCondition: Option[String] = where
5050

5151
private def criterion(nullBehavior: NullBehavior): Column = {
52+
val isNullCheck = col(column).isNull
5253
nullBehavior match {
5354
case NullBehavior.Fail =>
5455
val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
55-
conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MaxValue)
56+
conditionSelectionGivenColumn(colLengths, Option(isNullCheck), replaceWith = Double.MaxValue)
5657
case NullBehavior.EmptyString =>
57-
length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
58+
length(conditionSelectionGivenColumn(col(column), Option(isNullCheck), replaceWith = "")).cast(DoubleType)
5859
case _ => length(conditionalSelection(column, where)).cast(DoubleType)
5960
}
6061
}

src/main/scala/com/amazon/deequ/analyzers/MinLength.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import com.amazon.deequ.analyzers.Preconditions.hasColumn
2222
import com.amazon.deequ.analyzers.Preconditions.isString
2323
import org.apache.spark.sql.Column
2424
import org.apache.spark.sql.Row
25-
import org.apache.spark.sql.functions.{col, length, min}
25+
import org.apache.spark.sql.functions.col
26+
import org.apache.spark.sql.functions.length
27+
import org.apache.spark.sql.functions.min
2628
import org.apache.spark.sql.types.DoubleType
2729
import org.apache.spark.sql.types.StructType
2830

@@ -47,12 +49,13 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio
4749
override def filterCondition: Option[String] = where
4850

4951
private[deequ] def criterion(nullBehavior: NullBehavior): Column = {
52+
val isNullCheck = col(column).isNull
5053
nullBehavior match {
5154
case NullBehavior.Fail =>
5255
val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
53-
conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MinValue)
56+
conditionSelectionGivenColumn(colLengths, Option(isNullCheck), replaceWith = Double.MinValue)
5457
case NullBehavior.EmptyString =>
55-
length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
58+
length(conditionSelectionGivenColumn(col(column), Option(isNullCheck), replaceWith = "")).cast(DoubleType)
5659
case _ => length(conditionalSelection(column, where)).cast(DoubleType)
5760
}
5861
}

src/test/scala/com/amazon/deequ/profiles/ColumnProfilerTest.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,27 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
7878
assert(actualColumnProfile == expectedColumnProfile)
7979
}
8080

81+
"return correct StringColumnProfile for column names with spaces" in withSparkSession { session =>
82+
val data = getDfCompleteAndInCompleteColumnsWithSpacesInNames(session)
83+
val columnNames = data.columns.toSeq
84+
85+
val lengthMap = Map(
86+
"att 1" -> (1, 3),
87+
"att 2" -> (0, 7)
88+
)
89+
90+
lengthMap.foreach { case (columnName, (minLength, maxLength)) =>
91+
val actualColumnProfile = ColumnProfiler.profile(data, Option(columnNames), false, 1)
92+
.profiles(columnName)
93+
94+
assert(actualColumnProfile.isInstanceOf[StringColumnProfile])
95+
val actualStringColumnProfile = actualColumnProfile.asInstanceOf[StringColumnProfile]
96+
97+
assert(actualStringColumnProfile.minLength.contains(minLength))
98+
assert(actualStringColumnProfile.maxLength.contains(maxLength))
99+
}
100+
}
101+
81102
"return correct columnProfiles with predefined dataType" in withSparkSession { session =>
82103

83104
val data = getDfCompleteAndInCompleteColumns(session)
@@ -131,7 +152,6 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
131152
assert(actualColumnProfile == expectedColumnProfile)
132153
}
133154

134-
135155
"return correct NumericColumnProfiles for numeric String DataType columns" in
136156
withSparkSession { session =>
137157

@@ -171,6 +191,7 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
171191
assertProfilesEqual(expectedColumnProfile,
172192
actualColumnProfile.asInstanceOf[NumericColumnProfile])
173193
}
194+
174195
"return correct NumericColumnProfiles for numeric String DataType columns when " +
175196
"kllProfiling disabled" in withSparkSession { session =>
176197

@@ -562,7 +583,6 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
562583
)
563584

564585
assertSameColumnProfiles(columnProfiles.profiles, expectedProfiles)
565-
566586
}
567587

568588
private[this] def assertSameColumnProfiles(

src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,19 @@ trait FixtureSupport {
159159
).toDF("item", "att1", "att2")
160160
}
161161

162+
def getDfCompleteAndInCompleteColumnsWithSpacesInNames(sparkSession: SparkSession): DataFrame = {
163+
import sparkSession.implicits._
164+
165+
Seq(
166+
("1", "ab", "abc1"),
167+
("2", "bc", null),
168+
("3", "ab", "def2ghi"),
169+
("4", "ab", null),
170+
("5", "bcd", "ab"),
171+
("6", "a", "pqrs")
172+
).toDF("some item", "att 1", "att 2")
173+
}
174+
162175
def getDfCompleteAndInCompleteColumnsAndVarLengthStrings(sparkSession: SparkSession): DataFrame = {
163176
import sparkSession.implicits._
164177

0 commit comments

Comments (0)