Skip to content

Commit 7ca22af

Browse files
authored
Fix broken object type serializer in QueryExecutor (#6528)
* Reproduction unit test for akkadotnet/Akka.Hosting#127 (cherry picked from commit 75e24b3) * Clarify the unit test (cherry picked from commit a0b5e8f) * Remove persistence default serializer feature (cherry picked from commit 3cf2eee) * Revert changes on EventRead, add legacy data read test (cherry picked from commit b1daa66) * Revert persistence.conf changes, modify sqlite database to mimic legacy data (cherry picked from commit f3fd0e9) * Resolve conflicts * Update API Verify list * Add legacy data test * Add sqlite data and fix tests
1 parent f7c30f7 commit 7ca22af

20 files changed

+511
-53
lines changed

src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ public abstract class BatchingSqlJournalSetup
234234
/// <summary>
235235
/// The default serializer used when not type override matching is found
236236
/// </summary>
237+
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
237238
public string DefaultSerializer { get; }
238239

239240
/// <summary>
@@ -1228,7 +1229,7 @@ private async Task<WriteMessagesResult> HandleWriteMessages(WriteMessages req, T
12281229
protected virtual void WriteEvent(TCommand command, IPersistentRepresentation persistent, string tags = "")
12291230
{
12301231
var payloadType = persistent.Payload.GetType();
1231-
var serializer = _serialization.FindSerializerForType(payloadType, Setup.DefaultSerializer);
1232+
var serializer = _serialization.FindSerializerForType(payloadType);
12321233

12331234
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
12341235
Akka.Serialization.Serialization.WithTransport(_serialization.System, () =>

src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ public class QueryConfiguration
181181
/// <summary>
182182
/// The default serializer used when not type override matching is found
183183
/// </summary>
184+
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
184185
public string DefaultSerializer { get; }
185186

186187
/// <summary>
@@ -780,7 +781,7 @@ protected DbCommand GetCommand(DbConnection connection, string sql)
780781
protected virtual void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
781782
{
782783

783-
var serializer = Serialization.FindSerializerForType(e.Payload.GetType(), Configuration.DefaultSerializer);
784+
var serializer = Serialization.FindSerializerForType(e.Payload.GetType());
784785

785786
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
786787
var (binary,manifest) = Akka.Serialization.Serialization.WithTransport(Serialization.System,(e.Payload,serializer) ,(state) =>
@@ -846,7 +847,10 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
846847
{
847848
// Support old writes that did not set the serializer id
848849
var type = Type.GetType(manifest, true);
850+
#pragma warning disable CS0618
851+
// Backward compatibility code, we still need to use the old default serializer on read to support legacy data
849852
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
853+
#pragma warning restore CS0618
850854
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
851855
deserialized = Akka.Serialization.Serialization.WithTransport(
852856
Serialization.System, (deserializer, (byte[])payload, type),

src/contrib/persistence/Akka.Persistence.Sql.Common/Settings.cs

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public class SnapshotStoreSettings
117117
/// <summary>
118118
/// The default serializer being used if no type match override is specified
119119
/// </summary>
120+
[Obsolete(message: "This property should never be used, use the default `System.Object` serializer instead")]
120121
public string DefaultSerializer { get; private set; }
121122

122123
/// <summary>

src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public class QueryConfiguration
110110
/// <summary>
111111
/// The default serializer used when not type override matching is found
112112
/// </summary>
113+
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
113114
public readonly string DefaultSerializer;
114115

115116
/// <summary>
@@ -336,7 +337,7 @@ DELETE FROM {Configuration.FullSnapshotTableName}
336337
protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
337338
{
338339
var snapshotType = snapshot.GetType();
339-
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
340+
var serializer = Serialization.FindSerializerForType(snapshotType);
340341
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
341342
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(snapshot));
342343
AddParameter(command, "@Payload", DbType.Binary, binary);
@@ -350,7 +351,7 @@ protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
350351
protected virtual void SetManifestParameters(object snapshot, DbCommand command)
351352
{
352353
var snapshotType = snapshot.GetType();
353-
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
354+
var serializer = Serialization.FindSerializerForType(snapshotType);
354355

355356
string manifest = "";
356357
if (serializer is SerializerWithStringManifest)

src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/SqlSnapshotStore.cs

-17
Original file line numberDiff line numberDiff line change
@@ -224,22 +224,5 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio
224224
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
225225
}
226226
}
227-
228-
private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot)
229-
{
230-
var snapshotType = snapshot.GetType();
231-
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType, _settings.DefaultSerializer);
232-
233-
var binary = Akka.Serialization.Serialization.WithTransport(_actorSystem,
234-
() => serializer.ToBinary(snapshot));
235-
236-
237-
return new SnapshotEntry(
238-
persistenceId: metadata.PersistenceId,
239-
sequenceNr: metadata.SequenceNr,
240-
timestamp: metadata.Timestamp,
241-
manifest: snapshotType.TypeQualifiedName(),
242-
payload: binary);
243-
}
244227
}
245228
}

src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Akka.Persistence.Sqlite.Tests.csproj

+9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@
2727
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
2828
</ItemGroup>
2929

30+
<ItemGroup>
31+
<None Update="data\Sqlite.CustomObject.db">
32+
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
33+
</None>
34+
<None Update="data\Sqlite.v1.3.0.db">
35+
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
36+
</None>
37+
</ItemGroup>
38+
3039
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
3140
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
3241
</PropertyGroup>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="CustomObjectSerializerSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System;
9+
using System.IO;
10+
using System.Text;
11+
using System.Threading.Tasks;
12+
using Akka.Actor;
13+
using Akka.Configuration;
14+
using Akka.Serialization;
15+
using FluentAssertions;
16+
using Microsoft.Data.Sqlite;
17+
using Xunit;
18+
using Xunit.Abstractions;
19+
20+
namespace Akka.Persistence.Sqlite.Tests
21+
{
22+
public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime
23+
{
24+
private static readonly string ConnectionString;
25+
private static readonly Config Config;
26+
static CustomObjectSerializerSpec()
27+
{
28+
var filename = $"AkkaSqlite-{Guid.NewGuid()}.db";
29+
File.Copy("./data/Sqlite.CustomObject.db", $"{filename}.db");
30+
31+
ConnectionString = $"DataSource={filename}.db";
32+
Config = ConfigurationFactory.ParseString($@"
33+
akka.actor {{
34+
serializers {{
35+
mySerializer = ""{typeof(MySerializer).AssemblyQualifiedName}""
36+
}}
37+
serialization-bindings {{
38+
""System.Object"" = mySerializer
39+
}}
40+
}}
41+
42+
akka.persistence {{
43+
journal {{
44+
plugin = ""akka.persistence.journal.sqlite""
45+
sqlite {{
46+
connection-string = ""{ConnectionString}""
47+
auto-initialize = on
48+
}}
49+
}}
50+
snapshot-store {{
51+
plugin = ""akka.persistence.snapshot-store.sqlite""
52+
sqlite {{
53+
connection-string = ""{ConnectionString}""
54+
auto-initialize = on
55+
}}
56+
}}
57+
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
58+
}
59+
60+
public CustomObjectSerializerSpec(ITestOutputHelper helper)
61+
: base(Config, nameof(CustomObjectSerializerSpec), helper)
62+
{
63+
}
64+
65+
[Fact(DisplayName = "Persistence.Sql should use custom serializer for object type")]
66+
public async Task CustomSerializerTest()
67+
{
68+
var probe = CreateTestProbe();
69+
70+
// Sanity check to see that the system should serialize object type using MySerializer
71+
var serializer = Sys.Serialization.FindSerializerForType(typeof(Persisted));
72+
serializer.Should().BeOfType<MySerializer>();
73+
74+
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a", probe)));
75+
probe.ExpectMsg("recovered");
76+
actor.Tell(new Persisted("a"), probe);
77+
probe.ExpectMsg(new Persisted("a"));
78+
79+
// Read the database directly, make sure that we're using the correct object type serializer
80+
var conn = new SqliteConnection(ConnectionString);
81+
conn.Open();
82+
const string sql = "SELECT ej.serializer_id FROM event_journal ej WHERE ej.persistence_id = 'a'";
83+
await using var cmd = new SqliteCommand(sql, conn);
84+
var record = await cmd.ExecuteReaderAsync();
85+
await record.ReadAsync();
86+
87+
// In the bug this fails, the serializer id is JSON id instead of MySerializer id
88+
record[0].Should().Be(9999);
89+
}
90+
91+
[Fact(DisplayName = "Persistence.Sql should be able to read legacy data")]
92+
public void LegacyDataTest()
93+
{
94+
var probe = CreateTestProbe();
95+
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("old", probe)));
96+
probe.ExpectMsg(new Persisted("old"));
97+
probe.ExpectMsg("recovered");
98+
}
99+
100+
public Task InitializeAsync()
101+
{
102+
if(File.Exists("AkkaSqlite.db"))
103+
File.Delete("AkkaSqlite.db");
104+
return Task.CompletedTask;
105+
}
106+
107+
public Task DisposeAsync()
108+
{
109+
return Task.CompletedTask;
110+
}
111+
}
112+
113+
internal sealed class Persisted: IEquatable<Persisted>
114+
{
115+
public Persisted(string payload)
116+
{
117+
Payload = payload;
118+
}
119+
120+
public string Payload { get; }
121+
122+
public bool Equals(Persisted other)
123+
{
124+
if (ReferenceEquals(null, other)) return false;
125+
if (ReferenceEquals(this, other)) return true;
126+
return Payload == other.Payload;
127+
}
128+
129+
public override bool Equals(object obj)
130+
{
131+
return ReferenceEquals(this, obj) || obj is Persisted other && Equals(other);
132+
}
133+
134+
public override int GetHashCode()
135+
{
136+
return (Payload != null ? Payload.GetHashCode() : 0);
137+
}
138+
}
139+
140+
internal class MySerializer : Serializer
141+
{
142+
public MySerializer(ExtendedActorSystem system) : base(system)
143+
{
144+
}
145+
146+
public override bool IncludeManifest { get { return true; } }
147+
public override int Identifier { get { return 9999; } }
148+
149+
public override byte[] ToBinary(object obj)
150+
{
151+
return Encoding.UTF8.GetBytes(obj.ToString());
152+
}
153+
154+
public override object FromBinary(byte[] bytes, Type type)
155+
{
156+
return Encoding.UTF8.GetString(bytes);
157+
}
158+
}
159+
160+
internal sealed class PersistedActor : UntypedPersistentActor
161+
{
162+
private readonly IActorRef _probe;
163+
164+
public PersistedActor(string persistenceId, IActorRef probe)
165+
{
166+
PersistenceId = persistenceId;
167+
_probe = probe;
168+
}
169+
170+
public override string PersistenceId { get; }
171+
172+
protected override void OnCommand(object message)
173+
{
174+
var sender = Sender;
175+
Persist(message, _ =>
176+
{
177+
sender.Tell(message);
178+
});
179+
}
180+
181+
protected override void OnRecover(object message)
182+
{
183+
switch (message)
184+
{
185+
case Persisted msg:
186+
_probe.Tell(msg);
187+
break;
188+
case RecoveryCompleted _:
189+
_probe.Tell("recovered");
190+
break;
191+
}
192+
}
193+
}
194+
}

0 commit comments

Comments
 (0)