Memory leak in akka.actor.LocalActorRef #5442

Merged
merged 2 commits into apache:master on Sep 29, 2023

Conversation

@YevSent (Contributor) commented Sep 11, 2023

Description

Note: In our service, we use PoolingRestClient for communication between invokers and action pods, and a memory leak appears for long-running asynchronous invocations: it seems that objects allocated within streams are not cleaned up properly after the Akka materialization has been removed.

  • Replaced the usage of the deprecated OverflowStrategy.dropNew with BoundedSourceQueueStage
  • Added proper clean-up of materialized resources to prevent memory leaks for long-running streams (a rough sketch of the resulting wiring is shown below)
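For context, here is a minimal, self-contained sketch of the kind of wiring this change leads to. It is not the literal PoolingRestClient code: the class name, the element type, and the sink body are simplified assumptions; only the Source.queue/BoundedSourceQueue, KillSwitch, and complete()-on-shutdown pattern reflect the PR description above.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}

class BoundedRequestQueue(queueSize: Int)(implicit system: ActorSystem, ec: ExecutionContext) {

  // Source.queue(bufferSize) without an OverflowStrategy materializes a
  // BoundedSourceQueue (the non-deprecated replacement for
  // OverflowStrategy.dropNew); the kill switch lets shutdown() tear the
  // stream down so the materialized resources can actually be released.
  private val ((requestQueue, killSwitch), sinkCompletion) =
    Source
      .queue[(HttpRequest, Promise[HttpResponse])](queueSize)
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.foreach[(HttpRequest, Promise[HttpResponse])] { case (_, promise) =>
        // a real client would execute the request on a pooled connection here
        promise.trySuccess(HttpResponse())
      })(Keep.both)
      .run()

  // Complete the queue and shut the kill switch; complete() throws
  // IllegalStateException if the queue was already completed, hence the Try.
  def shutdown(): Future[Unit] = {
    killSwitch.shutdown()
    Try(requestQueue.complete())
    sinkCompletion.map(_ => ())
  }
}
```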

Related issue and scope

My changes affect the following components

  • API
  • Controller
  • Message Bus (e.g., Kafka)
  • Loadbalancer
  • Scheduler
  • Invoker
  • Intrinsic actions (e.g., sequences, conductors)
  • Data stores (e.g., CouchDB)
  • Tests
  • Deployment
  • CLI
  • General tooling
  • Documentation

Types of changes

  • Bug fix (generally a non-breaking change which closes an issue).
  • Enhancement or new feature (adds new functionality).
  • Breaking change (a bug fix or enhancement which changes existing behavior).

Checklist:

  • I signed an Apache CLA.
  • I reviewed the style guides and followed the recommendations (Travis CI will check :).
  • I added tests to cover my changes.
  • My changes require further changes to the documentation.
  • I updated the documentation where necessary.

@dgrove-oss (Member)

I approved running the CI tests. Changes looked plausible to me, but I'm not an Akka expert.

def shutdown(): Future[Unit] = {
killSwitch.shutdown()
Try(requestQueue.complete()).recover {
case t: IllegalStateException => logging.error(this, t.getMessage)
@YevSent (Contributor, Author):

An exception will be thrown if a stream is already completed, but there is no API to check the stream state.

Member:

If the stream is already completed, I think this is a normal case.
Do we have to print an error log for this?

@YevSent (Contributor, Author):

For our service, we decided to log this exception so we can track such situations and see whether the number of these exceptions grows, which might help investigate other issues. Also, personally, I don't like the approach of silently ignoring exceptions.

In our case, if the exception propagated to the dependent code, it would break the k8s container clean-up, which would lead to thousands of abandoned containers.

Member:

In which exact case would it happen?
I thought it would always happen when we shut down the queue, since we call shutdown on the kill switch.
So I thought changing the log level to info or warn would be enough for this.
But does this log indicate any abnormal situation?

@YevSent (Contributor, Author):

I'm not sure the KillSwitch shuts it down every time; in theory it should, but I remember seeing the memory leak when there was only a kill switch and no queue.complete. From the number of these log messages compared to the number of requests going through the queue, it's hard to say whether the kill switch handles it every time (concurrent requests may play a role as well), but I saw a much smaller number of exceptions than requests that went through the queue.

I think it might also happen when some other place has already closed the queue.

Having just a warning message might be fine as well.

…ndedSourceQueueStage

 - Added proper clean-up of materialized resources to prevent memory leaks for long-running streams
@style95 (Member) left a comment:

@joni-jones
Thank you for your contribution!

@@ -48,6 +50,8 @@ class PoolingRestClient(
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")

private val logging = new AkkaLogging(system.log)
Member:

It's better to get the logging from the implicit parameter.
Most callers of this client have an implicit parameter for logging.

@YevSent (Contributor, Author):

Initially I was thinking about it, but it would require changing all implementations that extend or use PoolingRestClient, and I think this is a backward-incompatible change.
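To illustrate the trade-off, here is a rough sketch of the two options. The class names are hypothetical; AkkaLogging and Logging are the types from the OpenWhisk common package, and new AkkaLogging(system.log) is what the diff above does.

```scala
import akka.actor.ActorSystem
import org.apache.openwhisk.common.{AkkaLogging, Logging}

// Option taken in the PR: derive the logger from the actor system's log, so
// the public constructor signature stays unchanged for callers and subclasses.
class ClientWithDerivedLogging(protocol: String, host: String, port: Int)(implicit system: ActorSystem) {
  private val logging: Logging = new AkkaLogging(system.log)
}

// Reviewer's suggestion: take Logging as an implicit constructor parameter.
// Callers that already have an implicit Logging in scope pass it for free,
// but every class extending or constructing the client would have to change,
// i.e. a backward-incompatible constructor change.
class ClientWithImplicitLogging(protocol: String, host: String, port: Int)(
  implicit system: ActorSystem,
  logging: Logging)
```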

Member:

Got it.
The log store SPI does not pass an implicit parameter for logging.

@@ -72,16 +76,19 @@ class PoolingRestClient(
// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
private val requestQueue = Source
.queue(queueSize, OverflowStrategy.dropNew)
@style95 (Member) commented Sep 11, 2023:

If the overflow strategy is not specified, what happens now in case the queue is full?

Ok, now BoundedSourceQueueStage is used and it will immediately let us know if we can enqueue an element or not.
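For reference, a hedged sketch of how an offer to the new bounded queue can be handled; the element type and the exact error messages are assumptions, not the PR's literal code.

```scala
import scala.concurrent.{Future, Promise}

import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{BoundedSourceQueue, QueueOfferResult}

// offer() on a BoundedSourceQueue answers synchronously, so a full queue can
// be reported to the caller immediately instead of silently dropping the new
// element (the behavior of the deprecated OverflowStrategy.dropNew).
def enqueue(queue: BoundedSourceQueue[(HttpRequest, Promise[HttpResponse])],
            request: HttpRequest): Future[HttpResponse] = {
  val promise = Promise[HttpResponse]()
  queue.offer(request -> promise) match {
    case QueueOfferResult.Enqueued    => promise.future
    case QueueOfferResult.Dropped     => Future.failed(new Exception("Request queue is full."))
    case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed."))
    case QueueOfferResult.Failure(t)  => Future.failed(t)
  }
}
```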

@YevSent (Contributor, Author) commented Sep 11, 2023

I'm looking into the failed tests caused by a NullPointerException at the sinkCompletion.onComplete step.

@YevSent (Contributor, Author) commented Sep 12, 2023

It looks like the NullPointerException appears due to CouchDbRestClient redefining the execution context.

…propagate it correctly from custom implementations
@YevSent (Contributor, Author) commented Sep 18, 2023

I changed the code to be able to propagate the right context.

extends PoolingRestClient(protocol, host, port, 16 * 1024) {
extends PoolingRestClient(protocol, host, port, 16 * 1024)(
system,
system.dispatchers.lookup("dispatchers.couch-dispatcher")) {
Member:

I couldn't understand why we need to pass this same execution context.

@YevSent (Contributor, Author):

The future's onComplete requires an execution context, but because onComplete is registered during class initialization, it doesn't have a proper execution context in the case of CouchDbRestClient: the child class is initialized later than the parent.

I kept the protected property in CouchDbRestClient because its children depend on it; there is at least one implementation inside OW.
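A minimal sketch of the initialization-order problem described here (Parent and Child are hypothetical names; only the dispatcher lookup matches the diff above, and it assumes dispatchers.couch-dispatcher is configured):

```scala
import scala.concurrent.{ExecutionContext, Future}

import akka.actor.ActorSystem

// The parent registers a Future callback in its constructor body, so the
// execution context must already be usable while the parent initializes.
class Parent(name: String)(implicit system: ActorSystem, ec: ExecutionContext) {
  Future.successful(()).onComplete(_ => ()) // stands in for sinkCompletion.onComplete
}

// If the child merely overrode a protected execution-context member instead,
// that member would still be uninitialized (null) while Parent's constructor
// runs, which is the kind of NullPointerException seen in the failed tests.
// Passing the dispatcher to the parent constructor explicitly avoids that:
class Child(implicit system: ActorSystem)
    extends Parent("child")(system, system.dispatchers.lookup("dispatchers.couch-dispatcher"))
```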

@codecov-commenter commented Sep 18, 2023

Codecov Report

Merging #5442 (d084a4f) into master (20f7d98) will decrease coverage by 25.91%.
Report is 1 commit behind head on master.
The diff coverage is 75.00%.

❗ Current head d084a4f differs from pull request most recent head 3fb2fc5. Consider uploading reports for the commit 3fb2fc5 to get more accurate results

@@             Coverage Diff             @@
##           master    #5442       +/-   ##
===========================================
- Coverage   76.82%   50.91%   -25.91%     
===========================================
  Files         241      241               
  Lines       14632    14630        -2     
  Branches      616      606       -10     
===========================================
- Hits        11241     7449     -3792     
- Misses       3391     7181     +3790     
| Files Changed | Coverage Δ |
|---|---|
| ...whisk/core/containerpool/AkkaContainerClient.scala | 44.23% <ø> (-26.93%) ⬇️ |
| .../containerpool/logging/ElasticSearchLogStore.scala | 0.00% <0.00%> (-92.86%) ⬇️ |
| ...ontainerpool/logging/ElasticSearchRestClient.scala | 8.88% <ø> (-68.89%) ⬇️ |
| .../org/apache/openwhisk/http/PoolingRestClient.scala | 87.09% <80.00%> (-3.82%) ⬇️ |
| ...he/openwhisk/core/database/CouchDbRestClient.scala | 70.83% <100.00%> (-16.67%) ⬇️ |

... and 149 files with indirect coverage changes


@style95 (Member) left a comment:

LGTM

@bdoyle0182 (Contributor):

LGTM as well

@bdoyle0182 bdoyle0182 merged commit 6f11d48 into apache:master Sep 29, 2023
mtt-merz pushed a commit to mtt-merz/openwhisk that referenced this pull request Oct 22, 2023
* - Replaced the usage of deprecated OverflowStrategy.dropNew with BoundedSourceQueueStage
 - Added proper clean-up of materialized resources to prevent memory leaks for long-running streams

* - Added an execution context to the PoolingRestClient to be able to propagate it correctly from custom implementations

(cherry picked from commit 6f11d48)
@falkzoll (Contributor):

Hi @joni-jones, maybe you or someone else in this PR can help us understand an issue in the https://github.com/apache/openwhisk-runtime-nodejs runtime... currently the scheduled openwhisk nodejs runtime builds fail (https://github.com/apache/openwhisk-runtime-nodejs/actions). They fail with a timeout (after 30s) in a test case that checks the concurrent invocation capability (number of concurrent invokes/runs for a single action container) of this runtime (for nodejs:18 and nodejs:20).

runtime.actionContainers.NodeJs18ConcurrentTests > action-nodejs-v18 should allow running activations concurrently FAILED
    java.util.concurrent.TimeoutException
        at org.apache.openwhisk.core.containerpool.AkkaContainerClient$.$anonfun$executeRequest$1(AkkaContainerClient.scala:252)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

Same for nodejs:20. This kind of concurrency is actually only supported in the nodejs runtimes. Other runtimes just support '__OW_ALLOW_CONCURRENT=false' and can only handle one invoke/run per action container at a time.

To run the tests, the github action of the nodejs runtime clones the latest available apache/openwhisk repository and uses its master as the base to run its tests (https://github.com/apache/openwhisk-runtime-nodejs/blob/master/.github/workflows/ci.yaml).

The failing test case performs 128 (https://github.com/apache/openwhisk-runtime-nodejs/blob/c60a6676375d85878c658412162004848c19f965/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala#L32) parallel action invoke/run requests against a single nodejs runtime action container. The Scala test utilizes the AkkaContainerClient to open these 128 parallel connections (https://github.com/apache/openwhisk-runtime-nodejs/blob/c60a6676375d85878c658412162004848c19f965/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala#L24).
The nodejs test action invoked inside the action container (https://github.com/apache/openwhisk-runtime-nodejs/blob/c60a6676375d85878c658412162004848c19f965/tests/src/test/scala/runtime/actionContainers/NodeJsConcurrentTests.scala#L41) is coded to wait for 128 incoming invokes before it starts to complete all of them with a response (it makes use of global variables). This means the first run request to this action is held open and not answered until the 128th run has reached the action code in this container. This way the test can be sure that the runtime can handle this number of concurrent action invokes being open at the same time. This test usually takes far less than 5 seconds in the GitHub action, while the timeout for this test is 30s.
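To make the mechanism concrete, a hypothetical sketch of the test's client side (the run function and return type are assumptions, not the actual AkkaContainerClient API): it fires all requests up front and awaits them together, so all 128 connections must be open concurrently for any of them to complete.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Fire `count` run requests in parallel and wait for all of them; the action
// only answers once all of them have arrived, so the client has to keep that
// many connections open at the same time.
def runConcurrently(run: Int => Future[String], count: Int = 128)(implicit ec: ExecutionContext): Seq[String] =
  Await.result(Future.sequence((1 to count).map(run)), 30.seconds)
```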

Debugging it locally showed that with the current implementation in this PR, the test no longer seems to reach the required 128 parallel connections. It seems that after a certain number of open connections is reached (far fewer than 128), no further connections are made, or they arrive very, very slowly. As a result, the pending invokes in the action container do not return a response in time and the test case fails after 30s.
Reverting to the previous commit (0c27a65) resolves the issue and the concurrency test completes successfully within the usual few seconds for nodejs:18 and nodejs:20.

Looking at the changes in this PR, it does not look like the nodejs runtime tests need to be adapted. Anyhow, with your broader Akka background, do we need to modify something in the nodejs runtime tests to consume this latest apache/openwhisk core?
Any hints are welcome :-).

@style95 (Member) commented Oct 25, 2023

@falkzoll
Indeed..
Could you really resolve the issue by reverting the previous commit (0c27a65)?
It's surprising because the userCPU config is disabled by default.

@style95 (Member) commented Oct 25, 2023

I think we should get into the CI environment and see what's going on.
I think we can add an on-demand workflow along with Ngrok debugging.
https://github.com/apache/openwhisk/blob/master/.github/workflows/0-on-demand.yaml#L23

@falkzoll (Contributor):

@style95 thanks for your reply 👍 .... just to clarify... running the nodejs runtime tests locally with apache/openwhisk commit 6f11d48 (latest master right now) shows the error in the nodejs runtime concurrency tests.... going back one commit to commit 0c27a65 (which includes the CPU limits) works fine (no failures in these tests)... from that, it looks like the regression was introduced with commit 6f11d48 and is not related to the changes for the CPU limits.

@style95 (Member) commented Oct 27, 2023

@falkzoll got it.
I will take a look.

@style95 (Member) commented Oct 28, 2023

@falkzoll
I created an issue: #5452
It seems the underlying request queue is shut down before sending all 128 concurrent requests.
