-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Memory leak in akka.actor.LocalActorRef
#5442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._ | |
import akka.http.scaladsl.model._ | ||
import akka.http.scaladsl.settings.ConnectionPoolSettings | ||
import akka.http.scaladsl.unmarshalling._ | ||
import akka.stream.{OverflowStrategy, QueueOfferResult} | ||
import akka.stream.scaladsl.{Flow, _} | ||
import akka.stream.{KillSwitches, QueueOfferResult} | ||
import org.apache.openwhisk.common.AkkaLogging | ||
import spray.json._ | ||
import scala.concurrent.{ExecutionContext, Future, Promise} | ||
|
||
import scala.concurrent.duration._ | ||
import scala.concurrent.{ExecutionContext, Future, Promise} | ||
import scala.util.{Failure, Success, Try} | ||
|
||
/** | ||
|
@@ -45,10 +47,10 @@ class PoolingRestClient( | |
port: Int, | ||
queueSize: Int, | ||
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None, | ||
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) { | ||
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: ExecutionContext) { | ||
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") | ||
|
||
protected implicit val context: ExecutionContext = system.dispatcher | ||
private val logging = new AkkaLogging(system.log) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to get the logging from the implicit parameter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initially, I was thinking about it, but it will require changing all implementations that extend or use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. |
||
|
||
//if specified, override the ClientConnection idle-timeout and keepalive socket option value | ||
private val timeoutSettings = { | ||
|
@@ -72,16 +74,19 @@ class PoolingRestClient( | |
// Additional queue in case all connections are busy. Should hardly ever be | ||
// filled in practice but can be useful, e.g., in tests starting many | ||
// asynchronous requests in a very short period of time. | ||
private val requestQueue = Source | ||
.queue(queueSize, OverflowStrategy.dropNew) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ok, now BoundedSourceQueueStage is used and it will immediately let us know if we can enqueue an element or not. |
||
private val ((requestQueue, killSwitch), sinkCompletion) = Source | ||
.queue(queueSize) | ||
.via(httpFlow.getOrElse(pool)) | ||
.viaMat(KillSwitches.single)(Keep.both) | ||
.toMat(Sink.foreach({ | ||
case (Success(response), p) => | ||
p.success(response) | ||
case (Failure(error), p) => | ||
p.failure(error) | ||
}))(Keep.left) | ||
.run | ||
}))(Keep.both) | ||
.run() | ||
|
||
sinkCompletion.onComplete(_ => shutdown()) | ||
|
||
/** | ||
* Execute an HttpRequest on the underlying connection pool. | ||
|
@@ -96,10 +101,10 @@ class PoolingRestClient( | |
|
||
// When the future completes, we know whether the request made it | ||
// through the queue. | ||
requestQueue.offer(request -> promise).flatMap { | ||
requestQueue.offer(request -> promise) match { | ||
case QueueOfferResult.Enqueued => promise.future | ||
case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full.")) | ||
case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) | ||
case QueueOfferResult.Dropped => Future.failed(new Exception("Request queue is full.")) | ||
case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed.")) | ||
case QueueOfferResult.Failure(f) => Future.failed(f) | ||
} | ||
} | ||
|
@@ -127,7 +132,13 @@ class PoolingRestClient( | |
} | ||
} | ||
|
||
def shutdown(): Future[Unit] = Future.unit | ||
def shutdown(): Future[Unit] = { | ||
killSwitch.shutdown() | ||
Try(requestQueue.complete()).recover { | ||
case t: IllegalStateException => logging.warn(this, t.getMessage) | ||
} | ||
Future.unit | ||
} | ||
} | ||
|
||
object PoolingRestClient { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't understand why we need to pass this same execution context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The future
onComplete
requires an execution context, but becauseonComplete
is defined during class initialization it doesn't have a proper execution context in the case ofCouchDbRestClient
because the child class is initialized later than the parent.I kept the protected property in
CouchDbRestClient
because its children depend on it, there is at least one implementation inside OW.