Skip to content

Commit d084a4f

Browse files
author
Yevhen Sentiabov
authored
Merge 3fb2fc5 into 20f7d98
2 parents 20f7d98 + 3fb2fc5 commit d084a4f

File tree

5 files changed

+34
-24
lines changed

5 files changed

+34
-24
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ protected class AkkaContainerClient(
7070
port: Int,
7171
timeout: FiniteDuration,
7272
queueSize: Int,
73-
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
74-
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
73+
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext)
74+
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))(as, ec)
7575
with ContainerClient {
7676

7777
def close() = shutdown()

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class ElasticSearchLogStore(
7979
elasticSearchConfig.protocol,
8080
elasticSearchConfig.host,
8181
elasticSearchConfig.port,
82-
httpFlow)
82+
httpFlow)(system, system.dispatcher)
8383

8484
private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
8585
ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,16 @@
1717

1818
package org.apache.openwhisk.core.containerpool.logging
1919

20-
import scala.concurrent.Future
20+
import scala.concurrent.{ExecutionContext, Future, Promise}
2121
import scala.util.{Either, Try}
22-
2322
import akka.actor.ActorSystem
2423
import akka.http.scaladsl.model._
2524
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
2625
import akka.http.scaladsl.model.headers.Accept
2726
import akka.stream.scaladsl.Flow
2827

29-
import scala.concurrent.Promise
3028
import scala.util.Try
31-
3229
import spray.json._
33-
3430
import org.apache.openwhisk.http.PoolingRestClient
3531
import org.apache.openwhisk.http.PoolingRestClient._
3632

@@ -154,8 +150,9 @@ class ElasticSearchRestClient(
154150
host: String,
155151
port: Int,
156152
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
157-
implicit system: ActorSystem)
158-
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
153+
implicit system: ActorSystem,
154+
ec: ExecutionContext)
155+
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow)(system, ec) {
159156

160157
import ElasticSearchJsonProtocol._
161158

common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ import scala.concurrent.{ExecutionContext, Future}
4242
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
4343
implicit system: ActorSystem,
4444
logging: Logging)
45-
extends PoolingRestClient(protocol, host, port, 16 * 1024) {
45+
extends PoolingRestClient(protocol, host, port, 16 * 1024)(
46+
system,
47+
system.dispatchers.lookup("dispatchers.couch-dispatcher")) {
4648

47-
protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
49+
protected implicit val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
4850

4951
// Headers common to all requests.
5052
protected val baseHeaders: List[HttpHeader] =

common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._
2424
import akka.http.scaladsl.model._
2525
import akka.http.scaladsl.settings.ConnectionPoolSettings
2626
import akka.http.scaladsl.unmarshalling._
27-
import akka.stream.{OverflowStrategy, QueueOfferResult}
2827
import akka.stream.scaladsl.{Flow, _}
28+
import akka.stream.{KillSwitches, QueueOfferResult}
29+
import org.apache.openwhisk.common.AkkaLogging
2930
import spray.json._
30-
import scala.concurrent.{ExecutionContext, Future, Promise}
31+
3132
import scala.concurrent.duration._
33+
import scala.concurrent.{ExecutionContext, Future, Promise}
3234
import scala.util.{Failure, Success, Try}
3335

3436
/**
@@ -45,10 +47,10 @@ class PoolingRestClient(
4547
port: Int,
4648
queueSize: Int,
4749
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
48-
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
50+
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: ExecutionContext) {
4951
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
5052

51-
protected implicit val context: ExecutionContext = system.dispatcher
53+
private val logging = new AkkaLogging(system.log)
5254

5355
//if specified, override the ClientConnection idle-timeout and keepalive socket option value
5456
private val timeoutSettings = {
@@ -72,16 +74,19 @@ class PoolingRestClient(
7274
// Additional queue in case all connections are busy. Should hardly ever be
7375
// filled in practice but can be useful, e.g., in tests starting many
7476
// asynchronous requests in a very short period of time.
75-
private val requestQueue = Source
76-
.queue(queueSize, OverflowStrategy.dropNew)
77+
private val ((requestQueue, killSwitch), sinkCompletion) = Source
78+
.queue(queueSize)
7779
.via(httpFlow.getOrElse(pool))
80+
.viaMat(KillSwitches.single)(Keep.both)
7881
.toMat(Sink.foreach({
7982
case (Success(response), p) =>
8083
p.success(response)
8184
case (Failure(error), p) =>
8285
p.failure(error)
83-
}))(Keep.left)
84-
.run
86+
}))(Keep.both)
87+
.run()
88+
89+
sinkCompletion.onComplete(_ => shutdown())
8590

8691
/**
8792
* Execute an HttpRequest on the underlying connection pool.
@@ -96,10 +101,10 @@ class PoolingRestClient(
96101

97102
// When the future completes, we know whether the request made it
98103
// through the queue.
99-
requestQueue.offer(request -> promise).flatMap {
104+
requestQueue.offer(request -> promise) match {
100105
case QueueOfferResult.Enqueued => promise.future
101-
case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full."))
102-
case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
106+
case QueueOfferResult.Dropped => Future.failed(new Exception("Request queue is full."))
107+
case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed."))
103108
case QueueOfferResult.Failure(f) => Future.failed(f)
104109
}
105110
}
@@ -127,7 +132,13 @@ class PoolingRestClient(
127132
}
128133
}
129134

130-
def shutdown(): Future[Unit] = Future.unit
135+
def shutdown(): Future[Unit] = {
136+
killSwitch.shutdown()
137+
Try(requestQueue.complete()).recover {
138+
case t: IllegalStateException => logging.warn(this, t.getMessage)
139+
}
140+
Future.unit
141+
}
131142
}
132143

133144
object PoolingRestClient {

0 commit comments

Comments
 (0)