Skip to content

Commit 79462bf

Browse files
committed
- Replaced the usage of deprecated OverflowStrategy.dropNew with BoundedSourceQueueStage
- Added proper clean up of materialized resources to prevent memory leaks for long-running streams
1 parent 20f7d98 commit 79462bf

File tree

1 file changed

+31
-18
lines changed

1 file changed

+31
-18
lines changed

common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._
2424
import akka.http.scaladsl.model._
2525
import akka.http.scaladsl.settings.ConnectionPoolSettings
2626
import akka.http.scaladsl.unmarshalling._
27-
import akka.stream.{OverflowStrategy, QueueOfferResult}
2827
import akka.stream.scaladsl.{Flow, _}
28+
import akka.stream.{KillSwitches, QueueOfferResult}
29+
import org.apache.openwhisk.common.AkkaLogging
2930
import spray.json._
30-
import scala.concurrent.{ExecutionContext, Future, Promise}
31+
3132
import scala.concurrent.duration._
33+
import scala.concurrent.{ExecutionContext, Future, Promise}
3234
import scala.util.{Failure, Success, Try}
3335

3436
/**
@@ -40,14 +42,16 @@ import scala.util.{Failure, Success, Try}
4042
* extra queueing mechanism.
4143
*/
4244
class PoolingRestClient(
43-
protocol: String,
44-
host: String,
45-
port: Int,
46-
queueSize: Int,
47-
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
48-
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
45+
protocol: String,
46+
host: String,
47+
port: Int,
48+
queueSize: Int,
49+
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
50+
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
4951
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
5052

53+
private val logging = new AkkaLogging(system.log)
54+
5155
protected implicit val context: ExecutionContext = system.dispatcher
5256

5357
//if specified, override the ClientConnection idle-timeout and keepalive socket option value
@@ -72,16 +76,19 @@ class PoolingRestClient(
7276
// Additional queue in case all connections are busy. Should hardly ever be
7377
// filled in practice but can be useful, e.g., in tests starting many
7478
// asynchronous requests in a very short period of time.
75-
private val requestQueue = Source
76-
.queue(queueSize, OverflowStrategy.dropNew)
79+
private val ((requestQueue, killSwitch), sinkCompletion) = Source
80+
.queue(queueSize)
7781
.via(httpFlow.getOrElse(pool))
82+
.viaMat(KillSwitches.single)(Keep.both)
7883
.toMat(Sink.foreach({
7984
case (Success(response), p) =>
8085
p.success(response)
8186
case (Failure(error), p) =>
8287
p.failure(error)
83-
}))(Keep.left)
84-
.run
88+
}))(Keep.both)
89+
.run()
90+
91+
sinkCompletion.onComplete(_ => shutdown())
8592

8693
/**
8794
* Execute an HttpRequest on the underlying connection pool.
@@ -96,11 +103,11 @@ class PoolingRestClient(
96103

97104
// When the future completes, we know whether the request made it
98105
// through the queue.
99-
requestQueue.offer(request -> promise).flatMap {
100-
case QueueOfferResult.Enqueued => promise.future
101-
case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full."))
102-
case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
103-
case QueueOfferResult.Failure(f) => Future.failed(f)
106+
requestQueue.offer(request -> promise) match {
107+
case QueueOfferResult.Enqueued => promise.future
108+
case QueueOfferResult.Dropped => Future.failed(new Exception("Request queue is full."))
109+
case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed."))
110+
case QueueOfferResult.Failure(f) => Future.failed(f)
104111
}
105112
}
106113

@@ -127,7 +134,13 @@ class PoolingRestClient(
127134
}
128135
}
129136

130-
def shutdown(): Future[Unit] = Future.unit
137+
def shutdown(): Future[Unit] = {
138+
killSwitch.shutdown()
139+
Try(requestQueue.complete()).recover {
140+
case t: IllegalStateException => logging.error(this, t.getMessage)
141+
}
142+
Future.unit
143+
}
131144
}
132145

133146
object PoolingRestClient {

0 commit comments

Comments
 (0)