Skip to content

Memory leak in akka.actor.LocalActorRef #5442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ protected class AkkaContainerClient(
port: Int,
timeout: FiniteDuration,
queueSize: Int,
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext)
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))(as, ec)
with ContainerClient {

def close() = shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class ElasticSearchLogStore(
elasticSearchConfig.protocol,
elasticSearchConfig.host,
elasticSearchConfig.port,
httpFlow)
httpFlow)(system, system.dispatcher)

private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@

package org.apache.openwhisk.core.containerpool.logging

import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Either, Try}

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
import akka.http.scaladsl.model.headers.Accept
import akka.stream.scaladsl.Flow

import scala.concurrent.Promise
import scala.util.Try

import spray.json._

import org.apache.openwhisk.http.PoolingRestClient
import org.apache.openwhisk.http.PoolingRestClient._

Expand Down Expand Up @@ -154,8 +150,9 @@ class ElasticSearchRestClient(
host: String,
port: Int,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
implicit system: ActorSystem)
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
implicit system: ActorSystem,
ec: ExecutionContext)
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow)(system, ec) {

import ElasticSearchJsonProtocol._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ import scala.concurrent.{ExecutionContext, Future}
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
implicit system: ActorSystem,
logging: Logging)
extends PoolingRestClient(protocol, host, port, 16 * 1024) {
extends PoolingRestClient(protocol, host, port, 16 * 1024)(
system,
system.dispatchers.lookup("dispatchers.couch-dispatcher")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't understand why we need to pass this same execution context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The future onComplete requires an execution context, but because onComplete is defined during class initialization it doesn't have a proper execution context in the case of CouchDbRestClient because the child class is initialized later than the parent.

I kept the protected property in CouchDbRestClient because its children depend on it, there is at least one implementation inside OW.


protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
protected implicit val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")

// Headers common to all requests.
protected val baseHeaders: List[HttpHeader] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.unmarshalling._
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, _}
import akka.stream.{KillSwitches, QueueOfferResult}
import org.apache.openwhisk.common.AkkaLogging
import spray.json._
import scala.concurrent.{ExecutionContext, Future, Promise}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/**
Expand All @@ -45,10 +47,10 @@ class PoolingRestClient(
port: Int,
queueSize: Int,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: ExecutionContext) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")

protected implicit val context: ExecutionContext = system.dispatcher
private val logging = new AkkaLogging(system.log)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to get the logging from the implicit parameter.
Most callers of this client have an implicit parameter for logging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I was thinking about it, but it will require changing all implementations that extend or use PoolingRestClient and I think this is a backward incompatible change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.
The log store SPI does not pass an implicit parameter for logging.


//if specified, override the ClientConnection idle-timeout and keepalive socket option value
private val timeoutSettings = {
Expand All @@ -72,16 +74,19 @@ class PoolingRestClient(
// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
private val requestQueue = Source
.queue(queueSize, OverflowStrategy.dropNew)
Copy link
Member

@style95 style95 Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the overflow strategy is not specified, what happens now in case the queue is full?

Ok, now BoundedSourceQueueStage is used and it will immediately let us know if we can enqueue an element or not.

private val ((requestQueue, killSwitch), sinkCompletion) = Source
.queue(queueSize)
.via(httpFlow.getOrElse(pool))
.viaMat(KillSwitches.single)(Keep.both)
.toMat(Sink.foreach({
case (Success(response), p) =>
p.success(response)
case (Failure(error), p) =>
p.failure(error)
}))(Keep.left)
.run
}))(Keep.both)
.run()

sinkCompletion.onComplete(_ => shutdown())

/**
* Execute an HttpRequest on the underlying connection pool.
Expand All @@ -96,10 +101,10 @@ class PoolingRestClient(

// When the future completes, we know whether the request made it
// through the queue.
requestQueue.offer(request -> promise).flatMap {
requestQueue.offer(request -> promise) match {
case QueueOfferResult.Enqueued => promise.future
case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full."))
case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
case QueueOfferResult.Dropped => Future.failed(new Exception("Request queue is full."))
case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed."))
case QueueOfferResult.Failure(f) => Future.failed(f)
}
}
Expand Down Expand Up @@ -127,7 +132,13 @@ class PoolingRestClient(
}
}

def shutdown(): Future[Unit] = Future.unit
def shutdown(): Future[Unit] = {
killSwitch.shutdown()
Try(requestQueue.complete()).recover {
case t: IllegalStateException => logging.warn(this, t.getMessage)
}
Future.unit
}
}

object PoolingRestClient {
Expand Down