-
Notifications
You must be signed in to change notification settings - Fork 13
Force write journal to initialize when read journal is started #423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a4bb442
84f496d
f183d80
b01379f
cc10162
942a655
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// ----------------------------------------------------------------------- | ||
// <copyright file="Bugfix344Spec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
// ----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading.Tasks; | ||
using Akka.Configuration; | ||
using Akka.Persistence.Query; | ||
using Akka.Persistence.Sql.Config; | ||
using Akka.Persistence.Sql.Query; | ||
using Akka.Persistence.Sql.Tests.Common.Containers; | ||
using Akka.Streams; | ||
using Akka.Streams.TestKit; | ||
using FluentAssertions.Extensions; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
using Xunit.Sdk; | ||
|
||
namespace Akka.Persistence.Sql.Tests.Common.Query | ||
{ | ||
public abstract class Bugfix344Spec<T> : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime where T : ITestContainer | ||
{ | ||
protected Bugfix344Spec(ITestOutputHelper output, T fixture) : base(config:Config(TagMode.TagTable, fixture), output:output) | ||
{ | ||
ReadJournal = Sys.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier); | ||
Materializer = Sys.Materializer(); | ||
} | ||
|
||
protected ActorMaterializer Materializer { get; } | ||
protected IReadJournal? ReadJournal { get; set; } | ||
|
||
public Task InitializeAsync() | ||
=> Task.CompletedTask; | ||
|
||
public Task DisposeAsync() | ||
=> Task.CompletedTask; | ||
|
||
[Fact] | ||
public async Task ReadJournal_should_initialize_tables_when_started_before_WriteJournal() | ||
{ | ||
if (ReadJournal is not ICurrentEventsByTagQuery queries) | ||
throw IsTypeException.ForMismatchedType(nameof(IEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); | ||
|
||
// should just not return | ||
await EventFilter.Error().ExpectAsync( | ||
0, | ||
async () => | ||
{ | ||
var blackSrc = queries.CurrentEventsByTag("random-unused-tag", offset: NoOffset.Instance); | ||
var probe = blackSrc.RunWith(this.SinkProbe<EventEnvelope>(), Materializer); | ||
probe.Request(2); | ||
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); | ||
|
||
// query should just gracefully exit | ||
await probe.ExpectCompleteAsync(); | ||
}); | ||
} | ||
|
||
private static Configuration.Config Config(TagMode tagMode, T fixture) | ||
{ | ||
if (!fixture.InitializeDbAsync().Wait(10.Seconds())) | ||
throw new Exception("Failed to clean up database in 10 seconds"); | ||
|
||
return ConfigurationFactory.ParseString( | ||
$$""" | ||
|
||
akka.loglevel = INFO | ||
akka.persistence.journal.plugin = "akka.persistence.journal.sql" | ||
akka.persistence.journal.sql { | ||
event-adapters { | ||
color-tagger = "Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK" | ||
} | ||
event-adapter-bindings = { | ||
"System.String" = color-tagger | ||
} | ||
provider-name = "{{fixture.ProviderName}}" | ||
tag-write-mode = "{{tagMode}}" | ||
connection-string = "{{fixture.ConnectionString}}" | ||
} | ||
akka.persistence.query.journal.sql { | ||
provider-name = "{{fixture.ProviderName}}" | ||
connection-string = "{{fixture.ConnectionString}}" | ||
tag-read-mode = "{{tagMode}}" | ||
refresh-interval = 1s | ||
} | ||
akka.test.single-expect-default = 10s | ||
""") | ||
.WithFallback(SqlPersistence.DefaultConfiguration); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// ----------------------------------------------------------------------- | ||
// <copyright file="PostgreSqlBugfix344Spec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
// ----------------------------------------------------------------------- | ||
|
||
using Akka.Persistence.Sql.Tests.Common.Containers; | ||
using Akka.Persistence.Sql.Tests.Common.Query; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
#if !DEBUG | ||
using Akka.Persistence.Sql.Tests.Common.Internal.Xunit; | ||
#endif | ||
|
||
namespace Akka.Persistence.Sql.Tests.Query.PostgreSql | ||
{ | ||
/// <summary> | ||
/// Need our own collection, to ensure that the database tables haven't been initialized yet | ||
/// </summary> | ||
[CollectionDefinition(nameof(PostgreSqlBugfix344Fixture), DisableParallelization = true)] | ||
public sealed class PostgreSqlBugfix344Fixture : ICollectionFixture<PostgreSqlContainer> { } | ||
|
||
#if !DEBUG | ||
[SkipWindows] | ||
#endif | ||
[Collection(nameof(PostgreSqlBugfix344Fixture))] | ||
public class PostgreSqlBugfix344Spec : Bugfix344Spec<PostgreSqlContainer> | ||
{ | ||
public PostgreSqlBugfix344Spec(ITestOutputHelper output, PostgreSqlContainer fixture) : base(output, fixture) | ||
{ | ||
|
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,11 @@ public SqlReadJournal( | |
_system = system; | ||
|
||
var connFact = new AkkaPersistenceDataConnectionFactory(_readJournalConfig); | ||
|
||
// Fix for https://github.com/akkadotnet/Akka.Persistence.Sql/issues/344 | ||
var writeJournal = Persistence.Instance.Apply(system).JournalFor(writePluginId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Start the |
||
// we want to block, we want to crash if the journal is not available | ||
var started = writeJournal.Ask<Initialized>(IsInitialized.Instance, TimeSpan.FromSeconds(5)).Result; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Block and wait until startup is complete - this should cover any cases of this issue where queries still get executed before the write journal fully finishes initializing. |
||
|
||
_mat = Materializer.CreateSystemMaterializer( | ||
context: system, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Abstract spec to validate that this issue has been resolved, and that the
ReadJournal
will automatically start up the write journal and wait for it to initialize.