Skip to content

Commit a6ad9e4

Browse files
authored
[New Scheduler] Add duration checker (#4984)
* Add a duration checker for Elasticsearch. * Add configurations for the ElasticSearchDurationCheckerTests class * Use a private helper function to execute queries. * Add an Ansible variable for the duration checker. * Apply scalaFmt * Include test cases for duration checker to system tests. * Setup ElasticSearch for system tests. * Increase patience config to wait for response longer. * Add postfixOps
1 parent b0baa7b commit a6ad9e4

File tree

11 files changed

+875
-19
lines changed

11 files changed

+875
-19
lines changed

ansible/group_vars/all

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,3 +421,6 @@ metrics:
421421
port: "{{ metrics_kamon_statsd_port | default('8125') }}"
422422

423423
user_events: "{{ user_events_enabled | default(false) | lower }}"
424+
425+
durationChecker:
426+
timeWindow: "{{ duration_checker_time_window | default('1 d') }}"

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,8 @@ object ConfigKeys {
272272
val metrics = "whisk.metrics"
273273
val featureFlags = "whisk.feature-flags"
274274

275+
val durationChecker = s"whisk.duration-checker"
276+
275277
val whiskConfig = "whisk.config"
276278
val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only"
277279
val swaggerUi = "whisk.swagger-ui"

common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,27 +57,17 @@ case class ElasticSearchActivationStoreConfig(protocol: String,
5757

5858
class ElasticSearchActivationStore(
5959
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
60-
elasticSearchConfig: ElasticSearchActivationStoreConfig =
61-
loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore),
60+
elasticSearchConfig: ElasticSearchActivationStoreConfig,
6261
useBatching: Boolean = false)(implicit actorSystem: ActorSystem,
6362
actorMaterializer: ActorMaterializer,
6463
logging: Logging)
6564
extends ActivationStore {
6665

6766
import com.sksamuel.elastic4s.http.ElasticDsl._
67+
import ElasticSearchActivationStore.{generateIndex, httpClientCallback}
6868

6969
private implicit val executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher
7070

71-
private val httpClientCallback = new HttpClientConfigCallback {
72-
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
73-
val provider = new BasicCredentialsProvider
74-
provider.setCredentials(
75-
AuthScope.ANY,
76-
new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
77-
httpClientBuilder.setDefaultCredentialsProvider(provider)
78-
}
79-
}
80-
8171
private val client =
8272
ElasticClient(
8373
ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
@@ -407,18 +397,38 @@ class ElasticSearchActivationStore(
407397
activationId.toString.split("/")(0)
408398
}
409399

410-
private def generateIndex(namespace: String): String = {
411-
elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
412-
}
413-
414400
private def generateRangeQuery(key: String, since: Option[Instant], upto: Option[Instant]): RangeQuery = {
415401
rangeQuery(key)
416402
.gte(since.map(_.toEpochMilli).getOrElse(minStart))
417403
.lte(upto.map(_.toEpochMilli).getOrElse(maxStart))
418404
}
419405
}
420406

407+
object ElasticSearchActivationStore {
408+
val elasticSearchConfig: ElasticSearchActivationStoreConfig =
409+
loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore)
410+
411+
val httpClientCallback = new HttpClientConfigCallback {
412+
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
413+
val provider = new BasicCredentialsProvider
414+
provider.setCredentials(
415+
AuthScope.ANY,
416+
new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
417+
httpClientBuilder.setDefaultCredentialsProvider(provider)
418+
}
419+
}
420+
421+
def generateIndex(namespace: String): String = {
422+
elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
423+
}
424+
}
425+
421426
object ElasticSearchActivationStoreProvider extends ActivationStoreProvider {
427+
import ElasticSearchActivationStore.elasticSearchConfig
428+
422429
override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
423-
new ElasticSearchActivationStore(useBatching = true)(actorSystem, actorMaterializer, logging)
430+
new ElasticSearchActivationStore(elasticSearchConfig = elasticSearchConfig, useBatching = true)(
431+
actorSystem,
432+
actorMaterializer,
433+
logging)
424434
}
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.openwhisk.core.scheduler.queue
18+
19+
import akka.actor.ActorSystem
20+
import com.sksamuel.elastic4s.http.ElasticDsl._
21+
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
22+
import com.sksamuel.elastic4s.searches.queries.Query
23+
import com.sksamuel.elastic4s.{ElasticDate, ElasticDateMath, Seconds}
24+
import org.apache.openwhisk.common.Logging
25+
import org.apache.openwhisk.core.ConfigKeys
26+
import org.apache.openwhisk.core.entity.WhiskActionMetaData
27+
import org.apache.openwhisk.spi.Spi
28+
import pureconfig.loadConfigOrThrow
29+
import spray.json.{JsArray, JsNumber, JsValue, RootJsonFormat, deserializationError, _}
30+
31+
import scala.concurrent.Future
32+
import scala.concurrent.duration.FiniteDuration
33+
import scala.language.implicitConversions
34+
import scala.util.{Failure, Try}
35+
import pureconfig.generic.auto._
36+
37+
trait DurationChecker {
38+
def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
39+
callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult]
40+
}
41+
42+
case class DurationCheckResult(averageDuration: Option[Double], hitCount: Long, took: Long)
43+
44+
object ElasticSearchDurationChecker {
45+
val FilterAggregationName = "filterAggregation"
46+
val AverageAggregationName = "averageAggregation"
47+
48+
implicit val serde = new ElasticSearchDurationCheckResultFormat()
49+
50+
def getFromDate(timeWindow: FiniteDuration): ElasticDateMath =
51+
ElasticDate.now minus (timeWindow.toSeconds.toInt, Seconds)
52+
}
53+
54+
class ElasticSearchDurationChecker(private val client: ElasticClient, val timeWindow: FiniteDuration)(
55+
implicit val actorSystem: ActorSystem,
56+
implicit val logging: Logging)
57+
extends DurationChecker {
58+
import ElasticSearchDurationChecker._
59+
import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore.generateIndex
60+
61+
implicit val ec = actorSystem.getDispatcher
62+
63+
override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
64+
callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
65+
val index = generateIndex(invocationNamespace)
66+
val fqn = actionMetaData.fullyQualifiedName(false)
67+
val fromDate = getFromDate(timeWindow)
68+
69+
logging.info(this, s"check average duration for $fqn in $index for last $timeWindow")
70+
71+
actionMetaData.binding match {
72+
case Some(binding) =>
73+
val boolQueryResult = List(
74+
matchQuery("annotations.binding", s"$binding"),
75+
matchQuery("name", actionMetaData.name),
76+
rangeQuery("@timestamp").gte(fromDate))
77+
78+
executeQuery(boolQueryResult, callback, index)
79+
80+
case None =>
81+
val queryResult = List(matchQuery("path.keyword", fqn.toString), rangeQuery("@timestamp").gte(fromDate))
82+
83+
executeQuery(queryResult, callback, index)
84+
}
85+
}
86+
87+
private def executeQuery(boolQueryResult: List[Query],
88+
callback: DurationCheckResult => DurationCheckResult,
89+
index: String) = {
90+
client
91+
.execute {
92+
(search(index) query {
93+
boolQuery must {
94+
boolQueryResult
95+
}
96+
} aggregations
97+
avgAgg(AverageAggregationName, "duration")).size(0)
98+
}
99+
.map { res =>
100+
logging.debug(this, s"ElasticSearch query results: $res")
101+
Try(serde.read(res.body.getOrElse("").parseJson))
102+
}
103+
.flatMap(Future.fromTry)
104+
.map(callback(_))
105+
.andThen {
106+
case Failure(t) =>
107+
logging.error(this, s"failed to check the average duration: ${t}")
108+
}
109+
}
110+
}
111+
112+
object ElasticSearchDurationCheckerProvider extends DurationCheckerProvider {
113+
import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore._
114+
115+
override def instance(actorSystem: ActorSystem, log: Logging): ElasticSearchDurationChecker = {
116+
implicit val as: ActorSystem = actorSystem
117+
implicit val logging: Logging = log
118+
119+
val elasticClient =
120+
ElasticClient(
121+
ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
122+
NoOpRequestConfigCallback,
123+
httpClientCallback)
124+
125+
new ElasticSearchDurationChecker(elasticClient, durationCheckerConfig.timeWindow)
126+
}
127+
}
128+
129+
trait DurationCheckerProvider extends Spi {
130+
131+
val durationCheckerConfig: DurationCheckerConfig =
132+
loadConfigOrThrow[DurationCheckerConfig](ConfigKeys.durationChecker)
133+
134+
def instance(actorSystem: ActorSystem, logging: Logging): DurationChecker
135+
}
136+
137+
class ElasticSearchDurationCheckResultFormat extends RootJsonFormat[DurationCheckResult] {
138+
import ElasticSearchDurationChecker._
139+
import spray.json.DefaultJsonProtocol._
140+
141+
/**
142+
* Expected sample data
143+
{
144+
"_shards": {
145+
"failed": 0,
146+
"skipped": 0,
147+
"successful": 5,
148+
"total": 5
149+
},
150+
"aggregations": {
151+
"agg": {
152+
"value": 14
153+
}
154+
},
155+
"hits": {
156+
"hits": [],
157+
"max_score": 0,
158+
"total": 3
159+
},
160+
"timed_out": false,
161+
"took": 0
162+
}
163+
*/
164+
/**
165+
* Expected sample data
166+
{
167+
"_shards": {
168+
"failed": 0,
169+
"skipped": 0,
170+
"successful": 5,
171+
"total": 5
172+
},
173+
"aggregations": {
174+
"pathAggregation": {
175+
"avg_duration": {
176+
"value": 13
177+
},
178+
"doc_count": 3
179+
}
180+
},
181+
"hits": {
182+
"hits": [],
183+
"max_score": 0,
184+
"total": 6
185+
},
186+
"timed_out": false,
187+
"took": 0
188+
}
189+
*/
190+
implicit def read(json: JsValue) = {
191+
val jsObject = json.asJsObject
192+
193+
jsObject.getFields("aggregations", "took", "hits") match {
194+
case Seq(aggregations, took, hits) =>
195+
val hitCount = hits.asJsObject.getFields("total").headOption
196+
val filterAggregations = aggregations.asJsObject.getFields(FilterAggregationName)
197+
val averageAggregations = aggregations.asJsObject.getFields(AverageAggregationName)
198+
199+
(filterAggregations, averageAggregations, hitCount) match {
200+
case (filterAggregations, _, Some(count)) if filterAggregations.nonEmpty =>
201+
val averageDuration =
202+
filterAggregations.headOption.flatMap(
203+
_.asJsObject
204+
.getFields(AverageAggregationName)
205+
.headOption
206+
.flatMap(_.asJsObject.getFields("value").headOption))
207+
208+
averageDuration match {
209+
case Some(JsNull) =>
210+
DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
211+
212+
case Some(duration) =>
213+
DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
214+
215+
case _ => deserializationError("Cannot deserialize ProductItem: invalid input. Raw input: ")
216+
}
217+
218+
case (_, averageAggregations, Some(count)) if averageAggregations.nonEmpty =>
219+
val averageDuration = averageAggregations.headOption.flatMap(_.asJsObject.getFields("value").headOption)
220+
221+
averageDuration match {
222+
case Some(JsNull) =>
223+
DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
224+
225+
case Some(duration) =>
226+
DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
227+
228+
case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
229+
}
230+
231+
case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
232+
}
233+
234+
case other => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $other")
235+
}
236+
237+
}
238+
239+
// This method would not be used.
240+
override def write(obj: DurationCheckResult): JsValue = {
241+
JsArray(JsNumber(obj.averageDuration.get), JsNumber(obj.hitCount), JsNumber(obj.took))
242+
}
243+
}
244+
245+
case class DurationCheckerConfig(timeWindow: FiniteDuration)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.scheduler.queue
19+
20+
import akka.actor.ActorSystem
21+
import org.apache.openwhisk.common.Logging
22+
import org.apache.openwhisk.core.entity.WhiskActionMetaData
23+
24+
import scala.concurrent.Future
25+
26+
object NoopDurationCheckerProvider extends DurationCheckerProvider {
27+
override def instance(actorSystem: ActorSystem, log: Logging): NoopDurationChecker = {
28+
implicit val as: ActorSystem = actorSystem
29+
implicit val logging: Logging = log
30+
new NoopDurationChecker()
31+
}
32+
}
33+
34+
object NoopDurationChecker {
35+
implicit val serde = new ElasticSearchDurationCheckResultFormat()
36+
}
37+
38+
class NoopDurationChecker extends DurationChecker {
39+
import scala.concurrent.ExecutionContext.Implicits.global
40+
41+
override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
42+
callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
43+
Future {
44+
DurationCheckResult(Option.apply(0), 0, 0)
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)