Skip to content

Commit 26651ed

Browse files
author
Rafał Maciąg
committed
State
1 parent b9e95ef commit 26651ed

File tree

12 files changed

+113
-50
lines changed

12 files changed

+113
-50
lines changed

src/MicroPlumberd.ProcessManager.Abstractions/IProcessManager.cs

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public abstract class ProcessManagerBase<TId> : IVersionAware, IIdAware, IId<TId
2222
object IIdAware.Id { set => _id = (TId)value; }
2323
public long Version => _version;
2424
void IVersionAware.Increase() => _version += 1;
25+
void IVersionAware.SetValue(long nv) { _version = nv; }
2526

2627
public virtual async Task<ICommandRequest?> HandleError(ExecutionContext executionContext) => null;
2728
}

src/MicroPlumberd.Services.ProcessManager/ProcessManagerClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public async Task<IAsyncDisposable> SubscribeProcessManager<TProcessManager>() w
7878
ProcessManagerExecutor<TProcessManager>.Sender sender =
7979
new ProcessManagerExecutor<TProcessManager>.Sender(this);
8080
var c = AsyncDisposableCollection.New();
81-
c += await Plumber.SubscribeEventHandlerPersistently(sender, $"{typeof(TProcessManager).Name}Outbox", ensureOutputStreamProjection:true);
82-
c += await Plumber.SubscribeEventHandlerPersistently(executor, $"{typeof(TProcessManager).Name}Inbox", ensureOutputStreamProjection:true);
81+
c += await Plumber.SubscribeEventHandlerPersistently(sender, $"{typeof(TProcessManager).Name}Outbox", ensureOutputStreamProjection: true);
82+
c += await Plumber.SubscribeEventHandlerPersistently(executor, $"{typeof(TProcessManager).Name}Inbox", ensureOutputStreamProjection: true);
8383

8484
await Plumber.ProjectionManagementClient.EnsureLookupProjection(Plumber.ProjectionRegister,
8585
typeof(TProcessManager).Name, "RecipientId", $"{typeof(TProcessManager).Name}Lookup");

src/MicroPlumberd.Services/CommandHandlerStarter.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static StreamPosition ToStreamPosition(this FromStream fs)
4444
class EventHandlerStarter<THandler>(IPlumber plumber) : IEventHandlerStarter
4545
where THandler : class, IEventHandler, ITypeRegister
4646
{
47-
public async Task Start()
47+
public async Task Start(CancellationToken stoppingToken)
4848
{
4949
if (!Persistently)
5050
await plumber.SubscribeEventHandler<THandler>(start: StartPosition);

src/MicroPlumberd.Services/EventHandlerService.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ sealed class EventHandlerService(IEnumerable<IEventHandlerStarter> starters) : B
1111
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
1212
{
1313
foreach (var i in starters)
14-
await i.Start();
14+
await i.Start(stoppingToken);
1515
}
1616

1717
}

src/MicroPlumberd.Services/ICommandHandlerStarter.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
interface IEventHandlerStarter
44
{
5-
Task Start();
5+
Task Start(CancellationToken stoppingToken);
66
}
77
interface ICommandHandlerStarter
88
{

src/MicroPlumberd.Tests.App/FooDomain/FooAggregate.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@
77

88
namespace MicroPlumberd.Tests.App.Domain;
99

10-
10+
public record FooEntityState : IId<Guid>, IVersionAware
11+
{
12+
public Guid Id { get; set; } = Guid.NewGuid();
13+
public string Name { get; set; }
14+
public long Version { get; private set; } = -1;
15+
void IVersionAware.SetValue(long nv) => Version = nv;
16+
}
1117

1218
[Aggregate]
1319
[ThrowsFaultException<BusinessFaultException>()]

src/MicroPlumberd.Tests/Integration/AggregateTests.cs

+45
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,51 @@ public void GivenMethodsAreAccessible()
2323
}
2424
}
2525

26+
[TestCategory("Integration")]
27+
public class StateTests : IClassFixture<EventStoreServer>
28+
{
29+
private readonly IPlumber plumber;
30+
private readonly EventStoreServer es;
31+
32+
public StateTests(EventStoreServer es)
33+
{
34+
plumber = Plumber.Create(es.GetEventStoreSettings());
35+
this.es = es;
36+
}
37+
38+
[Fact]
39+
public async Task DoubleWrite()
40+
{
41+
await es.StartInDocker();
42+
43+
var st = new FooEntityState() { Name = "Foo"};
44+
await plumber.AppendState(st);
45+
46+
st.Name = "Bar";
47+
await plumber.AppendState(st);
48+
49+
FooEntityState actual = await plumber.GetState<FooEntityState>(st.Id);
50+
51+
actual.Should().Be(st);
52+
}
53+
[Fact]
54+
public async Task WriteAfterGet()
55+
{
56+
await es.StartInDocker();
57+
58+
var st = new FooEntityState() { Name = "Foo" };
59+
await plumber.AppendState(st);
60+
61+
FooEntityState nx = await plumber.GetState<FooEntityState>(st.Id);
62+
nx.Name = "Bar";
63+
await plumber.AppendState(nx);
64+
65+
FooEntityState actual = await plumber.GetState<FooEntityState>(st.Id);
66+
actual.Should().Be(nx);
67+
68+
}
69+
}
70+
2671
[TestCategory("Integration")]
2772
public class AggregateTests : IClassFixture<EventStoreServer>
2873
{

src/MicroPlumberd.Tests/Integration/ReadModelTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public async Task SubscribeModelPersistently()
3030

3131
var fooModel = new FooModel(new InMemoryAssertionDb());
3232

33-
var sub = await plumber.SubscribeEventHandlerPersistently(fooModel, startFrom:StreamPosition.Start);
33+
var sub = await plumber.SubscribeEventHandlerPersistently(fooModel, startFrom: StreamPosition.Start);
3434

3535
await Task.Delay(1000);
3636

src/MicroPlumberd/Abstractions/IAggregate.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public interface IVersioned
7979
/// <seealso cref="MicroPlumberd.IVersioned" />
8080
public interface IVersionAware : IVersioned
8181
{
82-
void Increase();
82+
void Increase() => SetValue(this.Version + 1);
83+
void SetValue(long nv);
8384
}
8485
static class Extensions
8586
{

src/MicroPlumberd/Abstractions/IPlumber.cs

+9-5
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ Task<IWriteResult> AppendEvents(string streamId, StreamState state, IEnumerable<
180180
/// <param name="userCredentials">The user credentials.</param>
181181
/// <param name="cancellationToken">The cancellation token.</param>
182182
/// <returns></returns>
183-
ISubscriptionRunner Subscribe(string streamName, FromRelativeStreamPosition start, UserCredentials? userCredentials = null, CancellationToken cancellationToken = new CancellationToken());
183+
ISubscriptionRunner Subscribe(string streamName, FromRelativeStreamPosition start, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default);
184184

185185
/// <summary>
186186
/// Subscribes the event handler. EventHandler is a class that contains many overloaded 'Given' methods. A projection will be created at EventStore that creates a joined stream from all supported event-types by EventHandler.
@@ -211,11 +211,12 @@ Task<IAsyncDisposable> SubscribeEventHandler<TEventHandler>(TypeEventConverter m
211211
/// <param name="outputStream">The output stream.</param>
212212
/// <param name="start">The start.</param>
213213
/// <param name="ensureOutputStreamProjection">if set to <c>true</c> [ensure output stream projection].</param>
214+
/// <param name="token"></param>
214215
/// <returns></returns>
215-
Task<IAsyncDisposable> SubscribeEventHandler<TEventHandler>(TEventHandler? eh = default, string? outputStream = null, FromRelativeStreamPosition? start = null, bool ensureOutputStreamProjection = true)
216+
Task<IAsyncDisposable> SubscribeEventHandler<TEventHandler>(TEventHandler? eh = default, string? outputStream = null, FromRelativeStreamPosition? start = null, bool ensureOutputStreamProjection = true, CancellationToken token = default)
216217
where TEventHandler : class, IEventHandler, ITypeRegister;
217218

218-
219+
219220
/// <summary>
220221
/// Subscribes the event handler persistently. EventHandler is a class that contains many overloaded 'Given' methods. A projection will be created at EventStore that creates a joined stream from all supported event-types by EventHandler.
221222
/// Then EventHandler subscribe the the output stream.
@@ -226,8 +227,9 @@ Task<IAsyncDisposable> SubscribeEventHandler<TEventHandler>(TEventHandler? eh =
226227
/// <param name="groupName">Optional group name.</param>
227228
/// <param name="startFrom">Optional start of the stream.</param>
228229
/// <param name="ensureOutputStreamProjection">when true creates projection that creates output's stream</param>
230+
/// <param name="token"></param>
229231
/// <returns></returns>
230-
Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TEventHandler? model=null, string? outputStream = null, string? groupName = null, IPosition? startFrom = null, bool ensureOutputStreamProjection = true) where TEventHandler : class,IEventHandler, ITypeRegister;
232+
Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TEventHandler? model=null, string? outputStream = null, string? groupName = null, IPosition? startFrom = null, bool ensureOutputStreamProjection = true, CancellationToken token = default) where TEventHandler : class,IEventHandler, ITypeRegister;
231233

232234
/// <summary>
233235
/// Returns a subscription builder that will subscribe model persistently.
@@ -333,11 +335,13 @@ Task<IWriteResult> AppendLink(string streamId, Metadata metadata, StreamState? s
333335
/// <param name="groupName">Name of the group.</param>
334336
/// <param name="startFrom">The start from.</param>
335337
/// <param name="ensureOutputStreamProjection">if set to <c>true</c> [ensure output stream projection].</param>
338+
/// <param name="token"></param>
336339
/// <returns></returns>
337340
Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TypeEventConverter mapFunc,
338341
IEnumerable<string>? events,
339342
TEventHandler? model,
340-
string? outputStream = null, string? groupName = null, IPosition? startFrom = null, bool ensureOutputStreamProjection = true)
343+
string? outputStream = null, string? groupName = null, IPosition? startFrom = null,
344+
bool ensureOutputStreamProjection = true, CancellationToken token = default)
341345
where TEventHandler : class, IEventHandler;
342346

343347
/// <summary>

src/MicroPlumberd/EventStoreProjectionManagementClientExtensions.cs

+16-16
Original file line numberDiff line numberDiff line change
@@ -16,42 +16,42 @@ public static async Task EnsureJoinProjection(this EventStoreProjectionManagemen
1616
await client.CreateContinuousAsync(outputStream, query, true);
1717
}
1818

19-
private static async Task Update(EventStoreProjectionManagementClient client, string outputStream, string query)
19+
private static async Task Update(EventStoreProjectionManagementClient client, string outputStream, string query, CancellationToken token = default)
2020
{
21-
var state = await client.GetStatusAsync(outputStream);
21+
var state = await client.GetStatusAsync(outputStream, cancellationToken: token);
2222
if (state!.Status != "Stopped")
23-
await client.DisableAsync(outputStream);
24-
await client.UpdateAsync(outputStream, query, true);
25-
await client.EnableAsync(outputStream);
23+
await client.DisableAsync(outputStream, cancellationToken: token);
24+
await client.UpdateAsync(outputStream, query, true, cancellationToken: token);
25+
await client.EnableAsync(outputStream, cancellationToken: token);
2626
}
2727

28-
public static async Task EnsureLookupProjection(this EventStoreProjectionManagementClient client, IProjectionRegister register, string category, string eventProperty, string outputStreamCategory)
28+
public static async Task EnsureLookupProjection(this EventStoreProjectionManagementClient client, IProjectionRegister register, string category, string eventProperty, string outputStreamCategory, CancellationToken token = default)
2929
{
3030
string query =
3131
$"fromStreams(['$ce-{category}']).when( {{ \n $any : function(s,e) {{ \n if(e.body && e.body.{eventProperty}) {{\n linkTo('{outputStreamCategory}-' + e.body.{eventProperty}, e) \n }}\n \n }}\n}});";
3232
if ((await register.Get(outputStreamCategory)) != null)
3333
await Update(client, outputStreamCategory, query);
3434
else
3535
{
36-
await client.CreateContinuousAsync(outputStreamCategory, query, false);
37-
await client.DisableAsync(outputStreamCategory);
38-
await client.UpdateAsync(outputStreamCategory, query, true);
39-
await client.EnableAsync(outputStreamCategory);
36+
await client.CreateContinuousAsync(outputStreamCategory, query, false, cancellationToken: token);
37+
await client.DisableAsync(outputStreamCategory, cancellationToken: token);
38+
await client.UpdateAsync(outputStreamCategory, query, true, cancellationToken: token);
39+
await client.EnableAsync(outputStreamCategory, cancellationToken: token);
4040
}
4141
}
4242
public static async Task EnsureJoinProjection(this EventStoreProjectionManagementClient client,
43-
string outputStream, IProjectionRegister register, IEnumerable<string> eventTypes)
43+
string outputStream, IProjectionRegister register, IEnumerable<string> eventTypes, CancellationToken token = default)
4444
{
4545
var query = CreateQuery(outputStream, eventTypes);
4646

4747
if ((await register.Get(outputStream)) != null)
48-
await Update(client, outputStream, query);
48+
await Update(client, outputStream, query, token);
4949
else
5050
{
51-
await client.CreateContinuousAsync(outputStream, query, false);
52-
await client.DisableAsync(outputStream);
53-
await client.UpdateAsync(outputStream, query, true);
54-
await client.EnableAsync(outputStream);
51+
await client.CreateContinuousAsync(outputStream, query, false, cancellationToken: token);
52+
await client.DisableAsync(outputStream, cancellationToken: token);
53+
await client.UpdateAsync(outputStream, query, true, cancellationToken: token);
54+
await client.EnableAsync(outputStream, cancellationToken: token);
5555
}
5656
}
5757

0 commit comments

Comments
 (0)