Memory leak in akka.actor.LocalActorRef #5442
Conversation
I approved running the CI tests. Changes looked plausible to me, but I'm not an Akka expert.
def shutdown(): Future[Unit] = {
  killSwitch.shutdown()
  Try(requestQueue.complete()).recover {
    case t: IllegalStateException => logging.error(this, t.getMessage)
An exception will be thrown if a stream is already completed, but there is no API to check the stream state.
If the stream is already completed, I think this is a normal case.
Do we have to print an error log for this?
For our service, we decided to keep a log record of this exception so we can track such situations and see whether the number of these exceptions grows, which might help to investigate other issues. Also, personally, I don't like the approach of silently ignoring exceptions.
In our case, if the exception propagated to the calling code, it would break the k8s container clean-up, which would lead to thousands of abandoned containers.
In which exact case would it happen?
I thought it would always happen when we shut down the queue, since we call shutdown on the kill switch.
So I thought changing the log level to info or warn would be enough for this.
But does this log indicate any abnormal situation?
I'm not sure the KillSwitch shuts it down every time. In theory it should, but I remember seeing a memory-leak issue only when a kill switch was used without queue.complete. Judging by the number of these log messages versus the number of requests through the queue, it's hard to say whether the kill switch always completes it (maybe due to concurrent requests as well), but I saw far fewer exceptions than requests that went through the queue.
I think it might also happen when some other place has already closed the queue.
Having just a warning message might be fine as well.
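To make the discussion concrete, here is a minimal sketch of the shutdown pattern in question. Only the shutdown body mirrors the diff above; the stream wiring, element type, and queue size are assumptions for illustration, not the exact OpenWhisk code:

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future
import scala.util.Try

class QueueShutdownSketch(implicit system: ActorSystem) {
  // Bounded request queue guarded by a kill switch (illustrative wiring).
  private val (requestQueue, killSwitch) = Source
    .queue[String](16)
    .viaMat(KillSwitches.single)(Keep.both)
    .toMat(Sink.ignore)(Keep.left)
    .run()

  def shutdown(): Future[Unit] = {
    // Stop the stream; in theory this alone should complete the queue stage.
    killSwitch.shutdown()
    // complete() throws IllegalStateException if the queue is already
    // completed, and there is no API to query the stream state first,
    // hence Try(...).recover instead of a plain call.
    Try(requestQueue.complete()).recover {
      case t: IllegalStateException => system.log.warning(t.getMessage)
    }
    Future.successful(())
  }
}
```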
…ndedSourceQueueStage - Added proper clean-up of materialized resources to prevent memory leaks for long-running streams
(branch updated from 79462bf to e89f5da)
@joni-jones
Thank you for your contribution!
@@ -48,6 +50,8 @@ class PoolingRestClient(
  timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
  require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")

  private val logging = new AkkaLogging(system.log)
It's better to get the logging from the implicit parameter.
Most callers of this client have an implicit parameter for logging.
Initially, I was thinking about it, but it would require changing all implementations that extend or use PoolingRestClient, and I think that is a backward-incompatible change.
Got it.
The log store SPI does not pass an implicit parameter for logging.
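For illustration, a sketch of the two options discussed here; the class names and the logging type are simplified stand-ins (a plain Akka LoggingAdapter), not the real OpenWhisk Logging/AkkaLogging classes or the actual PoolingRestClient signature:

```scala
import akka.actor.ActorSystem
import akka.event.LoggingAdapter

// Option taken in the PR: derive a logger from the actor system inside the
// class, so existing subclasses and callers keep compiling unchanged.
class ClientWithInternalLogging(protocol: String)(implicit system: ActorSystem) {
  private val logging: LoggingAdapter = system.log
}

// Suggested alternative: accept the logger as an implicit parameter. Cleaner
// for callers that already have one in scope, but it changes the constructor
// signature and therefore breaks existing subclasses and callers.
class ClientWithImplicitLogging(protocol: String)(implicit system: ActorSystem,
                                                  logging: LoggingAdapter)
```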
@@ -72,16 +76,19 @@ class PoolingRestClient(
  // Additional queue in case all connections are busy. Should hardly ever be
  // filled in practice but can be useful, e.g., in tests starting many
  // asynchronous requests in a very short period of time.
  private val requestQueue = Source
    .queue(queueSize, OverflowStrategy.dropNew)
If the overflow strategy is not specified, what happens now when the queue is full?
OK, now BoundedSourceQueueStage is used and it will immediately let us know whether an element could be enqueued or not.
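As a quick sketch of that behaviour (element type and queue size are made up for illustration): the queue materialized by Source.queue(n) without an overflow strategy is a BoundedSourceQueue, whose offer returns a QueueOfferResult synchronously instead of a Future, so a full or closed queue is visible to the caller immediately.

```scala
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.{Keep, Sink, Source}

object BoundedQueueSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  // Materializes a BoundedSourceQueue[String]; offer() answers synchronously.
  val queue = Source
    .queue[String](2)
    .toMat(Sink.foreach(println))(Keep.left)
    .run()

  queue.offer("request") match {
    case QueueOfferResult.Enqueued    => // accepted
    case QueueOfferResult.Dropped     => // queue full, rejected right away
    case QueueOfferResult.QueueClosed => // queue already completed
    case QueueOfferResult.Failure(t)  => system.log.error(t, "stream failed")
  }

  queue.complete()
  system.terminate()
}
```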
I'm looking into failed tests due to …
It looks like the …
…propagate it correctly from custom implementations
I changed the code to be able to propagate the right context.
- extends PoolingRestClient(protocol, host, port, 16 * 1024) {
+ extends PoolingRestClient(protocol, host, port, 16 * 1024)(
+   system,
+   system.dispatchers.lookup("dispatchers.couch-dispatcher")) {
I couldn't understand why we need to pass this same execution context.
The future's onComplete requires an execution context, but because onComplete is registered during class initialization, it doesn't have a proper execution context in the case of CouchDbRestClient: the child class is initialized later than the parent.
I kept the protected property in CouchDbRestClient because its children depend on it; there is at least one implementation inside OW.
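A simplified sketch of the initialization-order issue being described (class names are illustrative, not the real PoolingRestClient/CouchDbRestClient signatures): the parent's constructor body runs before the child's, so an execution context defined only as a field of the child would not yet exist while the parent registers its callbacks; passing the dispatcher through the constructor avoids that.

```scala
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

// Parent receives the execution context explicitly, so the onComplete
// registered in its constructor body uses an already-initialized context.
class ParentClient(name: String)(implicit system: ActorSystem, ec: ExecutionContext) {
  Future(s"$name ready").onComplete(r => system.log.info(r.toString))
}

// Child selects a dedicated dispatcher and hands it to the parent constructor
// instead of exposing it only as a field that the parent cannot see yet.
class ChildClient(implicit system: ActorSystem)
    extends ParentClient("child")(system, system.dispatchers.lookup("akka.actor.default-dispatcher"))
```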
Codecov Report
@@            Coverage Diff             @@
##           master    #5442       +/-   ##
===========================================
- Coverage   76.82%   50.91%   -25.91%
===========================================
  Files         241      241
  Lines       14632    14630        -2
  Branches      616      606       -10
===========================================
- Hits        11241     7449     -3792
- Misses       3391     7181     +3790
... and 149 files with indirect coverage changes
LGTM
LGTM as well
* Replaced the usage of the deprecated OverflowStrategy.dropNew with BoundedSourceQueueStage; added proper clean-up of materialized resources to prevent memory leaks for long-running streams.
* Added an execution context to the PoolingRestClient to be able to propagate it correctly from custom implementations.
(cherry picked from commit 6f11d48)
Hi @joni-jones, maybe you or someone else here in this PR can help us understand an issue in the https://github.com/apache/openwhisk-runtime-nodejs runtime. The scheduled openwhisk nodejs runtime builds currently fail (https://github.com/apache/openwhisk-runtime-nodejs/actions). They fail with a timeout (after 30s) in a test case that exercises the concurrent invocation capability (number of concurrent invokes/runs for a single action container) of this runtime, for both nodejs:18 and nodejs:20.
This kind of concurrency is only supported in the nodejs runtimes; other runtimes just support '__OW_ALLOW_CONCURRENT=false' and can handle only one invoke/run per action container at a time. To run the tests, the GitHub action of the nodejs runtime clones the latest available apache/openwhisk repository and uses its master as the base to run its tests (https://github.com/apache/openwhisk-runtime-nodejs/blob/master/.github/workflows/ci.yaml). The failing test case issues 128 (https://github.com/apache/openwhisk-runtime-nodejs/blob/c60a6676375d85878c658412162004848c19f965/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala#L32) parallel action invoke/run requests against a single nodejs runtime action container. The Scala test uses the AkkaContainerClient to open these 128 parallel connections (https://github.com/apache/openwhisk-runtime-nodejs/blob/c60a6676375d85878c658412162004848c19f965/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala#L24). Debugging it locally showed that with the current implementation in this PR, the test no longer seems to reach the required 128 parallel connections: after a certain number of open connections (far fewer than 128), no further ones are opened, or they arrive very, very slowly. As a result, the pending invokes in the action container do not return a response in time and the test case fails after 30s. Looking at the changes in this PR, it does not look like the nodejs runtime tests need to be adapted. Anyhow, with your broader akka background, do we need to modify something in the nodejs runtime tests to consume this latest apache/openwhisk core?
I think we should get into the CI environment and see what's going on.
@style95 thanks for your reply 👍 Just to clarify: running the nodejs runtime tests locally with apache/openwhisk commit 6f11d48 (the latest master right now) shows the error in the nodejs runtime concurrency tests, while going back one commit to 0c27a65 (which includes the CPU limits) works fine (no failures in these tests). From that, it looks like the change in behaviour was introduced with commit 6f11d48 and is not related to the changes for the CPU limits.
@falkzoll got it. |