Skip to content

Commit 50c585d

Browse files
authored
Fix memory leak caused by QueryThrottler (#518)
* Fix memory leak caused by QueryThrottler * Update code comment documentation for future references.
1 parent 461a2c5 commit 50c585d

File tree

3 files changed

+242
-28
lines changed

3 files changed

+242
-28
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="QueryThrottlerSpecs.cs" company="Akka.NET Project">
3+
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
4+
// </copyright>
5+
// -----------------------------------------------------------------------
6+
7+
using System.Linq;
8+
using System.Threading.Tasks;
9+
using Akka.Actor;
10+
using Akka.Persistence.Sql.Query;
11+
using FluentAssertions;
12+
using FluentAssertions.Extensions;
13+
using Xunit;
14+
using Xunit.Abstractions;
15+
16+
namespace Akka.Persistence.Sql.Tests.Query;
17+
18+
public class QueryThrottlerSpecs: Akka.TestKit.Xunit2.TestKit
19+
{
20+
public QueryThrottlerSpecs(ITestOutputHelper output) : base("{}", output)
21+
{
22+
}
23+
24+
// Issue https://github.com/akkadotnet/Akka.Persistence.Sql/issues/516
25+
// Caused by Ask temp actor being watched and rooted in memory because it is not
26+
// compatible with death watch
27+
[Fact(DisplayName = "QueryThrottler should not watch permit request made using Ask")]
28+
public async Task MemoryLeakTest()
29+
{
30+
var throttler = Sys.ActorOf(Props.Create(() => new QueryThrottler(10)));
31+
32+
foreach (var _ in Enumerable.Range(1, 10))
33+
{
34+
// IMPORTANT: the bug was caused by Ask, don't replace this with Tell and ExpectMsg
35+
await throttler.Ask<QueryStartGranted>(new RequestQueryStart(3.Seconds()));
36+
throttler.Tell(ReturnQueryStart.Instance);
37+
}
38+
39+
throttler.Tell(GetWatchCount.Instance);
40+
ExpectMsg<long>().Should().Be(0);
41+
}
42+
43+
[Fact(DisplayName = "QueryThrottler should discard expired pending requests")]
44+
public async Task DiscardExpiredTest()
45+
{
46+
var throttler = Sys.ActorOf(Props.Create(() => new QueryThrottler(1)));
47+
48+
// Use all permits
49+
await throttler.Ask<QueryStartGranted>(new RequestQueryStart(3.Seconds()));
50+
51+
// Request for more permits
52+
throttler.Tell(new RequestQueryStart(100.Milliseconds()), TestActor);
53+
throttler.Tell(new RequestQueryStart(100.Milliseconds()), TestActor);
54+
55+
// Check that pending still contain pending requests
56+
throttler.Tell(GetPendingRequests.Instance, TestActor);
57+
var requests = await ExpectMsgAsync<PendingRequest[]>();
58+
requests.Length.Should().Be(2);
59+
60+
// No permit should be granted, all extra request timed out
61+
await ExpectNoMsgAsync(200.Milliseconds());
62+
63+
// Return the first permit
64+
throttler.Tell(ReturnQueryStart.Instance, TestActor);
65+
66+
// No permit should not be granted
67+
await ExpectNoMsgAsync(200.Milliseconds());
68+
69+
// Check that all timed out pending requests are discarded
70+
throttler.Tell(GetPendingRequests.Instance, TestActor);
71+
requests = await ExpectMsgAsync<PendingRequest[]>();
72+
requests.Length.Should().Be(0);
73+
}
74+
75+
[Fact(DisplayName = "QueryThrottler should honor pending requests")]
76+
public async Task PendingRequestsTest()
77+
{
78+
var throttler = Sys.ActorOf(Props.Create(() => new QueryThrottler(1)));
79+
80+
// Use all permits
81+
await throttler.Ask<QueryStartGranted>(new RequestQueryStart(3.Seconds()));
82+
83+
// Request for more permits
84+
throttler.Tell(new RequestQueryStart(3.Seconds()), TestActor);
85+
throttler.Tell(new RequestQueryStart(3.Seconds()), TestActor);
86+
87+
// Check that pending contain the last requests
88+
throttler.Tell(GetPendingRequests.Instance, TestActor);
89+
var requests = await ExpectMsgAsync<PendingRequest[]>();
90+
requests.Length.Should().Be(2);
91+
92+
// No permit should be granted
93+
await ExpectNoMsgAsync(200.Milliseconds());
94+
95+
// Return the first permit
96+
throttler.Tell(ReturnQueryStart.Instance, TestActor);
97+
98+
// First pending response
99+
ExpectMsg<QueryStartGranted>();
100+
101+
// Check that pending still contain pending requests
102+
throttler.Tell(GetPendingRequests.Instance, TestActor);
103+
requests = await ExpectMsgAsync<PendingRequest[]>();
104+
requests.Length.Should().Be(1);
105+
106+
// No permit should be granted
107+
await ExpectNoMsgAsync(200.Milliseconds());
108+
109+
// Return the second permit
110+
throttler.Tell(ReturnQueryStart.Instance, TestActor);
111+
112+
// Second pending response
113+
ExpectMsg<QueryStartGranted>();
114+
115+
// Check that pending is empty
116+
throttler.Tell(GetPendingRequests.Instance, TestActor);
117+
requests = await ExpectMsgAsync<PendingRequest[]>();
118+
requests.Length.Should().Be(0);
119+
120+
// Return the third permit
121+
throttler.Tell(ReturnQueryStart.Instance, TestActor);
122+
123+
// No permit should be granted (pending empty)
124+
await ExpectNoMsgAsync(200.Milliseconds());
125+
}
126+
}

src/Akka.Persistence.Sql/Extensions/ConnectionFactoryExtensions.cs

+2-12
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,9 @@ internal static async Task<T> ExecuteQueryWithTransactionAsync<T>(
5252
DbStateHolder state,
5353
Func<AkkaDataConnection, CancellationToken, Task<T>> handler)
5454
{
55-
using var cts = CancellationTokenSource.CreateLinkedTokenSource(state.ShutdownToken);
56-
{
57-
cts.CancelAfter(state.QueryThrottleTimeout);
58-
await state.QueryPermitter.Ask<QueryStartGranted>(RequestQueryStart.Instance, cts.Token);
59-
}
60-
6155
try
6256
{
57+
await state.QueryPermitter.Ask<QueryStartGranted>(new RequestQueryStart(state.QueryThrottleTimeout), state.QueryThrottleTimeout);
6358
return await factory.ExecuteWithTransactionAsync(state.IsolationLevel, state.ShutdownToken, handler);
6459
}
6560
finally
@@ -103,14 +98,9 @@ internal static async Task<T> ExecuteQueryWithTransactionAsync<TState,T>(
10398
TState state,
10499
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler)
105100
{
106-
using var cts = CancellationTokenSource.CreateLinkedTokenSource(factory.ShutdownToken);
107-
{
108-
cts.CancelAfter(factory.QueryThrottleTimeout);
109-
await factory.QueryPermitter.Ask<QueryStartGranted>(RequestQueryStart.Instance, cts.Token);
110-
}
111-
112101
try
113102
{
103+
await factory.QueryPermitter.Ask<QueryStartGranted>(new RequestQueryStart(factory.QueryThrottleTimeout), factory.QueryThrottleTimeout);
114104
return await factory.ConnectionFactory.ExecuteWithTransactionAsync(state, factory.IsolationLevel, factory.ShutdownToken, handler);
115105
}
116106
finally

src/Akka.Persistence.Sql/Query/QueryThrottler.cs

+114-16
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System;
99
using System.Collections.Generic;
10+
using System.Linq;
1011
using Akka.Actor;
1112
using Akka.Event;
1213
using Akka.Pattern;
@@ -18,8 +19,26 @@ namespace Akka.Persistence.Sql.Query;
1819
/// </summary>
1920
internal sealed class RequestQueryStart
2021
{
21-
public static readonly RequestQueryStart Instance = new();
22-
private RequestQueryStart() { }
22+
public DateTime DeadlineTime { get; }
23+
24+
public RequestQueryStart(TimeSpan timeout)
25+
{
26+
DeadlineTime = DateTime.UtcNow.Add(timeout);
27+
}
28+
}
29+
30+
internal sealed class PendingRequest
31+
{
32+
public PendingRequest(IActorRef requester, DateTime deadlineTime)
33+
{
34+
Requester = requester;
35+
DeadlineTime = deadlineTime;
36+
}
37+
38+
public IActorRef Requester { get; }
39+
public DateTime DeadlineTime { get; }
40+
41+
public bool IsExpired => DeadlineTime < DateTime.UtcNow;
2342
}
2443

2544
/// <summary>
@@ -40,31 +59,75 @@ internal sealed class ReturnQueryStart
4059
private ReturnQueryStart() { }
4160
}
4261

62+
#region Test classes
63+
64+
/// <summary>
65+
/// For testing purposes
66+
/// </summary>
67+
internal sealed class GetUsedPermits
68+
{
69+
public static readonly GetUsedPermits Instance = new();
70+
private GetUsedPermits() { }
71+
}
72+
73+
/// <summary>
74+
/// For testing purposes
75+
/// </summary>
76+
internal sealed class GetPendingRequests
77+
{
78+
public static readonly GetPendingRequests Instance = new();
79+
private GetPendingRequests() { }
80+
}
81+
82+
internal sealed class GetWatchCount
83+
{
84+
public static readonly GetWatchCount Instance = new();
85+
private GetWatchCount() { }
86+
}
87+
88+
#endregion
89+
4390
/// <summary>
4491
/// Token bucket throttler that grants queries permissions to run each iteration
4592
/// </summary>
4693
/// <remarks>
47-
/// Works identically to the RecoveryPermitter built into Akka.Persistence.
94+
/// Works almost identically to the RecoveryPermitter built into Akka.Persistence.
95+
///
96+
/// NOTE: Since this permitter works with Ask operation from outside an actor,
97+
/// we can not rely on the actor termination as a signal for permit revocation.
98+
///
99+
/// A query operation needs to be executed within the context of the permit
100+
/// and an Ask temporary actor will always terminate before the actual
101+
/// permits are used, making the Terminated message useless for this use case.
102+
///
103+
/// ALWAYS USE A TRY...FINALLY BLOCK WHEN USING ASK AND RETURN THE PERMIT IN
104+
/// THE FINALLY BLOCK
48105
/// </remarks>
49106
internal sealed class QueryThrottler : ReceiveActor
50107
{
51-
private readonly LinkedList<IActorRef> _pending = new();
108+
private readonly LinkedList<PendingRequest> _pending = new();
52109
private readonly ILoggingAdapter _log = Context.GetLogger();
110+
private long _watchCount;
53111
private int _usedPermits;
54112
private int _maxPendingStats;
55113

56114
public QueryThrottler(int maxPermits)
57115
{
58116
MaxPermits = maxPermits;
59117

60-
Receive<RequestQueryStart>(_ =>
118+
Receive<RequestQueryStart>(request =>
61119
{
62-
Context.Watch(Sender);
120+
if(Sender is ActorRefWithCell)
121+
{
122+
_watchCount++;
123+
Context.Watch(Sender);
124+
}
125+
63126
if (_usedPermits >= MaxPermits)
64127
{
65128
if (_pending.Count == 0)
66129
_log.Debug("Exceeded max-concurrent-queries[{0}]. First pending {1}", MaxPermits, Sender);
67-
_pending.AddLast(Sender);
130+
_pending.AddLast(new PendingRequest(Sender, request.DeadlineTime));
68131
_maxPendingStats = Math.Max(_maxPendingStats, _pending.Count);
69132
}
70133
else
@@ -75,16 +138,34 @@ public QueryThrottler(int maxPermits)
75138

76139
Receive<ReturnQueryStart>(_ =>
77140
{
78-
ReturnQueryPermit(Sender);
141+
if(Sender is ActorRefWithCell)
142+
Context.Unwatch(Sender);
143+
144+
ReturnQueryPermit();
79145
});
80146

81147
Receive<Terminated>(terminated =>
82148
{
83-
if (!_pending.Remove(terminated.ActorRef))
149+
var actor = terminated.ActorRef;
150+
if(actor is ActorRefWithCell)
151+
Context.Unwatch(actor);
152+
153+
var pending = _pending.FirstOrDefault(p => p.Requester.Equals(actor));
154+
if (pending is not null)
84155
{
85-
ReturnQueryPermit(terminated.ActorRef);
156+
_pending.Remove(pending);
157+
}
158+
else
159+
{
160+
ReturnQueryPermit();
86161
}
87162
});
163+
164+
#region Test handlers
165+
Receive<GetUsedPermits>(_ => Sender.Tell(_usedPermits));
166+
Receive<GetPendingRequests>(_ => Sender.Tell(_pending.ToArray()));
167+
Receive<GetWatchCount>(_ => Sender.Tell(_watchCount));
168+
#endregion
88169
}
89170

90171
public int MaxPermits { get; }
@@ -95,19 +176,36 @@ private void QueryStartGranted(IActorRef actorRef)
95176
actorRef.Tell(Query.QueryStartGranted.Instance);
96177
}
97178

98-
private void ReturnQueryPermit(IActorRef actorRef)
179+
private void ReturnQueryPermit()
99180
{
100181
_usedPermits--;
101-
Context.Unwatch(actorRef);
102182

183+
// _usedPermits can go negative if a piece of code returns
184+
// granted permits multiple times. This is not a critical
185+
// error, the throttler should not stop working because of this.
186+
//
187+
// However, if this does trip, we will need to look into
188+
// the query codes and figure out which code is over returning
189+
// permits.
103190
if (_usedPermits < 0)
104-
throw new IllegalStateException("Permits must not be negative");
191+
{
192+
_log.Warning("Permits must not be negative");
193+
_usedPermits = 0;
194+
return;
195+
}
105196

106-
var popRef = _pending.First?.Value;
107-
if (popRef is not null)
197+
while (_pending.First is not null)
108198
{
199+
var pending = _pending.First.Value;
109200
_pending.RemoveFirst();
110-
QueryStartGranted(popRef);
201+
if (pending is not null && pending.IsExpired)
202+
pending = null;
203+
204+
if (pending is null)
205+
continue;
206+
207+
QueryStartGranted(pending.Requester);
208+
break;
111209
}
112210

113211
if (_pending.Count != 0 || _maxPendingStats <= 0)

0 commit comments

Comments
 (0)