Skip to content

Commit b1daa66

Browse files
committed
Revert changes on EventRead, add legacy data read test
1 parent 3cf2eee commit b1daa66

File tree

4 files changed

+53
-12
lines changed

4 files changed

+53
-12
lines changed

src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs

+8-4
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ public class QueryConfiguration
181181
/// <summary>
182182
/// The default serializer used when not type override matching is found
183183
/// </summary>
184-
[Obsolete(message: "This property will always return null")]
185-
public string DefaultSerializer => null;
184+
[Obsolete(message: "This property should never be used, use the default `System.Object` serializer instead")]
185+
public string DefaultSerializer { get; }
186186

187187
/// <summary>
188188
/// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.
@@ -221,7 +221,7 @@ public QueryConfiguration(
221221
string orderingColumnName,
222222
string serializerIdColumnName,
223223
TimeSpan timeout,
224-
string defaultSerializer, // This is being ignored now
224+
string defaultSerializer,
225225
bool useSequentialAccess)
226226
{
227227
SchemaName = schemaName;
@@ -236,6 +236,7 @@ public QueryConfiguration(
236236
Timeout = timeout;
237237
TagsColumnName = tagsColumnName;
238238
OrderingColumnName = orderingColumnName;
239+
DefaultSerializer = defaultSerializer;
239240
SerializerIdColumnName = serializerIdColumnName;
240241
UseSequentialAccess = useSequentialAccess;
241242
}
@@ -846,7 +847,10 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
846847
{
847848
// Support old writes that did not set the serializer id
848849
var type = Type.GetType(manifest, true);
849-
var deserializer = Serialization.FindSerializerForType(type);
850+
#pragma warning disable CS0618
851+
// Backward compatibility code, we still need to use the old default serializer on read to support legacy data
852+
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
853+
#pragma warning restore CS0618
850854
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
851855
deserialized = Akka.Serialization.Serialization.WithTransport(
852856
Serialization.System, (deserializer, (byte[])payload, type),

src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Akka.Persistence.Sqlite.Tests.csproj

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@
2727
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
2828
</ItemGroup>
2929

30+
<ItemGroup>
31+
<None Update="data\Sqlite.CustomObject.db">
32+
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
33+
</None>
34+
</ItemGroup>
35+
3036
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
3137
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
3238
</PropertyGroup>

src/contrib/persistence/Akka.Persistence.Sqlite.Tests/CustomObjectSerializerSpec.cs

+39-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,15 @@ namespace Akka.Persistence.Sqlite.Tests
2121
{
2222
public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime
2323
{
24-
private static readonly Config Config = ConfigurationFactory.ParseString($@"
24+
private static readonly string ConnectionString;
25+
private static readonly Config Config;
26+
static CustomObjectSerializerSpec()
27+
{
28+
var filename = $"AkkaSqlite-{Guid.NewGuid()}.db";
29+
File.Copy("./data/Sqlite.CustomObject.db", $"{filename}.db");
30+
31+
ConnectionString = $"DataSource={filename}.db";
32+
Config = ConfigurationFactory.ParseString($@"
2533
akka.actor {{
2634
serializers {{
2735
mySerializer = ""{typeof(MySerializer).AssemblyQualifiedName}""
@@ -35,18 +43,19 @@ public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLif
3543
journal {{
3644
plugin = ""akka.persistence.journal.sqlite""
3745
sqlite {{
38-
connection-string = ""DataSource=AkkaJournal.db""
46+
connection-string = ""{ConnectionString}""
3947
auto-initialize = on
4048
}}
4149
}}
4250
snapshot-store {{
4351
plugin = ""akka.persistence.snapshot-store.sqlite""
4452
sqlite {{
45-
connection-string = ""DataSource=AkkaSnapshot.db""
53+
connection-string = ""{ConnectionString}""
4654
auto-initialize = on
4755
}}
4856
}}
4957
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
58+
}
5059

5160
public CustomObjectSerializerSpec(ITestOutputHelper helper)
5261
: base(Config, nameof(CustomObjectSerializerSpec), helper)
@@ -62,12 +71,13 @@ public async Task CustomSerializerTest()
6271
var serializer = Sys.Serialization.FindSerializerForType(typeof(Persisted));
6372
serializer.Should().BeOfType<MySerializer>();
6473

65-
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a")));
74+
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a", probe)));
75+
probe.ExpectMsg("recovered");
6676
actor.Tell(new Persisted("a"), probe);
6777
probe.ExpectMsg(new Persisted("a"));
6878

6979
// Read the database directly, make sure that we're using the correct object type serializer
70-
var conn = new SqliteConnection("DataSource=AkkaJournal.db");
80+
var conn = new SqliteConnection(ConnectionString);
7181
conn.Open();
7282
const string sql = "SELECT ej.serializer_id FROM event_journal ej WHERE ej.persistence_id = 'a'";
7383
await using var cmd = new SqliteCommand(sql, conn);
@@ -78,10 +88,19 @@ public async Task CustomSerializerTest()
7888
record[0].Should().Be(9999);
7989
}
8090

91+
[Fact(DisplayName = "Persistence.Sql should be able to read legacy data")]
92+
public void LegacyDataTest()
93+
{
94+
var probe = CreateTestProbe();
95+
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("old", probe)));
96+
probe.ExpectMsg(new Persisted("old"));
97+
probe.ExpectMsg("recovered");
98+
}
99+
81100
public Task InitializeAsync()
82101
{
83-
if(File.Exists("AkkaJournal.db"))
84-
File.Delete("AkkaJournal.db");
102+
if(File.Exists("AkkaSqlite.db"))
103+
File.Delete("AkkaSqlite.db");
85104
return Task.CompletedTask;
86105
}
87106

@@ -140,9 +159,12 @@ public override object FromBinary(byte[] bytes, Type type)
140159

141160
internal sealed class PersistedActor : UntypedPersistentActor
142161
{
143-
public PersistedActor(string persistenceId)
162+
private readonly IActorRef _probe;
163+
164+
public PersistedActor(string persistenceId, IActorRef probe)
144165
{
145166
PersistenceId = persistenceId;
167+
_probe = probe;
146168
}
147169

148170
public override string PersistenceId { get; }
@@ -158,6 +180,15 @@ protected override void OnCommand(object message)
158180

159181
protected override void OnRecover(object message)
160182
{
183+
switch (message)
184+
{
185+
case Persisted msg:
186+
_probe.Tell(msg);
187+
break;
188+
case RecoveryCompleted _:
189+
_probe.Tell("recovered");
190+
break;
191+
}
161192
}
162193
}
163194
}

0 commit comments

Comments
 (0)