Skip to content

Commit 2280c9e

Browse files
author
Rafal Maciag
committed
CommandPool
1 parent f971c88 commit 2280c9e

9 files changed

+227
-60
lines changed

src/MicroPlumberd.DirectConnect/MicroPlumberd.Services.Grpc.DirectConnect.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
</ItemGroup>
2626

2727
<ItemGroup>
28-
<PackageReference Include="ModelingEvolution.DirectConnect" Version="1.0.7.25" />
28+
<PackageReference Include="ModelingEvolution.DirectConnect" Version="1.1.0.1" />
2929
<PackageReference Include="System.Collections.Immutable" Version="9.0.1" />
3030
</ItemGroup>
3131

src/MicroPlumberd.Services/CommandBus.cs

+5-54
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Diagnostics;
44
using System.IO;
55
using System.Linq;
6-
using System.Net;
76
using System.Reflection;
87
using System.Runtime.Serialization;
98
using System.Security.Cryptography.X509Certificates;
@@ -17,10 +16,10 @@
1716

1817
namespace MicroPlumberd.Services;
1918

20-
2119
class CommandBus : ICommandBus, IEventHandler
2220
{
2321
private readonly IPlumber _plumber;
22+
private readonly ICommandBusPool _pool;
2423
private readonly ILogger<CommandBus> _log;
2524
private readonly string _streamIn;
2625
private readonly string _streamOut;
@@ -31,9 +30,10 @@ class CommandBus : ICommandBus, IEventHandler
3130
private readonly object _sync = new object();
3231
private IAsyncDisposable? _subscription;
3332
public Guid SessionId { get; } = Guid.NewGuid();
34-
public CommandBus(IPlumber plumber, ILogger<CommandBus> log)
33+
public CommandBus(IPlumber plumber, ICommandBusPool pool, ILogger<CommandBus> log)
3534
{
3635
_plumber = plumber;
36+
_pool = pool;
3737
_log = log;
3838
var servicesConventions = plumber.Config.Conventions.ServicesConventions();
3939
_streamIn = servicesConventions.SessionInStreamFromSessionIdConvention(SessionId);
@@ -75,8 +75,8 @@ private bool TryMapEventResponse(string type, out Type t)
7575

7676
public async Task QueueAsync(object recipientId, object command, TimeSpan? timeout = null, bool fireAndForget = true, CancellationToken token = default)
7777
{
78-
await using CommandBus bus = new CommandBus(this._plumber, this._log);
79-
await bus.SendAsync(recipientId, command, timeout ?? TimeSpan.MaxValue, fireAndForget, token);
78+
using var scope = await _pool.RentScope(token);
79+
await scope.SendAsync(recipientId, command, timeout ?? TimeSpan.MaxValue, fireAndForget, token);
8080
}
8181
public async Task SendAsync(object recipientId, object command, TimeSpan? timeout = null, bool fireAndForget = false, CancellationToken token = default)
8282
{
@@ -170,56 +170,7 @@ async Task IEventHandler.Handle(Metadata m, object ev)
170170
public ValueTask DisposeAsync() => _subscription?.DisposeAsync() ?? ValueTask.CompletedTask;
171171
}
172172

173-
174-
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, AllowMultiple=true)]
175-
public class ThrowsFaultExceptionAttribute<TMessage>() : ThrowsFaultExceptionAttribute(typeof(TMessage));
176-
177173
public abstract class ThrowsFaultExceptionAttribute(Type thrownType) : Attribute
178174
{
179175
public Type ThrownType { get; init; } = thrownType;
180-
}
181-
182-
183-
public class CommandExecutionResults
184-
{
185-
public async ValueTask<bool> Handle(Metadata m, object ev)
186-
{
187-
switch (ev)
188-
{
189-
case CommandExecuted ce:
190-
{
191-
IsSuccess = true;
192-
IsReady.SetResult(true);
193-
return true;
194-
}
195-
case ICommandFailedEx ef:
196-
{
197-
IsSuccess = false;
198-
ErrorMessage = ef.Message;
199-
ErrorData = ef.Fault;
200-
ErrorCode = ef.Code;
201-
IsReady.SetResult(true);
202-
return true;
203-
}
204-
case ICommandFailed cf:
205-
{
206-
IsSuccess = false;
207-
ErrorMessage = cf.Message;
208-
ErrorCode = cf.Code;
209-
IsReady.SetResult(true);
210-
return true;
211-
}
212-
}
213-
214-
return false;
215-
}
216-
217-
public HttpStatusCode ErrorCode { get; private set; }
218-
219-
220-
public string ErrorMessage { get; private set; }
221-
public object? ErrorData { get; private set; }
222-
public bool IsSuccess { get; private set; }
223-
public TaskCompletionSource<bool> IsReady { get; private set; } = new TaskCompletionSource<bool>();
224-
225176
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
using System.Collections.Concurrent;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.Logging;
4+
5+
namespace MicroPlumberd.Services;
6+
7+
class CommandBusPool : IAsyncDisposable, ICommandBusPool
8+
{
9+
private class CommandBusOwner : ICommandBusOwner
10+
{
11+
private readonly CommandBusPool _parent;
12+
private readonly ICommandBus _commandBus;
13+
14+
public Task SendAsync(object recipientId, object command, TimeSpan? timeout = null, bool fireAndForget = false,
15+
CancellationToken token = default)
16+
{
17+
return _commandBus.SendAsync(recipientId, command, timeout, fireAndForget, token);
18+
}
19+
20+
21+
internal CommandBusOwner(CommandBusPool parent, ICommandBus cb)
22+
{
23+
_parent = parent;
24+
this._commandBus = cb;
25+
}
26+
27+
public void Dispose()
28+
{
29+
_parent.Return(this);
30+
}
31+
32+
public ICommandBus CommandBus => _commandBus;
33+
}
34+
private readonly IServiceProvider _sp;
35+
protected readonly int _maxCount;
36+
private ConcurrentStack<CommandBusOwner> _pool;
37+
private SemaphoreSlim _semaphore;
38+
private bool _disposed;
39+
public CommandBusPool(IServiceProvider sp, int maxCount)
40+
{
41+
_sp = sp;
42+
_maxCount = maxCount;
43+
}
44+
45+
internal void Return(ICommandBusOwner o)
46+
{
47+
if (o is not CommandBusOwner cbo)
48+
throw new ArgumentException();
49+
50+
if (_disposed)
51+
{
52+
cbo.CommandBus.DisposeAsync();
53+
return;
54+
}
55+
56+
_pool.Push(cbo);
57+
_semaphore.Release();
58+
}
59+
public ICommandBusPool Init()
60+
{
61+
_pool = new ConcurrentStack<CommandBusOwner>(Create(number: _maxCount).Select(x=>new CommandBusOwner(this,x)));
62+
_semaphore = new SemaphoreSlim(_maxCount);
63+
return this;
64+
}
65+
66+
public async ValueTask<ICommandBusOwner> RentScope(CancellationToken ct = default)
67+
{
68+
await _semaphore.WaitAsync(ct);
69+
if (!_pool.TryPop(out var x))
70+
throw new InvalidOperationException();
71+
return x;
72+
}
73+
public virtual IEnumerable<ICommandBus> Create(int number)
74+
{
75+
// Command is configured to be singleton in the container.
76+
var pl = _sp.GetRequiredService<IPlumber>();
77+
var logger = _sp.GetRequiredService<ILogger<CommandBus>>();
78+
for (int i = 0; i < number; ++i)
79+
yield return new CommandBus(pl,this, logger);
80+
}
81+
82+
public virtual async ValueTask DisposeAsync()
83+
{
84+
if (_disposed) return;
85+
_disposed = true;
86+
87+
if (_semaphore is IAsyncDisposable semaphoreAsyncDisposable)
88+
await semaphoreAsyncDisposable.DisposeAsync();
89+
else
90+
_semaphore.Dispose();
91+
92+
foreach (var i in _pool.ToArray())
93+
await i.CommandBus.DisposeAsync();
94+
}
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
3+
namespace MicroPlumberd.Services;
4+
5+
sealed class CommandBusPoolScoped(IServiceProvider sp, int maxCount) : CommandBusPool(sp, maxCount)
6+
{
7+
private readonly IServiceScope _scope = sp.CreateScope();
8+
9+
public override IEnumerable<ICommandBus> Create(int number)
10+
{
11+
for (int i = 0; i < _maxCount; ++i)
12+
yield return _scope.ServiceProvider.GetRequiredService<ICommandBus>();
13+
}
14+
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System.Net;
2+
3+
namespace MicroPlumberd.Services;
4+
5+
public class CommandExecutionResults
6+
{
7+
public async ValueTask<bool> Handle(Metadata m, object ev)
8+
{
9+
switch (ev)
10+
{
11+
case CommandExecuted ce:
12+
{
13+
IsSuccess = true;
14+
IsReady.SetResult(true);
15+
return true;
16+
}
17+
case ICommandFailedEx ef:
18+
{
19+
IsSuccess = false;
20+
ErrorMessage = ef.Message;
21+
ErrorData = ef.Fault;
22+
ErrorCode = ef.Code;
23+
IsReady.SetResult(true);
24+
return true;
25+
}
26+
case ICommandFailed cf:
27+
{
28+
IsSuccess = false;
29+
ErrorMessage = cf.Message;
30+
ErrorCode = cf.Code;
31+
IsReady.SetResult(true);
32+
return true;
33+
}
34+
}
35+
36+
return false;
37+
}
38+
39+
public HttpStatusCode ErrorCode { get; private set; }
40+
41+
42+
public string ErrorMessage { get; private set; }
43+
public object? ErrorData { get; private set; }
44+
public bool IsSuccess { get; private set; }
45+
public TaskCompletionSource<bool> IsReady { get; private set; } = new TaskCompletionSource<bool>();
46+
47+
}

src/MicroPlumberd.Services/ContainerExtensions.cs

+32-5
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,54 @@ public async Task Handle(Metadata m, object ev)
2323
public static class ContainerExtensions
2424
{
2525
public static IServiceCollection AddPlumberd(this IServiceCollection collection,
26-
EventStoreClientSettings? settings = null, Action<IServiceProvider, IPlumberConfig>? configure = null, bool scopedCommandBus = false) =>
27-
collection.AddPlumberd(sp => settings, configure, scopedCommandBus);
26+
EventStoreClientSettings? settings = null, Action<IServiceProvider, IPlumberConfig>? configure = null, bool scopedCommandBus = false, int commandBusPoolSize = 64) =>
27+
collection.AddPlumberd(sp => settings, configure, scopedCommandBus, commandBusPoolSize);
2828

29+
/// <summary>
30+
/// Adds the MicroPlumberd services to the specified <see cref="IServiceCollection"/>.
31+
/// </summary>
32+
/// <param name="collection">
33+
/// The <see cref="IServiceCollection"/> to which the services will be added.
34+
/// </param>
35+
/// <param name="settingsFactory">
36+
/// A factory function to create <see cref="EventStoreClientSettings"/> for configuring the Event Store client.
37+
/// </param>
38+
/// <param name="configure">
39+
/// An optional action to configure the <see cref="IPlumberConfig"/> instance.
40+
/// </param>
41+
/// <param name="scopedCommandBus">
42+
/// A boolean value indicating whether the <see cref="ICommandBus"/> should be registered as scoped.
43+
/// </param>
44+
/// <param name="commandBusPoolSize">
45+
/// The size of the command bus pool - it's used for QueueAsync operation on ICommandBus. Defaults to 64.
46+
/// </param>
47+
/// <returns>
48+
/// The updated <see cref="IServiceCollection"/> with the MicroPlumberd services added.
49+
/// </returns>
2950
public static IServiceCollection AddPlumberd(this IServiceCollection collection,
30-
Func<IServiceProvider, EventStoreClientSettings> settingsFactory, Action<IServiceProvider, IPlumberConfig>? configure = null, bool scopedCommandBus = false)
51+
Func<IServiceProvider, EventStoreClientSettings> settingsFactory, Action<IServiceProvider, IPlumberConfig>? configure = null, bool scopedCommandBus = false, int commandBusPoolSize=64)
3152
{
3253
collection.AddSingleton(sp => Plumber.Create(settingsFactory(sp), x =>
3354
{
3455
configure?.Invoke(sp, x);
3556
x.ServiceProvider = sp;
3657
}));
3758
collection.AddSingleton<StartupHealthCheck>();
38-
59+
3960
collection.AddBackgroundServiceIfMissing<CommandHandlerService>();
4061
collection.AddBackgroundServiceIfMissing<EventHandlerService>();
4162

4263
collection.TryAddSingleton(typeof(ISnapshotPolicy<>), typeof(AttributeSnaphotPolicy<>));
43-
if(scopedCommandBus)
64+
if (scopedCommandBus)
65+
{
4466
collection.TryAddScoped<ICommandBus, CommandBus>();
67+
collection.TryAddSingleton<ICommandBusPool>(sp => new CommandBusPoolScoped(sp, commandBusPoolSize).Init());
68+
}
4569
else
70+
{
4671
collection.TryAddSingleton<ICommandBus, CommandBus>();
72+
collection.TryAddSingleton<ICommandBusPool>(sp => new CommandBusPool(sp, commandBusPoolSize).Init());
73+
}
4774
collection.TryAddSingleton(typeof(IEventHandler<>), typeof(EventHandlerExecutor<>));
4875

4976

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace MicroPlumberd.Services;
2+
3+
public interface ICommandBusOwner : IDisposable
4+
{
5+
Task SendAsync(object recipientId, object command, TimeSpan? timeout = null, bool fireAndForget = false,
6+
CancellationToken token = default);
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
namespace MicroPlumberd.Services;
2+
3+
/// <summary>
4+
/// Represents a pool of command buses that can be rented and returned for managing command execution.
5+
/// </summary>
6+
public interface ICommandBusPool
7+
{
8+
/// <summary>
9+
/// Rents an <see cref="ICommandBusOwner"/> instance from the pool, allowing the caller to send commands
10+
/// using the rented command bus. The rented instance must be returned to the pool after use by calling dispose
11+
/// </summary>
12+
/// <param name="ct">A <see cref="CancellationToken"/> to observe while waiting for a command bus to become available.</param>
13+
/// <returns>
14+
/// A <see cref="ValueTask{TResult}"/> representing the asynchronous operation. The result contains
15+
/// an <see cref="ICommandBusOwner"/> instance that can be used to send commands.
16+
/// </returns>
17+
/// <exception cref="InvalidOperationException">
18+
/// Thrown if the pool is empty and no command bus is available for rent.
19+
/// </exception>
20+
ValueTask<ICommandBusOwner> RentScope(CancellationToken ct = default);
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
namespace MicroPlumberd.Services;
2+
3+
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, AllowMultiple=true)]
4+
public class ThrowsFaultExceptionAttribute<TMessage>() : ThrowsFaultExceptionAttribute(typeof(TMessage));

0 commit comments

Comments
 (0)