Skip to content

Force write journal to initialize when read journal is started #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/Akka.Persistence.Sql.Tests.Common/Query/Bugfix344Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// -----------------------------------------------------------------------
// <copyright file="Bugfix344Spec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Persistence.Query;
using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Query;
using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Streams;
using Akka.Streams.TestKit;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace Akka.Persistence.Sql.Tests.Common.Query
{
public abstract class Bugfix344Spec<T> : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime where T : ITestContainer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abstract spec to validate that this issue has been resolved, and that the ReadJournal will automatically start up the write journal and wait for it to initialize.

{
protected Bugfix344Spec(ITestOutputHelper output, T fixture) : base(config:Config(TagMode.TagTable, fixture), output:output)
{
ReadJournal = Sys.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
Materializer = Sys.Materializer();
}

protected ActorMaterializer Materializer { get; }
protected IReadJournal? ReadJournal { get; set; }

public Task InitializeAsync()
=> Task.CompletedTask;

public Task DisposeAsync()
=> Task.CompletedTask;

[Fact]
public async Task ReadJournal_should_initialize_tables_when_started_before_WriteJournal()
{
if (ReadJournal is not ICurrentEventsByTagQuery queries)
throw IsTypeException.ForMismatchedType(nameof(IEventsByTagQuery), ReadJournal?.GetType().Name ?? "null");

// should just not return
await EventFilter.Error().ExpectAsync(
0,
async () =>
{
var blackSrc = queries.CurrentEventsByTag("random-unused-tag", offset: NoOffset.Instance);
var probe = blackSrc.RunWith(this.SinkProbe<EventEnvelope>(), Materializer);
probe.Request(2);
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));

// query should just gracefully exit
await probe.ExpectCompleteAsync();
});
}

private static Configuration.Config Config(TagMode tagMode, T fixture)
{
if (!fixture.InitializeDbAsync().Wait(10.Seconds()))
throw new Exception("Failed to clean up database in 10 seconds");

return ConfigurationFactory.ParseString(
$$"""

akka.loglevel = INFO
akka.persistence.journal.plugin = "akka.persistence.journal.sql"
akka.persistence.journal.sql {
event-adapters {
color-tagger = "Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"
}
event-adapter-bindings = {
"System.String" = color-tagger
}
provider-name = "{{fixture.ProviderName}}"
tag-write-mode = "{{tagMode}}"
connection-string = "{{fixture.ConnectionString}}"
}
akka.persistence.query.journal.sql {
provider-name = "{{fixture.ProviderName}}"
connection-string = "{{fixture.ConnectionString}}"
tag-read-mode = "{{tagMode}}"
refresh-interval = 1s
}
akka.test.single-expect-default = 10s
""")
.WithFallback(SqlPersistence.DefaultConfiguration);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// -----------------------------------------------------------------------
// <copyright file="PostgreSqlBugfix344Spec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Persistence.Sql.Tests.Common.Query;
using Xunit;
using Xunit.Abstractions;
#if !DEBUG
using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
#endif

namespace Akka.Persistence.Sql.Tests.Query.PostgreSql
{
/// <summary>
/// Need our own collection, to ensure that the database tables haven't been initialized yet
/// </summary>
[CollectionDefinition(nameof(PostgreSqlBugfix344Fixture), DisableParallelization = true)]
public sealed class PostgreSqlBugfix344Fixture : ICollectionFixture<PostgreSqlContainer> { }

#if !DEBUG
[SkipWindows]
#endif
[Collection(nameof(PostgreSqlBugfix344Fixture))]
public class PostgreSqlBugfix344Spec : Bugfix344Spec<PostgreSqlContainer>
{
public PostgreSqlBugfix344Spec(ITestOutputHelper output, PostgreSqlContainer fixture) : base(output, fixture)
{

}
}
}
5 changes: 5 additions & 0 deletions src/Akka.Persistence.Sql/Query/SqlReadJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public SqlReadJournal(
_system = system;

var connFact = new AkkaPersistenceDataConnectionFactory(_readJournalConfig);

// Fix for https://github.com/akkadotnet/Akka.Persistence.Sql/issues/344
var writeJournal = Persistence.Instance.Apply(system).JournalFor(writePluginId);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start the SqlWriteJournal if it has not been started already - technically the _eventAdapters = Persistence.Instance.Apply(system).AdaptersFor(writePluginId); also does this implicitly.

// we want to block, we want to crash if the journal is not available
var started = writeJournal.Ask<Initialized>(IsInitialized.Instance, TimeSpan.FromSeconds(5)).Result;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Block and wait until startup is complete - this should cover any cases of this issue where queries still get executed before the write journal fully finishes initializing.


_mat = Materializer.CreateSystemMaterializer(
context: system,
Expand Down
Loading