Skip to content

Commit a2741df

Browse files
jmilis2000, ZDQ870
authored and committed
Creation of Exact Quantile Check (#512)
* Creation of Exact Quantile Check * Fix build issue --------- Co-authored-by: ZDQ870 <[email protected]>
1 parent 944c4b4 commit a2741df

File tree

7 files changed

+159
-2
lines changed

7 files changed

+159
-2
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License
6+
* is located at
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.deequ.analyzers
18+
19+
import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNumeric}
20+
import com.amazon.deequ.analyzers.Analyzers.{conditionalSelection, ifNoNullsIn}
21+
import com.amazon.deequ.metrics.FullColumn
22+
import org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting
23+
import org.apache.spark.sql.{Column, Row}
24+
import org.apache.spark.sql.functions.expr
25+
import org.apache.spark.sql.types.{DoubleType, StructType}
26+
27+
/**
 * State produced by the ExactQuantile analyzer.
 *
 * @param exactQuantile the quantile value that was already computed by the aggregation
 * @param quantile      the quantile that was requested
 * @param fullColumn    optional column expression exposed through the FullColumn trait
 */
case class ExactQuantileState(exactQuantile: Double, quantile: Double, override val fullColumn: Option[Column] = None)
  extends DoubleValuedState[ExactQuantileState] with FullColumn {

  /**
   * Exact quantile states cannot be merged: each state only holds the final scalar quantile of
   * its own data, and the quantile of the combined data cannot be derived from two such scalars.
   *
   * The previous implementation built a Spark `Column` from a SQL string and called
   * `.toString().toDouble` on it, which never evaluates the expression and always failed with
   * a NumberFormatException. Failing explicitly makes the limitation visible to the caller.
   *
   * @param other the state that was requested to be merged into this one
   * @throws UnsupportedOperationException always
   */
  override def sum(other: ExactQuantileState): ExactQuantileState = {
    throw new UnsupportedOperationException(
      s"ExactQuantileState cannot be merged: the exact quantile $quantile of combined data " +
        "cannot be derived from previously computed quantiles. Recompute it on the full data instead.")
  }

  /** @return the computed exact quantile */
  override def metricValue(): Double = {
    exactQuantile
  }
}
41+
42+
/**
 * Analyzer that computes an exact quantile of a numeric column using Spark SQL's `percentile`
 * aggregate function.
 *
 * @param column   name of the column to analyze
 * @param quantile quantile to compute; must lie in the closed interval [0, 1]
 * @param where    optional filter, applied through `conditionalSelection` before aggregating
 */
case class ExactQuantile(column: String,
                         quantile: Double,
                         where: Option[String] = None)
  extends StandardScanShareableAnalyzer[ExactQuantileState]("ExactQuantile", column)
  with FilterableAnalyzer {

  /** Single aggregation: `percentile` over the (optionally filtered) column cast to double. */
  override def aggregationFunctions(): Seq[Column] = {
    expr(s"percentile($criterion, $quantile)") :: Nil
  }

  /**
   * Builds the state from the aggregation row.
   * NOTE(review): `ifNoNullsIn` presumably yields None when the aggregate at `offset` is null
   * (e.g. no rows selected) - confirm against Analyzers.ifNoNullsIn.
   */
  override def fromAggregationResult(result: Row, offset: Int): Option[ExactQuantileState] = {
    ifNoNullsIn(result, offset) { _ =>
      ExactQuantileState(result.getDouble(offset), quantile, Some(criterion))
    }
  }

  /**
   * Besides the column checks, rejects an invalid `quantile` up front so the problem is
   * reported for this analyzer instead of surfacing later when Spark analyses the aggregation.
   */
  override protected def additionalPreconditions(): Seq[StructType => Unit] = {
    quantileInRange :: hasColumn(column) :: isNumeric(column) :: Nil
  }

  override def filterCondition: Option[String] = where

  /** Precondition that fails when `quantile` is NaN or outside [0, 1]; the schema is not needed. */
  private def quantileInRange: StructType => Unit = { _ =>
    // Written as a negated conjunction so that NaN is rejected as well.
    if (!(quantile >= 0.0 && quantile <= 1.0)) {
      throw new IllegalArgumentException(
        s"Quantile parameter must be in the closed interval [0, 1]. Currently, the value is: $quantile!")
    }
  }

  /** The column expression the quantile is computed on: filtered selection cast to double. */
  @VisibleForTesting
  private def criterion: Column = conditionalSelection(column, where).cast(DoubleType)
}

src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ case class HdfsStateProvider(
135135
val serializedDigest = ApproximatePercentile.serializer.serialize(percentileDigest)
136136
persistBytes(serializedDigest, identifier)
137137

138+
case _: ExactQuantile =>
139+
persistDoubleState(state.asInstanceOf[ExactQuantileState].exactQuantile, identifier)
140+
138141
case _ =>
139142
throw new IllegalArgumentException(s"Unable to persist state for analyzer $analyzer.")
140143
}
@@ -177,6 +180,8 @@ case class HdfsStateProvider(
177180
val percentileDigest = ApproximatePercentile.serializer.deserialize(loadBytes(identifier))
178181
ApproxQuantileState(percentileDigest)
179182

183+
// Restore the persisted double as a state; previously this returned an ExactQuantile *analyzer*
// (built from the identifier string) instead of an ExactQuantileState.
case exactQuantile: ExactQuantile =>
  ExactQuantileState(loadDoubleState(identifier), exactQuantile.quantile)
184+
180185
case _ =>
181186
throw new IllegalArgumentException(s"Unable to load state for analyzer $analyzer.")
182187
}

src/main/scala/com/amazon/deequ/checks/Check.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,26 @@ case class Check(
506506
approxQuantileConstraint(column, quantile, assertion, filter, hint))
507507
}
508508

509+
/**
 * Adds a constraint that evaluates an assertion against the exact quantile of a column.
 *
 * @param column    Column to run the assertion on
 * @param quantile  Which quantile to assert on
 * @param assertion Function that receives a double input parameter (the computed quantile)
 *                  and returns a boolean
 * @param hint      A hint to provide additional context why a constraint could have failed
 * @return this check with the exact quantile constraint added as a filterable constraint
 */
def hasExactQuantile(column: String,
                     quantile: Double,
                     assertion: Double => Boolean,
                     hint: Option[String] = None)
  : CheckWithLastConstraintFilterable = {

  addFilterableConstraint { filter =>
    exactQuantileConstraint(column, quantile, assertion, filter, hint)
  }
}
528+
509529
/**
510530
* Creates a constraint that asserts on the minimum length of the column
511531
*

src/main/scala/com/amazon/deequ/constraints/Constraint.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,36 @@ object Constraint {
488488
new NamedConstraint(constraint, s"ApproxQuantileConstraint($approxQuantile)")
489489
}
490490

491+
/**
 * Builds a constraint that computes an exact quantile of the given column and checks it
 * with the supplied assertion.
 *
 * @param column    Column to run the assertion on
 * @param quantile  Which quantile to assert on
 * @param assertion Function that receives a double input parameter (the computed quantile)
 *                  and returns a boolean
 * @param where     Additional filter to apply before the analyzer is run.
 * @param hint      A hint to provide additional context why a constraint could have failed
 */
def exactQuantileConstraint(
    column: String,
    quantile: Double,
    assertion: Double => Boolean,
    where: Option[String] = None,
    hint: Option[String] = None)
  : Constraint = {

  fromAnalyzer(ExactQuantile(column, quantile, where = where), assertion, hint)
}
513+
514+
/**
 * Wraps an ExactQuantile analyzer and an assertion into a named constraint.
 *
 * @param exactQuantile Analyzer whose metric value is asserted on
 * @param assertion     Function that receives the computed quantile and returns a boolean
 * @param hint          A hint to provide additional context why a constraint could have failed
 */
def fromAnalyzer(exactQuantile: ExactQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = {
  val constraintName = s"ExactQuantileConstraint($exactQuantile)"
  val analysisConstraint =
    AnalysisBasedConstraint[ExactQuantileState, Double, Double](exactQuantile, assertion, hint = hint)

  new NamedConstraint(analysisConstraint, constraintName)
}
520+
491521
/**
492522
* Runs max length analysis on the given column and executes the assertion
493523
*

src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,12 @@ private[deequ] object AnalyzerSerializer
346346
result.addProperty("quantiles", approxQuantiles.quantiles.mkString(","))
347347
result.addProperty("relativeError", approxQuantiles.relativeError)
348348

349+
case exactQuantile: ExactQuantile =>
350+
result.addProperty(ANALYZER_NAME_FIELD, "ExactQuantile")
351+
result.addProperty(COLUMN_FIELD, exactQuantile.column)
352+
result.addProperty("quantile", exactQuantile.quantile)
353+
result.addProperty(WHERE_FIELD, exactQuantile.where.orNull)
354+
349355

350356
case minLength: MinLength =>
351357
result.addProperty(ANALYZER_NAME_FIELD, "MinLength")
@@ -481,6 +487,11 @@ private[deequ] object AnalyzerDeserializer
481487
val relativeError = json.get("relativeError").getAsDouble
482488
ApproxQuantiles(column, quantile, relativeError)
483489

490+
case "ExactQuantile" =>
  val column = json.get(COLUMN_FIELD).getAsString
  val quantile = json.get("quantile").getAsDouble
  // The serializer writes the filter under WHERE_FIELD; restore it so a filtered analyzer
  // round-trips. The member may be absent (get returns null) or an explicit JSON null.
  val where = Option(json.get(WHERE_FIELD)).filterNot(_.isJsonNull).map(_.getAsString)
  ExactQuantile(column, quantile, where)
494+
484495
case "MinLength" =>
485496
MinLength(
486497
json.get(COLUMN_FIELD).getAsString,

src/test/scala/com/amazon/deequ/checks/CheckTest.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
583583
val numericAnalysis = AnalysisRunner.onData(dfNumeric).addAnalyzers(Seq(
584584
Minimum("att1"), Maximum("att1"), Mean("att1"), Sum("att1"),
585585
StandardDeviation("att1"), ApproxCountDistinct("att1"),
586-
ApproxQuantile("att1", quantile = 0.5)))
586+
ApproxQuantile("att1", quantile = 0.5), ExactQuantile("att1", quantile = 0.5)))
587587

588588
val contextNumeric = numericAnalysis.run()
589589

@@ -594,6 +594,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
594594
assertSuccess(baseCheck.hasStandardDeviation("att1", _ == 1.707825127659933), contextNumeric)
595595
assertSuccess(baseCheck.hasApproxCountDistinct("att1", _ == 6.0), contextNumeric)
596596
assertSuccess(baseCheck.hasApproxQuantile("att1", quantile = 0.5, _ == 3.0), contextNumeric)
597+
assertSuccess(baseCheck.hasExactQuantile("att1", quantile = 0.5, _ == 3.5), contextNumeric)
597598

598599
val correlationAnalysisInformative = AnalysisRunner.onData(dfInformative)
599600
.addAnalyzer(Correlation("att1", "att2"))
@@ -634,6 +635,19 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
634635
assertSuccess(hasApproxQuantileCheckWithFilter, context)
635636
}
636637

638+
"correctly evaluate hasExactQuantile constraints" in withSparkSession { sparkSession =>
  // Exact median of att1 over every row of the numeric fixture.
  val unfilteredCheck = Check(CheckLevel.Error, "a")
    .hasExactQuantile("att1", quantile = 0.5, _ == 3.5)
  // Exact median of att1 restricted to the rows matching the filter.
  val filteredCheck = Check(CheckLevel.Error, "a")
    .hasExactQuantile("att1", quantile = 0.5, _ == 5.0)
    .where("att2 > 0")

  val numericData = getDfWithNumericValues(sparkSession)
  val context = runChecks(numericData, unfilteredCheck, filteredCheck)

  assertSuccess(unfilteredCheck, context)
  assertSuccess(filteredCheck, context)
}
650+
637651
"yield correct results for minimum and maximum length stats" in
638652
withSparkSession { sparkSession =>
639653
val baseCheck = Check(CheckLevel.Error, description = "a description")

src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ class AnalysisResultSerdeTest extends FlatSpec with Matchers {
8383
MinLength("ColumnA") ->
8484
DoubleMetric(Entity.Column, "MinLength", "ColumnA", Success(5.0)),
8585
MaxLength("ColumnA") ->
86-
DoubleMetric(Entity.Column, "MaxLength", "ColumnA", Success(5.0))
86+
DoubleMetric(Entity.Column, "MaxLength", "ColumnA", Success(5.0)),
87+
// Metric name matches the analyzer (was "Completeness", a copy-paste leftover).
ExactQuantile("ColumnA", 0.5) ->
  DoubleMetric(Entity.Column, "ExactQuantile", "ColumnA", Success(5.0))
8789
))
8890

8991
val dateTime = LocalDate.of(2017, 10, 14).atTime(10, 10, 10)
@@ -173,6 +175,16 @@ class AnalysisResultSerdeTest extends FlatSpec with Matchers {
173175
assertCorrectlyConvertsAnalysisResults(Seq(result))
174176
}
175177

178+
"serialization of ExactQuantile" should "correctly restore it" in {
  // Round-trip a single ExactQuantile analyzer together with its metric.
  val medianAnalyzer = ExactQuantile("col", 0.5)
  val medianMetric = DoubleMetric(Entity.Column, "ExactQuantile", "col", Success(0.5))
  val result = new AnalysisResult(ResultKey(0), AnalyzerContext(Map(medianAnalyzer -> medianMetric)))

  assertCorrectlyConvertsAnalysisResults(Seq(result))
}
187+
176188
val histogramSumJson =
177189
"""[
178190
| {

0 commit comments

Comments
 (0)