Skip to content

Commit f9e469e

Browse files
[New Scheduler] Add container counter (#5072)
* [New Scheduler] Add container counter Get container count from ETCD when related data get updated in ETCD * Fix tests * Fix tests
1 parent e05aa44 commit f9e469e

File tree

2 files changed

+477
-0
lines changed

2 files changed

+477
-0
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.scheduler.queue
19+
20+
import java.util.concurrent.atomic.AtomicInteger
21+
22+
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
23+
import org.apache.openwhisk.common.Logging
24+
import org.apache.openwhisk.core.etcd.EtcdClient
25+
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
26+
import org.apache.openwhisk.core.service.{DeleteEvent, PutEvent, UnwatchEndpoint, WatchEndpoint, WatchEndpointOperation}
27+
28+
import scala.collection.concurrent.TrieMap
29+
import scala.concurrent.{ExecutionContext, Future}
30+
31+
class ContainerCounter(invocationNamespace: String, etcdClient: EtcdClient, watcherService: ActorRef)(
32+
implicit val actorSystem: ActorSystem,
33+
ec: ExecutionContext,
34+
logging: Logging) {
35+
private[queue] var existingContainerNumByNamespace: Int = 0
36+
private[queue] var inProgressContainerNumByNamespace: Int = 0
37+
private[queue] val references = new AtomicInteger(0)
38+
private val watcherName = s"container-counter-$invocationNamespace"
39+
40+
private val inProgressContainerPrefixKeyByNamespace =
41+
ContainerKeys.inProgressContainerPrefixByNamespace(invocationNamespace)
42+
private val existingContainerPrefixKeyByNamespace =
43+
ContainerKeys.existingContainersPrefixByNamespace(invocationNamespace)
44+
45+
private val watchedKeys = Seq(inProgressContainerPrefixKeyByNamespace, existingContainerPrefixKeyByNamespace)
46+
47+
private val watcher =
48+
actorSystem.actorOf(Props(new Actor {
49+
private var countingKeys = Set.empty[String]
50+
private var waitingForCountKeys = Set.empty[String]
51+
52+
override def receive: Receive = {
53+
case operation: WatchEndpointOperation if operation.isPrefix =>
54+
if (countingKeys
55+
.contains(operation.watchKey))
56+
waitingForCountKeys += operation.watchKey
57+
else {
58+
countingKeys += operation.watchKey
59+
refreshContainerCount(operation.watchKey)
60+
}
61+
62+
case ReadyToGetCount(key) =>
63+
if (waitingForCountKeys.contains(key)) {
64+
waitingForCountKeys -= key
65+
refreshContainerCount(key)
66+
} else
67+
countingKeys -= key
68+
}
69+
}))
70+
71+
private def refreshContainerCount(key: String): Future[Unit] = {
72+
etcdClient
73+
.getCount(key)
74+
.map { count =>
75+
key match {
76+
case `inProgressContainerPrefixKeyByNamespace` => inProgressContainerNumByNamespace = count.toInt
77+
case `existingContainerPrefixKeyByNamespace` => existingContainerNumByNamespace = count.toInt
78+
}
79+
watcher ! ReadyToGetCount(key)
80+
}
81+
.recover {
82+
case t: Throwable =>
83+
logging.error(
84+
this,
85+
s"failed to get the number of existing containers for ${invocationNamespace} due to ${t}.")
86+
watcher ! ReadyToGetCount(key)
87+
}
88+
}
89+
90+
def increaseReference(): ContainerCounter = {
91+
if (references.incrementAndGet() == 1) {
92+
watchedKeys.foreach { key =>
93+
watcherService.tell(WatchEndpoint(key, "", true, watcherName, Set(PutEvent, DeleteEvent)), watcher)
94+
}
95+
96+
}
97+
this
98+
}
99+
100+
def close(): Unit = {
101+
if (references.decrementAndGet() == 0) {
102+
watchedKeys.foreach { key =>
103+
watcherService ! UnwatchEndpoint(key, true, watcherName)
104+
}
105+
NamespaceContainerCount.instances.remove(invocationNamespace)
106+
}
107+
}
108+
}
109+
110+
object NamespaceContainerCount {
111+
private[queue] val instances = TrieMap[String, ContainerCounter]()
112+
def apply(namespace: String, etcdClient: EtcdClient, watcherService: ActorRef)(implicit actorSystem: ActorSystem,
113+
ec: ExecutionContext,
114+
logging: Logging): ContainerCounter = {
115+
instances
116+
.getOrElseUpdate(namespace, new ContainerCounter(namespace, etcdClient, watcherService))
117+
.increaseReference()
118+
}
119+
}
120+
121+
case class ReadyToGetCount(key: String)

0 commit comments

Comments
 (0)