Skip to content

Commit c93f131

Browse files
author
Rafał Maciąg
committed
Persisted projection defaults changed with min checkpoint to 1.
1 parent da15a16 commit c93f131

File tree

3 files changed

+26
-9
lines changed

3 files changed

+26
-9
lines changed

src/MicroPlumberd.Services.LiteDb/Class1.cs

+14
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ public async Task<IAsyncDisposable> SubscribeEventHandler<TEventHandler>(TEventH
155155
throw new NotImplementedException();
156156
}
157157

158+
public async Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TEventHandler? model = default(TEventHandler?),
159+
string? outputStream = null, string? groupName = null, IPosition? startFrom = null,
160+
bool ensureOutputStreamProjection = true, int minCheckPointCount = 1, CancellationToken token = default) where TEventHandler : class, IEventHandler, ITypeRegister
161+
{
162+
throw new NotImplementedException();
163+
}
164+
158165
public async Task<T> Get<T>(object id, CancellationToken token = default) where T : IAggregate<T>, ITypeRegister, IId
159166
{
160167
throw new NotImplementedException();
@@ -247,6 +254,13 @@ public IAsyncEnumerable<object> Read<TOwner>(StreamPosition? start = null, Direc
247254
throw new NotImplementedException();
248255
}
249256

257+
public async Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TypeEventConverter mapFunc, IEnumerable<string>? events,
258+
TEventHandler? model, string? outputStream = null, string? groupName = null, IPosition? startFrom = null,
259+
bool ensureOutputStreamProjection = true, int minCheckPointCount = 1, CancellationToken token = default) where TEventHandler : class, IEventHandler
260+
{
261+
throw new NotImplementedException();
262+
}
263+
250264
public IAsyncEnumerable<object> Read<TOwner>(object id, StreamPosition? start = null, Direction? direction = null,
251265
long maxCount = 9223372036854775807, CancellationToken token = default) where TOwner : ITypeRegister
252266
{

src/MicroPlumberd/Abstractions/IPlumber.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,8 @@ Task<IAsyncDisposable> SubscribeEventHandler<TEventHandler>(TEventHandler? eh =
229229
/// <param name="ensureOutputStreamProjection">when true creates projection that creates output's stream</param>
230230
/// <param name="token"></param>
231231
/// <returns></returns>
232-
Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TEventHandler? model=null, string? outputStream = null, string? groupName = null, IPosition? startFrom = null, bool ensureOutputStreamProjection = true, CancellationToken token = default) where TEventHandler : class,IEventHandler, ITypeRegister;
232+
Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TEventHandler? model=null, string? outputStream = null, string? groupName = null, IPosition? startFrom = null, bool ensureOutputStreamProjection = true, int minCheckPointCount=1, CancellationToken token = default)
233+
where TEventHandler : class,IEventHandler, ITypeRegister;
233234

234235
/// <summary>
235236
/// Returns a subscription builder that will subscribe model persistently.
@@ -240,7 +241,7 @@ Task<IAsyncDisposable> SubscribeEventHandler<TEventHandler>(TEventHandler? eh =
240241
/// <param name="userCredentials">The user credentials.</param>
241242
/// <param name="cancellationToken">The cancellation token.</param>
242243
/// <returns></returns>
243-
ISubscriptionRunner SubscribePersistently(string streamName, string groupName, int bufferSize = 10, UserCredentials? userCredentials = null, CancellationToken cancellationToken = new CancellationToken());
244+
ISubscriptionRunner SubscribePersistently(string streamName, string groupName, int bufferSize = 10, UserCredentials? userCredentials = null,CancellationToken cancellationToken = new CancellationToken());
244245

245246
/// <summary>
246247
/// Rehydrates the specified model.
@@ -341,7 +342,7 @@ Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TypeEven
341342
IEnumerable<string>? events,
342343
TEventHandler? model,
343344
string? outputStream = null, string? groupName = null, IPosition? startFrom = null,
344-
bool ensureOutputStreamProjection = true, CancellationToken token = default)
345+
bool ensureOutputStreamProjection = true,int minCheckPointCount=1, CancellationToken token = default)
345346
where TEventHandler : class, IEventHandler;
346347

347348
/// <summary>

src/MicroPlumberd/Plumber.cs

+8-6
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ public async Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHand
135135
IEnumerable<string>? events,
136136
TEventHandler? model,
137137
string? outputStream = null, string? groupName = null, IPosition? startFrom = null,
138-
bool ensureOutputStreamProjection = true, CancellationToken token = default)
138+
bool ensureOutputStreamProjection = true,
139+
int minCheckpointCount = 1,
140+
CancellationToken token = default)
139141
where TEventHandler : class, IEventHandler
140142
{
141143
var handlerType = typeof(TEventHandler);
@@ -152,7 +154,7 @@ public async Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHand
152154
catch (PersistentSubscriptionNotFoundException)
153155
{
154156
await PersistentSubscriptionClient.CreateToStreamAsync(outputStream, groupName,
155-
new PersistentSubscriptionSettings(true, startFrom), cancellationToken: token);
157+
new PersistentSubscriptionSettings(true, startFrom, checkPointLowerBound: minCheckpointCount), cancellationToken: token);
156158
}
157159

158160
var sub = SubscribePersistently(outputStream, groupName, cancellationToken:token);
@@ -166,20 +168,20 @@ await PersistentSubscriptionClient.CreateToStreamAsync(outputStream, groupName,
166168

167169
public Task<IAsyncDisposable> SubscribeEventHandlerPersistently<TEventHandler>(TEventHandler? model,
168170
string? outputStream = null, string? groupName = null, IPosition? startFrom = null,
169-
bool ensureOutputStreamProjection = true, CancellationToken token = default)
171+
bool ensureOutputStreamProjection = true, int minCheckPointCount = 1, CancellationToken token = default)
170172
where TEventHandler : class, IEventHandler, ITypeRegister
171173
{
172174
return SubscribeEventHandlerPersistently(_typeHandlerRegisters.GetEventNameConverterFor<TEventHandler>(),
173175
_typeHandlerRegisters.GetEventNamesFor<TEventHandler>(),
174-
model, outputStream, groupName, startFrom, ensureOutputStreamProjection, token);
176+
model, outputStream, groupName, startFrom, ensureOutputStreamProjection, minCheckPointCount, token);
175177
}
176178

177179

178-
public ISubscriptionRunner SubscribePersistently(string streamName, string groupName, int bufferSize = 10,
180+
public ISubscriptionRunner SubscribePersistently(string streamName, string groupName, int bufferSize = 10,
179181
UserCredentials? userCredentials = null, CancellationToken cancellationToken = default)
180182
{
181183
return new PersistentSubscriptionRunner(this,
182-
PersistentSubscriptionClient.SubscribeToStream(streamName, groupName, bufferSize, userCredentials,
184+
PersistentSubscriptionClient.SubscribeToStream(streamName, groupName, bufferSize, userCredentials,
183185
cancellationToken));
184186
}
185187

0 commit comments

Comments
 (0)