Skip to content

Commit d048bf3

Browse files
committed
Merge remote-tracking branch 'origin/joins-page' into joins-page
2 parents 8f62e42 + 788b145 commit d048bf3

File tree

10 files changed

+90
-39
lines changed

10 files changed

+90
-39
lines changed

.github/image/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ RUN wget -O /etc/apk/keys/amazoncorretto.rsa.pub https://apk.corretto.aws/amazon
1212
RUN apk add --no-cache amazon-corretto-17
1313

1414
# java
15-
ENV JAVA_HOME /usr/lib/jvm/default-jvm
16-
ENV PATH $PATH:$JAVA_HOME/bin
15+
ENV JAVA_HOME=/usr/lib/jvm/default-jvm
16+
ENV PATH=$PATH:$JAVA_HOME/bin
1717

1818
# sbt for scala
1919
RUN curl -L "https://github.com/sbt/sbt/releases/download/v1.8.2/sbt-1.8.2.tgz" | tar -xz -C /usr/local

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,6 @@ releases
7474

7575
# Generated during dynamodb kv store tests
7676
/cloud_aws/dynamodb-local-metadata.json
77+
78+
# Elastic Search files
79+
/docker-init/elasticsearch-data

cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,26 @@ import java.net.URI
1717
*/
1818
class AwsApiImpl(conf: Map[String, String]) extends Api(conf) {
1919
@transient lazy val ddbClient: DynamoDbClient = {
20-
val regionEnvVar =
21-
sys.env.getOrElse("AWS_DEFAULT_REGION", throw new IllegalArgumentException("Missing AWS_DEFAULT_REGION env var"))
22-
val dynamoEndpoint =
23-
sys.env.getOrElse("DYNAMO_ENDPOINT", throw new IllegalArgumentException("Missing DYNAMO_ENDPOINT env var"))
24-
25-
DynamoDbClient
20+
var builder = DynamoDbClient
2621
.builder()
27-
.region(Region.of(regionEnvVar))
28-
.endpointOverride(URI.create(dynamoEndpoint)) // TODO remove post docker
29-
.build()
22+
sys.env.get("AWS_DEFAULT_REGION").foreach { region =>
23+
try {
24+
builder = builder.region(Region.of(region))
25+
} catch {
26+
case e: IllegalArgumentException =>
27+
throw new IllegalArgumentException(s"Invalid AWS region format: $region", e)
28+
}
29+
}
30+
sys.env.get("DYNAMO_ENDPOINT").foreach { endpoint =>
31+
try {
32+
builder = builder.endpointOverride(URI.create(endpoint))
33+
} catch {
34+
case e: IllegalArgumentException =>
35+
throw new IllegalArgumentException(s"Invalid DynamoDB endpoint URI: $endpoint", e)
36+
}
37+
}
38+
builder.build()
39+
3040
}
3141

3242
override def genKvStore: KVStore = {

frontend/package-lock.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frontend/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,8 @@
6262
"lodash": "^4.17.21",
6363
"tailwind-merge": "^2.5.2",
6464
"tailwind-variants": "^0.2.1"
65+
},
66+
"overrides": {
67+
"cross-spawn": "^7.0.6"
6568
}
6669
}

frontend/src/lib/components/NavigationBar/NavigationBar.svelte

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
CommandInput,
88
CommandList,
99
CommandGroup,
10-
CommandItem
10+
CommandItem,
11+
CommandEmpty
1112
} from '$lib/components/ui/command/';
1213
import { search } from '$lib/api/api';
1314
import type { Model } from '$lib/types/Model/Model';
@@ -174,6 +175,7 @@
174175
bind:value={input}
175176
/>
176177
<CommandList>
178+
<CommandEmpty>No results found</CommandEmpty>
177179
{#if searchResults.length === 0}
178180
{#if input === ''}
179181
<CommandGroup heading="Quick actions">

hub/conf/application.conf

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,18 @@ play.http.parser.maxDiskBuffer=10M
1111

1212
play.filters.hosts {
1313
# Allow requests to localhost:9000 and "app" for now
14-
allowed = ["localhost:9000", ".app"]
14+
allowed = ["localhost:9000", ".app", ${?APP_SERVICE_HOST}]
1515
}
1616

1717
# Add CORS filter
1818
play.filters.enabled += "play.filters.cors.CORSFilter"
1919

2020
# CORS configuration
2121
play.filters.cors {
22-
allowedOrigins = ["http://localhost:5173", "http://localhost:3000"]
22+
allowedOrigins = ["http://localhost:5173",
23+
"http://localhost:3000",
24+
"http://${?FRONTEND_SERVICE_HOST}:${?FRONTEND_SERVICE_PORT}",
25+
"https://${?FRONTEND_SERVICE_HOST}:${?FRONTEND_SERVICE_PORT}"]
2326
allowedHttpMethods = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
2427
allowedHttpHeaders = ["Accept", "Content-Type"]
2528
}

spark/src/main/scala/ai/chronon/spark/stats/drift/Expressions.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ object Expressions {
120120
)
121121
case MetricName.percentiles =>
122122
(row: Row, summaries: TileSummary) =>
123-
summaries.setPercentiles(row.getSeq[Double](index).map(lang.Double.valueOf).toJava)
123+
if (row.isNullAt(index)) {
124+
summaries.setPercentiles(null)
125+
} else {
126+
summaries.setPercentiles(row.getSeq[Double](index).map(lang.Double.valueOf).toJava)
127+
}
124128

125129
case MetricName.innerNullCount =>
126130
(row: Row, summaries: TileSummary) => summaries.setInnerNullCount(row.getLong(index))

spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -181,18 +181,16 @@ class Summarizer(confPath: String,
181181
counts
182182
}
183183

184-
// TODO: figure out why this gets called multiple times
185-
private def buildSummaryExpressions(inputDf: DataFrame, summaryInputDf: DataFrame): Seq[SummaryExpression] =
186-
summaryInputDf.schema.fields.flatMap { f =>
187-
val cardinalityMap = buildCardinalityMap(inputDf)
188-
val cardinality = if (cardinalityMap.contains(f.name)) {
189-
if (cardinalityMap(f.name) <= cardinalityThreshold) Cardinality.LOW else Cardinality.HIGH
190-
} else {
191-
logger.info(s"Cardinality not computed for column ${f.name}".yellow)
192-
Cardinality.LOW
193-
}
184+
private def buildSummaryExpressions(inputDf: DataFrame, summaryInputDf: DataFrame): Seq[SummaryExpression] = {
185+
val cardinalityMap = buildCardinalityMap(inputDf)
186+
val excludedFields = Set(Constants.TileColumn, tu.partitionColumn, Constants.TimeColumn)
187+
summaryInputDf.schema.fields.filterNot { f => excludedFields.contains(f.name) }.flatMap { f =>
188+
val cardinality =
189+
if (cardinalityMap(f.name + "_cardinality") <= cardinalityThreshold) Cardinality.LOW else Cardinality.HIGH
190+
194191
SummaryExpression.of(f.dataType, cardinality, f.name)
195192
}
193+
}
196194

197195
private[spark] def computeSummaryDf(df: DataFrame): (DataFrame, Seq[SummaryExpression]) = {
198196
val summaryInputDf = prepare(df)._2

spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.scalatest.matchers.should.Matchers
2222
import java.util.concurrent.TimeUnit
2323
import scala.concurrent.Await
2424
import scala.concurrent.duration.Duration
25-
import scala.util.ScalaJavaConversions.ListOps
25+
import scala.util.ScalaJavaConversions.IteratorOps
2626

2727
class DriftTest extends AnyFlatSpec with Matchers {
2828

@@ -102,18 +102,46 @@ class DriftTest extends AnyFlatSpec with Matchers {
102102
)
103103
val driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
104104

105-
driftSeries.foreach{s => println(s"${s.getKey.getColumn}: ${s.getPercentileDriftSeries.toScala}")}
105+
val (nulls, totals) = driftSeries.iterator.foldLeft(0 -> 0) {
106+
case ((nulls, total), s) =>
107+
val currentNulls = s.getPercentileDriftSeries.iterator().toScala.count(_ == null)
108+
val currentCount = s.getPercentileDriftSeries.size()
109+
(nulls + currentNulls, total + currentCount)
110+
}
111+
112+
println(
113+
s"""drift totals: $totals
114+
|drift nulls: $nulls
115+
|""".stripMargin.red)
106116

107117
println("Drift series fetched successfully".green)
108118

109-
// TODO: fix timeout issue
110-
// val summarySeriesFuture = driftStore.getSummarySeries(
111-
// join.metaData.nameToFilePath,
112-
// startMs,
113-
// endMs
114-
// )
115-
// val summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
116-
// summarySeries.foreach{s => println(s"${s.getKey.getColumn}: ${s.getPercentiles.toScala}")}
117-
// println("Summary series fetched successfully".green)
119+
totals should be > 0
120+
nulls.toDouble / totals.toDouble should be < 0.6
121+
122+
val summarySeriesFuture = driftStore.getSummarySeries(
123+
join.metaData.nameToFilePath,
124+
startMs,
125+
endMs
126+
)
127+
val summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
128+
val (summaryNulls, summaryTotals) = summarySeries.iterator.foldLeft(0 -> 0) {
129+
case ((nulls, total), s) =>
130+
if (s.getPercentiles == null) {
131+
(nulls + 1) -> (total + 1)
132+
} else {
133+
val currentNulls = s.getPercentiles.iterator().toScala.count(_ == null)
134+
val currentCount = s.getPercentiles.size()
135+
(nulls + currentNulls, total + currentCount)
136+
}
137+
}
138+
println(
139+
s"""summary ptile totals: $summaryTotals
140+
|summary ptile nulls: $summaryNulls
141+
|""".stripMargin)
142+
143+
summaryTotals should be > 0
144+
summaryNulls.toDouble / summaryTotals.toDouble should be < 0.1
145+
println("Summary series fetched successfully".green)
118146
}
119147
}

0 commit comments

Comments
 (0)