Skip to content

Commit 8b6c0ff

Browse files
Akka.Actor: IStash API and configuration enhancements (#6660)
* close #6658 Add APIs to track content of Stash * added basic stashing test cases * updated API approvals --------- Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 61fb874 commit 8b6c0ff

File tree

7 files changed

+217
-10
lines changed

7 files changed

+217
-10
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt

+10
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,11 @@ namespace Akka.Actor
11341134
}
11351135
public interface IStash
11361136
{
1137+
int Capacity { get; }
1138+
int Count { get; }
1139+
bool IsEmpty { get; }
1140+
bool IsFull { get; }
1141+
bool NonEmpty { get; }
11371142
System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash();
11381143
void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes);
11391144
void Stash();
@@ -1933,6 +1938,11 @@ namespace Akka.Actor.Internal
19331938
public abstract class AbstractStash : Akka.Actor.IStash
19341939
{
19351940
protected AbstractStash(Akka.Actor.IActorContext context) { }
1941+
public int Capacity { get; }
1942+
public int Count { get; }
1943+
public bool IsEmpty { get; }
1944+
public bool IsFull { get; }
1945+
public bool NonEmpty { get; }
19361946
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
19371947
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
19381948
public void Stash() { }

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt

+10
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,11 @@ namespace Akka.Actor
11321132
}
11331133
public interface IStash
11341134
{
1135+
int Capacity { get; }
1136+
int Count { get; }
1137+
bool IsEmpty { get; }
1138+
bool IsFull { get; }
1139+
bool NonEmpty { get; }
11351140
System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash();
11361141
void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes);
11371142
void Stash();
@@ -1931,6 +1936,11 @@ namespace Akka.Actor.Internal
19311936
public abstract class AbstractStash : Akka.Actor.IStash
19321937
{
19331938
protected AbstractStash(Akka.Actor.IActorContext context) { }
1939+
public int Capacity { get; }
1940+
public int Count { get; }
1941+
public bool IsEmpty { get; }
1942+
public bool IsFull { get; }
1943+
public bool NonEmpty { get; }
19341944
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
19351945
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
19361946
public void Stash() { }

src/core/Akka.Persistence/Eventsourced.cs

+6
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,12 @@ public void Prepend(IEnumerable<Envelope> envelopes)
695695
{
696696
_userStash.Prepend(envelopes);
697697
}
698+
699+
public int Count => _userStash.Count;
700+
public bool IsEmpty => _userStash.IsEmpty;
701+
public bool NonEmpty => _userStash.NonEmpty;
702+
public bool IsFull => _userStash.IsFull;
703+
public int Capacity => _userStash.Capacity;
698704
}
699705
}
700706
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="StashCapacitySpecs.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading.Tasks;
10+
using Akka.Actor;
11+
using Akka.Configuration;
12+
using Akka.TestKit;
13+
using Xunit;
14+
using Xunit.Abstractions;
15+
using FluentAssertions;
16+
17+
namespace Akka.Tests.Actor.Stash;
18+
19+
public class StashCapacitySpecs : AkkaSpec
20+
{
21+
public StashCapacitySpecs(ITestOutputHelper output) : base(Config.Empty, output: output)
22+
{
23+
24+
}
25+
26+
[Fact]
27+
public async Task ShouldGetAccurateStashReadingForUnboundedStash()
28+
{
29+
var stashActor = Sys.ActorOf(Props.Create(() => new StashActor()));
30+
stashActor.Tell(new StashActor.StashMessage("1"));
31+
stashActor.Tell(new StashActor.StashMessage("2"));
32+
stashActor.Tell(StashActor.GetStashReadout.Instance);
33+
var readout1 = await ExpectMsgAsync<StashActor.StashReadout>();
34+
readout1.Capacity.Should().Be(-1); // unbounded stash returns -1 for capacity
35+
readout1.Size.Should().Be(2);
36+
readout1.IsEmpty.Should().BeFalse();
37+
readout1.IsFull.Should().BeFalse();
38+
39+
stashActor.Tell(StashActor.UnstashMessage.Instance);
40+
stashActor.Tell(StashActor.GetStashReadout.Instance);
41+
var readout2 = await ExpectMsgAsync<StashActor.StashReadout>();
42+
readout2.Capacity.Should().Be(-1);
43+
readout2.Size.Should().Be(1);
44+
readout2.IsEmpty.Should().BeFalse();
45+
readout2.IsFull.Should().BeFalse();
46+
47+
stashActor.Tell(StashActor.UnstashMessage.Instance);
48+
stashActor.Tell(StashActor.GetStashReadout.Instance);
49+
var readout3 = await ExpectMsgAsync<StashActor.StashReadout>();
50+
readout3.Capacity.Should().Be(-1);
51+
readout3.Size.Should().Be(0);
52+
readout3.IsEmpty.Should().BeTrue();
53+
readout3.IsFull.Should().BeFalse();
54+
}
55+
56+
private class StashActor : UntypedActorWithStash
57+
{
58+
public class StashMessage
59+
{
60+
public StashMessage(string message)
61+
{
62+
Message = message;
63+
}
64+
65+
public string Message { get; }
66+
}
67+
68+
public class UnstashMessage
69+
{
70+
private UnstashMessage()
71+
{
72+
73+
}
74+
public static readonly UnstashMessage Instance = new();
75+
}
76+
77+
public class GetStashReadout
78+
{
79+
private GetStashReadout()
80+
{
81+
82+
}
83+
public static readonly GetStashReadout Instance = new();
84+
}
85+
86+
public class StashReadout
87+
{
88+
public StashReadout(int capacity, int size, bool isEmpty, bool isFull)
89+
{
90+
Capacity = capacity;
91+
Size = size;
92+
IsEmpty = isEmpty;
93+
IsFull = isFull;
94+
}
95+
96+
public int Capacity { get; }
97+
public int Size { get; }
98+
99+
public bool IsEmpty { get; }
100+
101+
public bool IsFull { get; }
102+
}
103+
104+
protected override void OnReceive(object message)
105+
{
106+
switch (message)
107+
{
108+
case StashMessage msg:
109+
Stash.Stash();
110+
break;
111+
case UnstashMessage _:
112+
Stash.Unstash();
113+
Context.Become(Unstashing); // switch behaviors so we're not stuck in stash loop
114+
break;
115+
case GetStashReadout _:
116+
Sender.Tell(new StashReadout(Stash.Capacity, Stash.Count, Stash.IsEmpty, Stash.IsFull));
117+
break;
118+
default:
119+
Unhandled(message);
120+
break;
121+
}
122+
}
123+
124+
private void Unstashing(object message)
125+
{
126+
switch (message)
127+
{
128+
case StashMessage msg: // do nothing
129+
break;
130+
case UnstashMessage when Stash.NonEmpty:
131+
Stash.Unstash();
132+
break;
133+
case UnstashMessage _: // when the stash is empty, go back to stashing
134+
Context.Become(OnReceive);
135+
break;
136+
case GetStashReadout _:
137+
Sender.Tell(new StashReadout(Stash.Capacity, Stash.Count, Stash.IsEmpty, Stash.IsFull));
138+
break;
139+
default:
140+
Unhandled(message);
141+
break;
142+
}
143+
}
144+
}
145+
}

src/core/Akka/Actor/Stash/IStash.cs

+30-2
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,42 @@ public interface IStash
4343
/// Returns all messages and clears the stash.
4444
/// The stash is guaranteed to be empty afterwards.
4545
/// </summary>
46-
/// <returns>TBD</returns>
46+
/// <returns>The previous stashed messages.</returns>
4747
IEnumerable<Envelope> ClearStash();
4848

4949
/// <summary>
50-
/// TBD
50+
/// Prepend a set of envelopes to the front of the stash.
5151
/// </summary>
5252
/// <param name="envelopes">TBD</param>
5353
void Prepend(IEnumerable<Envelope> envelopes);
54+
55+
/// <summary>
56+
/// The number of messages currently inside the stash.
57+
/// </summary>
58+
public int Count { get; }
59+
60+
/// <summary>
61+
/// Returns <c>true</c> when <see cref="Count"/> is zero.
62+
/// </summary>
63+
public bool IsEmpty { get; }
64+
65+
/// <summary>
66+
/// Returns <c>true</c> when <see cref="Count"/> is greater than zero.
67+
/// </summary>
68+
public bool NonEmpty { get; }
69+
70+
/// <summary>
71+
/// When using a bounded stash, this returns <c>true</c> when the stash is full.
72+
/// </summary>
73+
/// <remarks>
74+
/// Always returns <c>false</c> when using an unbounded stash.
75+
/// </remarks>
76+
public bool IsFull { get; }
77+
78+
/// <summary>
79+
/// The total capacity of the stash.
80+
/// </summary>
81+
public int Capacity { get; }
5482
}
5583
}
5684

src/core/Akka/Actor/Stash/Internal/AbstractStash.cs

+15-7
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,6 @@ public abstract class AbstractStash : IStash
3333

3434
private readonly ActorCell _actorCell;
3535

36-
/// <summary>
37-
/// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
38-
/// </summary>
39-
private readonly int _capacity;
40-
4136
/// <summary>
4237
/// The actor's deque-based message queue.
4338
/// `mailbox.queue` is the underlying `Deque`.
@@ -61,7 +56,7 @@ protected AbstractStash(IActorContext context)
6156
_actorCell = actorCell;
6257

6358
// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
64-
_capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox);
59+
Capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox);
6560
}
6661

6762
private int _currentEnvelopeId;
@@ -84,7 +79,7 @@ public void Stash()
8479
}
8580
_currentEnvelopeId = _actorCell.CurrentEnvelopeId;
8681

87-
if (_capacity <= 0 || _theStash.Count < _capacity)
82+
if (Capacity <= 0 || _theStash.Count < Capacity)
8883
_theStash.AddLast(new Envelope(currMsg, sender));
8984
else
9085
throw new StashOverflowException($"Couldn't enqueue message {currMsg} from ${sender} to stash of {_actorCell.Self}");
@@ -189,6 +184,19 @@ public void Prepend(IEnumerable<Envelope> envelopes)
189184
}
190185
}
191186

187+
public int Count => _theStash.Count;
188+
public bool IsEmpty => Count == 0;
189+
public bool NonEmpty => !IsEmpty;
190+
public bool IsFull => Capacity >= 0 && _theStash.Count >= Capacity;
191+
192+
/// <summary>
193+
/// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
194+
/// </summary>
195+
/// <remarks>
196+
/// If capacity is negative, then we're using an Unbounded stash.
197+
/// </remarks>
198+
public int Capacity { get; }
199+
192200
/// <summary>
193201
/// Enqueues <paramref name="msg"/> at the first position in the mailbox. If the message contained in
194202
/// the envelope is a <see cref="Terminated"/> message, it will be ensured that it can be re-received

src/core/Akka/Dispatch/ISemantics.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
namespace Akka.Dispatch
1212
{
1313
/// <summary>
14-
/// TBD
14+
/// Describes the message queue semantics of a mailbox.
1515
/// </summary>
1616
public interface ISemantics
1717
{

0 commit comments

Comments
 (0)