Skip to content

Commit 3fb2fc5

Browse files
committed
- Added an execution context to the PoolingRestClient to be able to propagate it correctly from custom implementations
1 parent e89f5da commit 3fb2fc5

File tree

5 files changed

+13
-16
lines changed

5 files changed

+13
-16
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ protected class AkkaContainerClient(
7070
port: Int,
7171
timeout: FiniteDuration,
7272
queueSize: Int,
73-
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
74-
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
73+
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext)
74+
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))(as, ec)
7575
with ContainerClient {
7676

7777
def close() = shutdown()

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class ElasticSearchLogStore(
7979
elasticSearchConfig.protocol,
8080
elasticSearchConfig.host,
8181
elasticSearchConfig.port,
82-
httpFlow)
82+
httpFlow)(system, system.dispatcher)
8383

8484
private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
8585
ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,16 @@
1717

1818
package org.apache.openwhisk.core.containerpool.logging
1919

20-
import scala.concurrent.Future
20+
import scala.concurrent.{ExecutionContext, Future, Promise}
2121
import scala.util.{Either, Try}
22-
2322
import akka.actor.ActorSystem
2423
import akka.http.scaladsl.model._
2524
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
2625
import akka.http.scaladsl.model.headers.Accept
2726
import akka.stream.scaladsl.Flow
2827

29-
import scala.concurrent.Promise
3028
import scala.util.Try
31-
3229
import spray.json._
33-
3430
import org.apache.openwhisk.http.PoolingRestClient
3531
import org.apache.openwhisk.http.PoolingRestClient._
3632

@@ -154,8 +150,9 @@ class ElasticSearchRestClient(
154150
host: String,
155151
port: Int,
156152
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
157-
implicit system: ActorSystem)
158-
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
153+
implicit system: ActorSystem,
154+
ec: ExecutionContext)
155+
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow)(system, ec) {
159156

160157
import ElasticSearchJsonProtocol._
161158

common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ import scala.concurrent.{ExecutionContext, Future}
4242
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
4343
implicit system: ActorSystem,
4444
logging: Logging)
45-
extends PoolingRestClient(protocol, host, port, 16 * 1024) {
45+
extends PoolingRestClient(protocol, host, port, 16 * 1024)(
46+
system,
47+
system.dispatchers.lookup("dispatchers.couch-dispatcher")) {
4648

47-
protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
49+
protected implicit val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
4850

4951
// Headers common to all requests.
5052
protected val baseHeaders: List[HttpHeader] =

common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,11 @@ class PoolingRestClient(
4747
port: Int,
4848
queueSize: Int,
4949
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
50-
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
50+
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: ExecutionContext) {
5151
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
5252

5353
private val logging = new AkkaLogging(system.log)
5454

55-
protected implicit val context: ExecutionContext = system.dispatcher
56-
5755
//if specified, override the ClientConnection idle-timeout and keepalive socket option value
5856
private val timeoutSettings = {
5957
val cps = ConnectionPoolSettings(system.settings.config)
@@ -137,7 +135,7 @@ class PoolingRestClient(
137135
def shutdown(): Future[Unit] = {
138136
killSwitch.shutdown()
139137
Try(requestQueue.complete()).recover {
140-
case t: IllegalStateException => logging.error(this, t.getMessage)
138+
case t: IllegalStateException => logging.warn(this, t.getMessage)
141139
}
142140
Future.unit
143141
}

0 commit comments

Comments
 (0)