Skip to content

Commit 80d6546

Browse files
hunter1703timothycoleman
authored andcommitted
Add CaughtUp control message for subscriptions
1 parent 0d33050 commit 80d6546

File tree

8 files changed

+493
-17
lines changed

8 files changed

+493
-17
lines changed

src/EventStore.Core.Tests/Helpers/TestFixtureWithExistingEvents.cs

+58-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@ public abstract class TestFixtureWithExistingEvents<TLogFormat,TStreamId> : Test
2020
IHandle<ClientMessage.ReadStreamEventsBackward>,
2121
IHandle<ClientMessage.ReadStreamEventsForward>,
2222
IHandle<ClientMessage.ReadAllEventsForward>,
23+
IHandle<ClientMessage.FilteredReadAllEventsForward>,
2324
IHandle<ClientMessage.WriteEvents>,
2425
IHandle<ClientMessage.TransactionStart>,
2526
IHandle<ClientMessage.TransactionWrite>,
2627
IHandle<ClientMessage.TransactionCommit>,
28+
IHandle<ClientMessage.SubscribeToStream>,
29+
IHandle<ClientMessage.FilteredSubscribeToStream>,
2730
IHandle<ClientMessage.DeleteStream> {
2831
public class Transaction {
2932
private readonly ClientMessage.TransactionStart _startMessage;
@@ -89,6 +92,11 @@ protected TFPos ExistingStreamMetadata(string streamId, string metadata) {
8992
}
9093

9194
protected TFPos ExistingEvent(string streamName, string eventType, string eventMetadata, string eventData,
95+
bool isJson = false) {
96+
return WriteEvent(streamName, eventType, eventMetadata, eventData, isJson).Item2;
97+
}
98+
99+
protected (EventRecord, TFPos) WriteEvent(string streamName, string eventType, string eventMetadata, string eventData,
92100
bool isJson = false) {
93101
List<EventRecord> list;
94102
if (!_streams.TryGetValue(streamName, out list) || list == null) {
@@ -112,7 +120,7 @@ protected TFPos ExistingEvent(string streamName, string eventType, string eventM
112120
var eventPosition = new TFPos(_fakePosition + 50, _fakePosition);
113121
_all.Add(eventPosition, eventRecord);
114122
_fakePosition += 100;
115-
return eventPosition;
123+
return (eventRecord, eventPosition);
116124
}
117125

118126
protected void NotReady() {
@@ -209,6 +217,9 @@ public void setup1() {
209217
_bus.Subscribe<ClientMessage.ReadStreamEventsBackward>(this);
210218
_bus.Subscribe<ClientMessage.ReadStreamEventsForward>(this);
211219
_bus.Subscribe<ClientMessage.ReadAllEventsForward>(this);
220+
_bus.Subscribe<ClientMessage.FilteredReadAllEventsForward>(this);
221+
_bus.Subscribe<ClientMessage.SubscribeToStream>(this);
222+
_bus.Subscribe<ClientMessage.FilteredSubscribeToStream>(this);
212223
_bus.Subscribe<ClientMessage.DeleteStream>(this);
213224
_bus.Subscribe<ClientMessage.TransactionStart>(this);
214225
_bus.Subscribe<ClientMessage.TransactionWrite>(this);
@@ -521,6 +532,52 @@ public void Handle(ClientMessage.ReadAllEventsForward message) {
521532
prev,
522533
_fakePosition));
523534
}
535+
536+
public void Handle(ClientMessage.FilteredReadAllEventsForward message) {
537+
if (_readsTimeOut) return;
538+
if (!_readAllEnabled)
539+
return;
540+
var from = new TFPos(message.CommitPosition, message.PreparePosition);
541+
var records = _all.SkipWhile(v => v.Key < from).Where(kvp => message.EventFilter.IsEventAllowed(kvp.Value)).Take(message.MaxCount).ToArray();
542+
var list = new List<ResolvedEvent>();
543+
var pos = from;
544+
var next = pos;
545+
var prev = new TFPos(pos.CommitPosition, Int64.MaxValue);
546+
foreach (KeyValuePair<TFPos, EventRecord> record in records) {
547+
pos = record.Key;
548+
next = new TFPos(pos.CommitPosition, pos.PreparePosition + 1);
549+
list.Add(BuildEvent(record.Value, message.ResolveLinkTos, record.Key.CommitPosition));
550+
}
551+
552+
var events = list.ToArray();
553+
message.Envelope.ReplyWith(
554+
new ClientMessage.FilteredReadAllEventsForwardCompleted(
555+
message.CorrelationId, FilteredReadAllResult.Success, "", events, null, false, message.MaxCount, pos, next,
556+
prev, _fakePosition, list.Count < message.MaxCount, -1));
557+
}
558+
559+
public void Handle(ClientMessage.SubscribeToStream msg) {
560+
_streams.TryGetValue(msg.EventStreamId, out var list);
561+
562+
var lastEventNumber = msg.EventStreamId.IsEmptyString()
563+
? (long?)null
564+
: list.Safe().Any() ? list.Safe().Last().EventNumber : -1;
565+
var subscribedMessage =
566+
new ClientMessage.SubscriptionConfirmation(msg.CorrelationId, -1, lastEventNumber);
567+
msg.Envelope.ReplyWith(subscribedMessage);
568+
}
569+
570+
public void Handle(ClientMessage.FilteredSubscribeToStream msg) {
571+
_streams.TryGetValue(msg.EventStreamId, out var list);
572+
573+
var lastEventNumber = msg.EventStreamId.IsEmptyString()
574+
? (long?)null
575+
: list.Safe().Any() ? list.Safe().Last().EventNumber : -1;
576+
var lastCommitPos = -1;
577+
var subscribedMessage =
578+
new ClientMessage.SubscriptionConfirmation(msg.CorrelationId, lastCommitPos, lastEventNumber);
579+
msg.Envelope.ReplyWith(subscribedMessage);
580+
}
524581

525582
public void Handle(ClientMessage.TransactionStart message) {
526583
var transactionId = _fakePosition;

src/EventStore.Core.Tests/Helpers/TestFixtureWithReadWriteDispatchers.cs

+7
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
using EventStore.Core.Tests.Services.TimeService;
1111
using NUnit.Framework;
1212
using System.Linq;
13+
using EventStore.Core.Telemetry;
1314

1415
namespace EventStore.Core.Tests.Helpers {
1516
public abstract class TestFixtureWithReadWriteDispatchers {
1617
protected InMemoryBus _bus;
18+
protected IQueuedHandler _publisher;
1719

1820
protected RequestResponseDispatcher<ClientMessage.DeleteStream, ClientMessage.DeleteStreamCompleted>
1921
_streamDispatcher;
@@ -47,6 +49,11 @@ public void setup0() {
4749
_envelope = null;
4850
_timeProvider = new FakeTimeProvider();
4951
_bus = new InMemoryBus("bus");
52+
_publisher = QueuedHandler.CreateQueuedHandler(_bus,
53+
"TestQueue",
54+
new QueueStatsManager(),
55+
new QueueTrackers(), watchSlowMsg: false);
56+
_publisher.Start();
5057
_consumer = new TestHandler<Message>();
5158
_bus.Subscribe(_consumer);
5259
_queue = GiveInputQueue();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using EventStore.Common.Utils;
5+
using EventStore.Core.Services.Storage.ReaderIndex;
6+
using EventStore.Core.Services.Transport.Grpc;
7+
using EventStore.Core.Tests.Helpers;
8+
using NUnit.Framework;
9+
10+
namespace EventStore.Core.Tests.Services.Transport.Grpc;
11+
12+
[TestFixture]
13+
public class EnumeratorsTests {
14+
15+
[TestFixture(typeof(LogFormat.V2), typeof(string))]
16+
[TestFixture(typeof(LogFormat.V3), typeof(uint))]
17+
public class subscribe_all_from_start<TLogFormat, TStreamId> : TestFixtureWithExistingEvents<TLogFormat, TStreamId> {
18+
19+
private readonly List<Guid> _eventIds = new();
20+
21+
protected override void Given() {
22+
EnableReadAll();
23+
_eventIds.Add(WriteEvent("test-stream", "type1", "{}", "{Data: 1}").Item1.EventId);
24+
_eventIds.Add(WriteEvent("test-stream", "type2", "{}", "{Data: 2}").Item1.EventId);
25+
_eventIds.Add(WriteEvent("test-stream", "type3", "{}", "{Data: 3}").Item1.EventId);
26+
}
27+
28+
[Test]
29+
public async Task should_receive_live_caught_up_message_after_reading_existing_events() {
30+
var enumerator = new TestEnumerators.AllSubscription<TStreamId>(new TestEnumerators.SubscriptionRequest(_publisher, Position.Start));
31+
32+
TestEnumerators.SubscriptionResponse response = await enumerator.GetNext();
33+
Assert.True(response is TestEnumerators.SubscriptionConfirmation);
34+
while ((response = await enumerator.GetNext()) != TestEnumerators.SubscriptionResponse.None) {
35+
if (_eventIds.IsNotEmpty()) {
36+
Assert.AreEqual(Uuid.FromGuid(_eventIds[0]), ((TestEnumerators.Event)response).RecordedEvent.Id);
37+
_eventIds.RemoveAt(0);
38+
} else {
39+
Assert.True(response is TestEnumerators.CaughtUp);
40+
break;
41+
}
42+
}
43+
}
44+
}
45+
46+
[TestFixture(typeof(LogFormat.V2), typeof(string))]
47+
[TestFixture(typeof(LogFormat.V3), typeof(uint))]
48+
public class subscribe_all_from_end<TLogFormat, TStreamId> : TestFixtureWithExistingEvents<TLogFormat, TStreamId> {
49+
50+
protected override void Given() {
51+
EnableReadAll();
52+
WriteEvent("test-stream", "type1", "{}", "{Data: 1}");
53+
WriteEvent("test-stream", "type2", "{}", "{Data: 2}");
54+
WriteEvent("test-stream", "type3", "{}", "{Data: 3}");
55+
}
56+
57+
[Test]
58+
public async Task should_receive_live_caught_up_message_immediately() {
59+
var enumerator = new TestEnumerators.AllSubscription<TStreamId>(new TestEnumerators.SubscriptionRequest(_publisher, Position.End));
60+
61+
Assert.True(await enumerator.GetNext() is TestEnumerators.SubscriptionConfirmation);
62+
Assert.True(await enumerator.GetNext() is TestEnumerators.CaughtUp);
63+
}
64+
}
65+
66+
[TestFixture(typeof(LogFormat.V2), typeof(string))]
67+
[TestFixture(typeof(LogFormat.V3), typeof(uint))]
68+
public class subscribe_filtered_all_from_start_<TLogFormat, TStreamId> : TestFixtureWithExistingEvents<TLogFormat, TStreamId> {
69+
70+
private readonly List<Guid> _eventIds = new();
71+
72+
protected override void Given() {
73+
EnableReadAll();
74+
_eventIds.Add(WriteEvent("test-stream", "type1", "{}", "{Data: 1}").Item1.EventId);
75+
WriteEvent("test-stream", "type2", "{}", "{Data: 2}");
76+
WriteEvent("test-stream", "type3", "{}", "{Data: 3}");
77+
}
78+
79+
[Test]
80+
public async Task should_receive_live_caught_up_message_after_reading_existing_events() {
81+
var enumerator = new TestEnumerators.AllSubscriptionFiltered<TStreamId>(new TestEnumerators.SubscriptionRequest(_publisher, Position.Start, EventFilter: EventFilter.EventType.Prefixes(false, "type1")));
82+
83+
TestEnumerators.SubscriptionResponse response = await enumerator.GetNext();
84+
Assert.True(response is TestEnumerators.SubscriptionConfirmation);
85+
while ((response = await enumerator.GetNext()) != TestEnumerators.SubscriptionResponse.None) {
86+
if (_eventIds.IsNotEmpty()) {
87+
Assert.AreEqual(Uuid.FromGuid(_eventIds[0]), ((TestEnumerators.Event)response).RecordedEvent.Id);
88+
_eventIds.RemoveAt(0);
89+
} else {
90+
Assert.True(response is TestEnumerators.CaughtUp);
91+
break;
92+
}
93+
}
94+
}
95+
}
96+
97+
[TestFixture(typeof(LogFormat.V2), typeof(string))]
98+
[TestFixture(typeof(LogFormat.V3), typeof(uint))]
99+
public class subscribe_filtered_all_from_end_<TLogFormat, TStreamId> : TestFixtureWithExistingEvents<TLogFormat, TStreamId> {
100+
101+
protected override void Given() {
102+
EnableReadAll();
103+
WriteEvent("test-stream", "type1", "{}", "{Data: 1}");
104+
WriteEvent("test-stream", "type2", "{}", "{Data: 2}");
105+
WriteEvent("test-stream", "type3", "{}", "{Data: 3}");
106+
}
107+
108+
[Test]
109+
public async Task should_receive_live_caught_up_message_immediately() {
110+
var enumerator = new TestEnumerators.AllSubscriptionFiltered<TStreamId>(new TestEnumerators.SubscriptionRequest(_publisher, Position.End, EventFilter: EventFilter.EventType.Prefixes(false, "type1")));
111+
112+
Assert.True(await enumerator.GetNext() is TestEnumerators.SubscriptionConfirmation);
113+
Assert.True(await enumerator.GetNext() is TestEnumerators.CaughtUp);
114+
}
115+
}
116+
117+
[TestFixture(typeof(LogFormat.V2), typeof(string))]
118+
[TestFixture(typeof(LogFormat.V3), typeof(uint))]
119+
public class subscribe_stream_from_start_<TLogFormat, TStreamId> : TestFixtureWithExistingEvents<TLogFormat, TStreamId> {
120+
121+
private readonly List<Guid> _eventIds = new();
122+
123+
protected override void Given() {
124+
EnableReadAll();
125+
_eventIds.Add(WriteEvent("test-stream1", "type1", "{}", "{Data: 1}").Item1.EventId);
126+
WriteEvent("test-stream2", "type2", "{}", "{Data: 2}");
127+
WriteEvent("test-stream3", "type3", "{}", "{Data: 3}");
128+
}
129+
130+
[Test]
131+
public async Task should_receive_live_caught_up_message_after_reading_existing_events() {
132+
var enumerator = new TestEnumerators.StreamSubscription<TStreamId>(new TestEnumerators.SubscriptionRequest(_publisher, StreamName: "test-stream1"));
133+
134+
TestEnumerators.SubscriptionResponse response = await enumerator.GetNext();
135+
Assert.True(response is TestEnumerators.SubscriptionConfirmation);
136+
while ((response = await enumerator.GetNext()) != TestEnumerators.SubscriptionResponse.None) {
137+
if (_eventIds.IsNotEmpty()) {
138+
Assert.AreEqual(Uuid.FromGuid(_eventIds[0]), ((TestEnumerators.Event)response).RecordedEvent.Id);
139+
_eventIds.RemoveAt(0);
140+
} else {
141+
Assert.True(response is TestEnumerators.CaughtUp);
142+
break;
143+
}
144+
}
145+
}
146+
}
147+
148+
[TestFixture(typeof(LogFormat.V2), typeof(string))]
149+
[TestFixture(typeof(LogFormat.V3), typeof(uint))]
150+
public class subscribe_stream_from_end<TLogFormat, TStreamId> : TestFixtureWithExistingEvents<TLogFormat, TStreamId> {
151+
152+
private readonly List<Guid> _eventIds = new();
153+
154+
protected override void Given() {
155+
EnableReadAll();
156+
_eventIds.Add(WriteEvent("test-stream1", "type1", "{}", "{Data: 1}").Item1.EventId);
157+
WriteEvent("test-stream2", "type2", "{}", "{Data: 2}");
158+
WriteEvent("test-stream3", "type3", "{}", "{Data: 3}");
159+
}
160+
161+
[Test]
162+
public async Task should_receive_live_caught_up_message_immediately() {
163+
var enumerator = new TestEnumerators.StreamSubscription<TStreamId>(new TestEnumerators.SubscriptionRequest(_publisher, StreamName: "test-stream1", StartRevision: StreamRevision.End));
164+
165+
Assert.True(await enumerator.GetNext() is TestEnumerators.SubscriptionConfirmation);
166+
Assert.True(await enumerator.GetNext() is TestEnumerators.CaughtUp);
167+
}
168+
}
169+
}

src/EventStore.Core/Services/Transport/Grpc/Enumerators.AllSubscription.cs

+4-10
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private void Subscribe(Position? startPosition) {
122122
if (startPosition == Position.End) {
123123
GoLive(Position.End);
124124
}
125-
else if (startPosition == null) {
125+
else if (startPosition == null || startPosition == Position.Start) {
126126
CatchUp(Position.Start);
127127
} else {
128128
var (commitPosition, preparePosition) = startPosition.Value.ToInt64();
@@ -157,11 +157,8 @@ async Task OnMessage(Message message, CancellationToken ct) {
157157
case ReadAllResult.Success:
158158
await ConfirmSubscription().ConfigureAwait(false);
159159

160-
var position = Position.FromInt64(completed.CurrentPos.CommitPosition,
161-
completed.CurrentPos.PreparePosition);
162-
163160
foreach (var @event in completed.Events) {
164-
position = Position.FromInt64(
161+
var position = Position.FromInt64(
165162
@event.OriginalPosition.Value.CommitPosition,
166163
@event.OriginalPosition.Value.PreparePosition);
167164

@@ -217,13 +214,10 @@ private void GoLive(Position startPosition) {
217214
Task.Factory.StartNew(PumpLiveMessages, _cancellationToken);
218215

219216
async Task PumpLiveMessages() {
220-
var position = await caughtUpSource.Task.ConfigureAwait(false);
217+
await caughtUpSource.Task.ConfigureAwait(false);
221218

222219
await _channel.Writer.WriteAsync(new ReadResp {
223-
Checkpoint = new ReadResp.Types.Checkpoint {
224-
CommitPosition = position.CommitPosition,
225-
PreparePosition = position.PreparePosition
226-
}
220+
CaughtUp = new ReadResp.Types.CaughtUp()
227221
}, _cancellationToken).ConfigureAwait(false);
228222

229223
await foreach (var @event in liveEvents.Reader.ReadAllAsync(_cancellationToken)

src/EventStore.Core/Services/Transport/Grpc/Enumerators.AllSubscriptionFiltered.cs

+3-6
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ private void Subscribe(Position? startPosition) {
145145
if (startPosition == Position.End) {
146146
GoLive(Position.End);
147147
}
148-
else if (startPosition == null) {
148+
else if (startPosition == null || startPosition == Position.Start) {
149149
CatchUp(Position.Start);
150150
} else {
151151
var (commitPosition, preparePosition) = startPosition.Value.ToInt64();
@@ -263,13 +263,10 @@ private void GoLive(Position startPosition) {
263263
Task.Factory.StartNew(PumpLiveMessages, _cancellationToken);
264264

265265
async Task PumpLiveMessages() {
266-
var position = await caughtUpSource.Task.ConfigureAwait(false);
266+
await caughtUpSource.Task.ConfigureAwait(false);
267267

268268
await _channel.Writer.WriteAsync(new ReadResp {
269-
Checkpoint = new ReadResp.Types.Checkpoint {
270-
CommitPosition = position.CommitPosition,
271-
PreparePosition = position.PreparePosition
272-
}
269+
CaughtUp = new ReadResp.Types.CaughtUp()
273270
}, _cancellationToken).ConfigureAwait(false);
274271

275272
await foreach (var message in liveEvents.Reader.ReadAllAsync(_cancellationToken)

src/EventStore.Core/Services/Transport/Grpc/Enumerators.StreamSubscription.cs

+5
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ private void GoLive(StreamRevision startRevision) {
222222

223223
async Task PumpLiveMessages() {
224224
await caughtUpSource.Task.ConfigureAwait(false);
225+
226+
await _channel.Writer.WriteAsync(new ReadResp {
227+
CaughtUp = new ReadResp.Types.CaughtUp()
228+
}, _cancellationToken).ConfigureAwait(false);
229+
225230
await foreach (var @event in liveEvents.Reader.ReadAllAsync(_cancellationToken)
226231
.ConfigureAwait(false)) {
227232
await _channel.Writer.WriteAsync(new ReadResp {

0 commit comments

Comments
 (0)