Skip to content

Commit 2ffcd66

Browse files
Add EventEnvelope Tags support (#426)
* Add EventEnvelope Tags support * Enable unit test Tags support * Refactor .ToArray() calls --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 8468332 commit 2ffcd66

10 files changed

+39
-27
lines changed

src/Akka.Persistence.Sql.Tests.Common/Query/BaseCurrentEventsByTagSpec.cs

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ protected BaseCurrentEventsByTagSpec(TagMode tagMode, ITestOutputHelper output,
2727
: base(Config(tagMode, fixture), name, output)
2828
=> ReadJournal = Sys.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
2929

30+
protected override bool SupportsTagsInEventEnvelope => true;
31+
3032
public async Task InitializeAsync()
3133
{
3234
// Force start read journal

src/Akka.Persistence.Sql.Tests.Common/Query/BaseEventsByTagSpec.cs

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ protected BaseEventsByTagSpec(TagMode tagMode, ITestOutputHelper output, string
2727
: base(Config(tagMode, fixture), name, output)
2828
=> ReadJournal = Sys.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
2929

30+
protected override bool SupportsTagsInEventEnvelope => true;
31+
3032
public async Task InitializeAsync()
3133
{
3234
// Force start read journal

src/Akka.Persistence.Sql/Journal/Dao/BaseByteArrayJournalDao.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ private static IQueryable<long> MaxMarkedForDeletionMaxPersistenceIdQuery(
518518
.Where(r => r.PersistenceId == persistenceId)
519519
.Select(r => LinqToDB.Sql.Ext.Max<long?>(r.SequenceNumber).ToValue()));
520520

521-
private static Func<Util.Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, Util.Try<ReplayCompletion>> MessageWithBatchMapper()
521+
private static Func<Util.Try<(IPersistentRepresentation, string[], long)>, Util.Try<ReplayCompletion>> MessageWithBatchMapper()
522522
=> x => x.IsSuccess
523523
? new Util.Try<ReplayCompletion>(new ReplayCompletion(x.Success.Value))
524524
: new Util.Try<ReplayCompletion>(x.Failure.Value);

src/Akka.Persistence.Sql/Journal/Dao/ByteArrayJournalSerializer.cs

+6-9
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,15 @@ protected override Try<JournalRow> Serialize(
138138
}
139139
}
140140

141-
protected override Try<(IPersistentRepresentation, IImmutableSet<string>, long)> Deserialize(JournalRow t)
141+
protected override Try<(IPersistentRepresentation, string[], long)> Deserialize(JournalRow t)
142142
{
143143
try
144144
{
145145
var identifierMaybe = t.Identifier;
146146
if (identifierMaybe.HasValue)
147147
{
148148
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
149-
return new Try<(IPersistentRepresentation, IImmutableSet<string>, long)>(
149+
return new Try<(IPersistentRepresentation, string[], long)>(
150150
(
151151
new Persistent(
152152
payload: _serializer.Deserialize(t.Message, identifierMaybe.Value, t.Manifest),
@@ -157,16 +157,14 @@ protected override Try<JournalRow> Serialize(
157157
sender: ActorRefs.NoSender,
158158
writerGuid: t.WriterUuid,
159159
timestamp: t.Timestamp),
160-
t.Tags?
161-
.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries)
162-
.ToImmutableHashSet() ?? t.TagArray?.ToImmutableHashSet() ?? ImmutableHashSet<string>.Empty,
160+
t.Tags?.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) ?? t.TagArray ?? Array.Empty<string>(),
163161
t.Ordering));
164162
}
165163

166164
var type = Type.GetType(t.Manifest, true);
167165

168166
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
169-
return new Try<(IPersistentRepresentation, IImmutableSet<string>, long)>(
167+
return new Try<(IPersistentRepresentation, string[], long)>(
170168
(
171169
new Persistent(
172170
payload: Akka.Serialization.Serialization.WithTransport(
@@ -181,13 +179,12 @@ protected override Try<JournalRow> Serialize(
181179
writerGuid: t.WriterUuid,
182180
timestamp: t.Timestamp),
183181
t.Tags?
184-
.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries)
185-
.ToImmutableHashSet() ?? t.TagArray?.ToImmutableHashSet() ?? ImmutableHashSet<string>.Empty,
182+
.Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) ?? t.TagArray ?? Array.Empty<string>(),
186183
t.Ordering));
187184
}
188185
catch (Exception e)
189186
{
190-
return new Try<(IPersistentRepresentation, IImmutableSet<string>, long)>(e);
187+
return new Try<(IPersistentRepresentation, string[], long)>(e);
191188
}
192189
}
193190
}

src/Akka.Persistence.Sql/Journal/Types/ReplayCompletion.cs

+10-2
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,32 @@
55
// -----------------------------------------------------------------------
66

77
using System.Collections.Immutable;
8+
using Akka.Annotations;
89

910
namespace Akka.Persistence.Sql.Journal.Types
1011
{
12+
[InternalApi]
1113
public sealed class ReplayCompletion
1214
{
1315
public readonly long Ordering;
1416

1517
public readonly IPersistentRepresentation Representation;
1618

19+
public readonly string[] Tags;
20+
1721
public ReplayCompletion(
1822
IPersistentRepresentation representation,
23+
string[] tags,
1924
long ordering)
2025
{
2126
Representation = representation;
2227
Ordering = ordering;
28+
Tags = tags;
2329
}
2430

25-
public ReplayCompletion((IPersistentRepresentation, IImmutableSet<string>, long) success)
26-
=> (Representation, _, Ordering) = success;
31+
public ReplayCompletion((IPersistentRepresentation, string[], long) success)
32+
{
33+
(Representation, Tags, Ordering) = success;
34+
}
2735
}
2836
}

src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace Akka.Persistence.Sql.Query.Dao
2828
{
2929
public abstract class BaseByteReadArrayJournalDao : BaseJournalDaoWithReadMessages, IReadJournalDao
3030
{
31-
private readonly Flow<JournalRow, Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> _deserializeFlow;
31+
private readonly Flow<JournalRow, Try<(IPersistentRepresentation, string[], long)>, NotUsed> _deserializeFlow;
3232

3333
private readonly ReadJournalConfig _readJournalConfig;
3434
private readonly DbStateHolder _dbStateHolder;
@@ -70,7 +70,7 @@ public Source<string, NotUsed> AllPersistenceIdsSource(long max)
7070
});
7171
}
7272

73-
public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> EventsByTag(
73+
public Source<Try<(IPersistentRepresentation, string[], long)>, NotUsed> EventsByTag(
7474
string tag,
7575
long offset,
7676
long maxOffset,
@@ -170,8 +170,8 @@ public override Task<Source<Try<ReplayCompletion>, NotUsed>> Messages(
170170
{
171171
try
172172
{
173-
var (representation, _, ordering) = t.Get();
174-
return new Try<ReplayCompletion>(new ReplayCompletion(representation, ordering));
173+
var (representation, tags, ordering) = t.Get();
174+
return new Try<ReplayCompletion>(new ReplayCompletion(representation, tags, ordering));
175175
}
176176
catch (Exception e)
177177
{
@@ -225,7 +225,7 @@ private static int MaxTake(long max)
225225
: (int)max;
226226

227227

228-
public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> Events(
228+
public Source<Try<(IPersistentRepresentation, string[], long)>, NotUsed> Events(
229229
long offset,
230230
long maxOffset,
231231
long max)
@@ -318,7 +318,7 @@ private static async Task<List<JournalRow>> AddTagDataFromTagTableAsync(IQueryab
318318
var result = new List<JournalRow>();
319319
foreach (var rowAndTags in rowsAndTags)
320320
{
321-
rowAndTags.row.TagArray = rowAndTags.tags?.Split(';') ?? Array.Empty<string>();
321+
rowAndTags.row.TagArray = rowAndTags.tags?.Split([';'], StringSplitOptions.RemoveEmptyEntries) ?? Array.Empty<string>();
322322
result.Add(rowAndTags.row);
323323
}
324324

src/Akka.Persistence.Sql/Query/IReadJournalDao.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface IReadJournalDao : IJournalDaoWithReadMessages
1717
Source<string, NotUsed> AllPersistenceIdsSource(
1818
long max);
1919

20-
Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> EventsByTag(
20+
Source<Try<(IPersistentRepresentation, string[], long)>, NotUsed> EventsByTag(
2121
string tag,
2222
long offset,
2323
long maxOffset,

src/Akka.Persistence.Sql/Query/SqlReadJournal.cs

+9-6
Original file line numberDiff line numberDiff line change
@@ -190,15 +190,16 @@ private Source<EventEnvelope, NotUsed> EventsByPersistenceIdSource(
190190
=> _readJournalDao
191191
.MessagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, _readJournalConfig.MaxBufferSize, refreshInterval)
192192
.SelectAsync(1, representationAndOrdering => Task.FromResult(representationAndOrdering.Get()))
193-
.SelectMany(r => AdaptEvents(r.Representation).Select(_ => new { representation = r.Representation, ordering = r.Ordering }))
193+
.SelectMany(r => AdaptEvents(r.Representation).Select(_ => new { representation = r.Representation, ordering = r.Ordering, tags = r.Tags}))
194194
.Select(
195195
r =>
196196
new EventEnvelope(
197197
offset: new Sequence(r.ordering),
198198
persistenceId: r.representation.PersistenceId,
199199
sequenceNr: r.representation.SequenceNr,
200200
@event: r.representation.Payload,
201-
timestamp: r.representation.Timestamp));
201+
timestamp: r.representation.Timestamp,
202+
tags: r.tags));
202203

203204
private Source<EventEnvelope, NotUsed> CurrentJournalEvents(long offset, long max, MaxOrderingId latestOrdering)
204205
{
@@ -211,7 +212,7 @@ private Source<EventEnvelope, NotUsed> CurrentJournalEvents(long offset, long ma
211212
.SelectMany(
212213
a =>
213214
{
214-
var (representation, _, ordering) = a;
215+
var (representation, tags, ordering) = a;
215216
return AdaptEvents(representation)
216217
.Select(
217218
r =>
@@ -220,7 +221,8 @@ private Source<EventEnvelope, NotUsed> CurrentJournalEvents(long offset, long ma
220221
persistenceId: r.PersistenceId,
221222
sequenceNr: r.SequenceNr,
222223
@event: r.Payload,
223-
timestamp: r.Timestamp));
224+
timestamp: r.Timestamp,
225+
tags: tags));
224226
});
225227
}
226228

@@ -239,7 +241,7 @@ private Source<EventEnvelope, NotUsed> CurrentJournalEventsByTag(
239241
.SelectMany(
240242
a =>
241243
{
242-
var (representation, _, ordering) = a;
244+
var (representation, tags, ordering) = a;
243245
return AdaptEvents(representation)
244246
.Select(
245247
r =>
@@ -248,7 +250,8 @@ private Source<EventEnvelope, NotUsed> CurrentJournalEventsByTag(
248250
persistenceId: r.PersistenceId,
249251
sequenceNr: r.SequenceNr,
250252
@event: r.Payload,
251-
timestamp: r.Timestamp));
253+
timestamp: r.Timestamp,
254+
tags: tags));
252255
});
253256
}
254257

src/Akka.Persistence.Sql/Serialization/FlowPersistentRepresentationSerializer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace Akka.Persistence.Sql.Serialization
1212
{
1313
public abstract class FlowPersistentRepresentationSerializer<T> : PersistentRepresentationSerializer<T>
1414
{
15-
public Flow<T, Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> DeserializeFlow()
15+
public Flow<T, Try<(IPersistentRepresentation, string[], long)>, NotUsed> DeserializeFlow()
1616
=> Flow.Create<T, NotUsed>().Select(Deserialize);
1717
}
1818
}

src/Akka.Persistence.Sql/Serialization/PersistentRepresentationSerializer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,6 @@ protected abstract Try<T> Serialize(
111111
IImmutableSet<string> tTags,
112112
long timeStamp = 0);
113113

114-
protected abstract Try<(IPersistentRepresentation, IImmutableSet<string>, long)> Deserialize(T t);
114+
protected abstract Try<(IPersistentRepresentation, string[], long)> Deserialize(T t);
115115
}
116116
}

0 commit comments

Comments
 (0)