Skip to content

Commit 4537b1c

Browse files
authored
Add SQL query isolation level (#6654)
* Add SQL query isolation level * Unify default isolation level to unspecified for compatibility * Update API Verify list * Update sqlite config unit tests * Remove `IsolationLevel.Chaos` support * Add documentation link in HOCON file
1 parent f48394e commit 4537b1c

15 files changed

+846
-229
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="DbConnectionExtensions.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Data;
10+
using System.Data.Common;
11+
using System.Runtime.CompilerServices;
12+
using System.Threading;
13+
using System.Threading.Tasks;
14+
15+
namespace Akka.Persistence.Sql.Common.Extensions
16+
{
17+
public static class DbConnectionExtensions
18+
{
19+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
20+
public static async Task ExecuteInTransaction(
21+
this DbConnection connection,
22+
IsolationLevel isolationLevel,
23+
CancellationToken token,
24+
Func<DbTransaction, CancellationToken, Task> task)
25+
{
26+
using var tx = connection.BeginTransaction(isolationLevel);
27+
try
28+
{
29+
await task(tx, token);
30+
tx.Commit();
31+
}
32+
catch (Exception ex1)
33+
{
34+
try
35+
{
36+
tx.Rollback();
37+
}
38+
catch (Exception ex2)
39+
{
40+
throw new AggregateException(ex2, ex1);
41+
}
42+
throw;
43+
}
44+
}
45+
46+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
47+
public static async Task<T> ExecuteInTransaction<T>(
48+
this DbConnection connection,
49+
IsolationLevel isolationLevel,
50+
CancellationToken token,
51+
Func<DbTransaction, CancellationToken, Task<T>> task)
52+
{
53+
using var tx = connection.BeginTransaction(isolationLevel);
54+
try
55+
{
56+
var result = await task(tx, token);
57+
tx.Commit();
58+
return result;
59+
}
60+
catch (Exception ex1)
61+
{
62+
try
63+
{
64+
tx.Rollback();
65+
}
66+
catch (Exception ex2)
67+
{
68+
throw new AggregateException(ex2, ex1);
69+
}
70+
throw;
71+
}
72+
}
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="HoconExtensions.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using System.Data;
9+
using System.Runtime.CompilerServices;
10+
using Akka.Configuration;
11+
12+
namespace Akka.Persistence.Sql.Common.Extensions
13+
{
14+
public static class IsolationLevelExtensions
15+
{
16+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
17+
public static IsolationLevel GetIsolationLevel(this Config config, string key)
18+
=> config.GetString(key).ToIsolationLevel();
19+
20+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
21+
public static IsolationLevel ToIsolationLevel(this string level)
22+
=> level switch
23+
{
24+
null => IsolationLevel.Unspecified,
25+
"chaos" => // IsolationLevel.Chaos,
26+
throw new ConfigurationException($"{nameof(IsolationLevel)}.{IsolationLevel.Chaos} is not supported."),
27+
"read-committed" => IsolationLevel.ReadCommitted,
28+
"read-uncommitted" => IsolationLevel.ReadUncommitted,
29+
"repeatable-read" => IsolationLevel.RepeatableRead,
30+
"serializable" => IsolationLevel.Serializable,
31+
"snapshot" => IsolationLevel.Snapshot,
32+
"unspecified" => IsolationLevel.Unspecified,
33+
_ => throw new ConfigurationException(
34+
"Unknown isolation-level value. Should be one of: read-committed | read-uncommitted | repeatable-read | serializable | snapshot | unspecified")
35+
};
36+
37+
}
38+
}

src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs

+107-34
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using Akka.Event;
2323
using Akka.Pattern;
2424
using Akka.Persistence.Journal;
25+
using Akka.Persistence.Sql.Common.Extensions;
2526
using Akka.Persistence.Sql.Common.Journal;
2627
using Akka.Serialization;
2728
using Akka.Util;
@@ -210,9 +211,10 @@ public abstract class BatchingSqlJournalSetup
210211
public TimeSpan ConnectionTimeout { get; }
211212

212213
/// <summary>
213-
/// Isolation level of transactions used during query execution.
214+
/// Isolation level of transactions used during write query execution.
214215
/// </summary>
215-
public IsolationLevel IsolationLevel { get; }
216+
[Obsolete("Use WriteIsolationLevel property instead")]
217+
public IsolationLevel IsolationLevel => WriteIsolationLevel;
216218

217219
/// <summary>
218220
/// Settings specific to <see cref="CircuitBreaker"/>, which is used internally
@@ -241,6 +243,16 @@ public abstract class BatchingSqlJournalSetup
241243
/// The fully qualified name of the type that should be used as timestamp provider.
242244
/// </summary>
243245
public string TimestampProviderTypeName { get; }
246+
247+
/// <summary>
248+
/// Isolation level of transactions used during read query execution.
249+
/// </summary>
250+
public IsolationLevel ReadIsolationLevel { get; }
251+
252+
/// <summary>
253+
/// Isolation level of transactions used during write query execution.
254+
/// </summary>
255+
public IsolationLevel WriteIsolationLevel { get; }
244256

245257
/// <summary>
246258
/// Initializes a new instance of the <see cref="BatchingSqlJournalSetup" /> class.
@@ -276,30 +288,26 @@ protected BatchingSqlJournalSetup(Config config, QueryConfiguration namingConven
276288
if (string.IsNullOrWhiteSpace(connectionString))
277289
throw new ConfigurationException("No connection string for Sql Event Journal was specified");
278290

279-
IsolationLevel level;
280-
switch (config.GetString("isolation-level", "unspecified"))
281-
{
282-
case "chaos": level = IsolationLevel.Chaos; break;
283-
case "read-committed": level = IsolationLevel.ReadCommitted; break;
284-
case "read-uncommitted": level = IsolationLevel.ReadUncommitted; break;
285-
case "repeatable-read": level = IsolationLevel.RepeatableRead; break;
286-
case "serializable": level = IsolationLevel.Serializable; break;
287-
case "snapshot": level = IsolationLevel.Snapshot; break;
288-
case "unspecified": level = IsolationLevel.Unspecified; break;
289-
default: throw new ConfigurationException("Unknown isolation-level value. Should be one of: chaos | read-committed | read-uncommitted | repeatable-read | serializable | snapshot | unspecified");
290-
}
291+
ReadIsolationLevel = namingConventions.ReadIsolationLevel;
292+
WriteIsolationLevel = namingConventions.WriteIsolationLevel;
293+
294+
// backward compatibility
295+
var level = config.GetString("isolation-level");
296+
if (level is { })
297+
WriteIsolationLevel = level.ToIsolationLevel();
291298

292299
ConnectionString = connectionString;
293300
MaxConcurrentOperations = config.GetInt("max-concurrent-operations", 64);
294301
MaxBatchSize = config.GetInt("max-batch-size", 100);
295302
MaxBufferSize = config.GetInt("max-buffer-size", 500000);
296303
AutoInitialize = config.GetBoolean("auto-initialize", false);
297304
ConnectionTimeout = config.GetTimeSpan("connection-timeout", TimeSpan.FromSeconds(30));
298-
IsolationLevel = level;
299305
CircuitBreakerSettings = new CircuitBreakerSettings(config.GetConfig("circuit-breaker"));
300306
ReplayFilterSettings = new ReplayFilterSettings(config.GetConfig("replay-filter"));
301307
NamingConventions = namingConventions;
308+
#pragma warning disable CS0618
302309
DefaultSerializer = config.GetString("serializer", null);
310+
#pragma warning restore CS0618
303311
TimestampProviderTypeName = config.GetString("timestamp-provider", null);
304312
}
305313

@@ -323,19 +331,84 @@ protected BatchingSqlJournalSetup(Config config, QueryConfiguration namingConven
323331
/// <param name="replayFilterSettings">The settings used when replaying events from database back to the persistent actors.</param>
324332
/// <param name="namingConventions">The naming conventions used by the database to construct valid SQL statements.</param>
325333
/// <param name="defaultSerializer">The serializer used when no specific type matching can be found.</param>
326-
protected BatchingSqlJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize, TimeSpan connectionTimeout, IsolationLevel isolationLevel, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions, string defaultSerializer)
334+
[Obsolete("Use constructor with separate read and write isolation level instead. (since v1.5.2)")]
335+
protected BatchingSqlJournalSetup(
336+
string connectionString,
337+
int maxConcurrentOperations,
338+
int maxBatchSize,
339+
int maxBufferSize,
340+
bool autoInitialize,
341+
TimeSpan connectionTimeout,
342+
IsolationLevel isolationLevel,
343+
CircuitBreakerSettings circuitBreakerSettings,
344+
ReplayFilterSettings replayFilterSettings,
345+
QueryConfiguration namingConventions,
346+
string defaultSerializer)
347+
: this(
348+
connectionString: connectionString,
349+
maxConcurrentOperations: maxConcurrentOperations,
350+
maxBatchSize: maxBatchSize,
351+
maxBufferSize: maxBufferSize,
352+
autoInitialize: autoInitialize,
353+
connectionTimeout: connectionTimeout,
354+
writeIsolationLevel: isolationLevel,
355+
readIsolationLevel: isolationLevel,
356+
circuitBreakerSettings: circuitBreakerSettings,
357+
replayFilterSettings: replayFilterSettings,
358+
namingConventions: namingConventions,
359+
defaultSerializer: defaultSerializer)
360+
{ }
361+
362+
/// <summary>
363+
/// Initializes a new instance of the <see cref="BatchingSqlJournalSetup" /> class.
364+
/// </summary>
365+
/// <param name="connectionString">The connection string used to connect to the database.</param>
366+
/// <param name="maxConcurrentOperations">The maximum number of batch operations allowed to be executed at the same time.</param>
367+
/// <param name="maxBatchSize">The maximum size of single batch of operations to be executed over a single <see cref="DbConnection"/>.</param>
368+
/// <param name="maxBufferSize">The maximum size of requests stored in journal buffer.</param>
369+
/// <param name="autoInitialize">
370+
/// If set to <c>true</c>, the journal executes all SQL scripts stored under the
371+
/// <see cref="BatchingSqlJournal{TConnection,TCommand}.Initializers"/> collection prior
372+
/// to starting executing any requests.
373+
/// </param>
374+
/// <param name="connectionTimeout">The maximum time given for executed <see cref="DbCommand"/> to complete.</param>
375+
/// <param name="readIsolationLevel">The isolation level of transactions used during read query execution.</param>
376+
/// <param name="writeIsolationLevel">The isolation level of transactions used during write query execution.</param>
377+
/// <param name="circuitBreakerSettings">
378+
/// The settings used by the <see cref="CircuitBreaker"/> when for executing request batches.
379+
/// </param>
380+
/// <param name="replayFilterSettings">The settings used when replaying events from database back to the persistent actors.</param>
381+
/// <param name="namingConventions">The naming conventions used by the database to construct valid SQL statements.</param>
382+
/// <param name="defaultSerializer">The serializer used when no specific type matching can be found.</param>
383+
protected BatchingSqlJournalSetup(
384+
string connectionString,
385+
int maxConcurrentOperations,
386+
int maxBatchSize,
387+
int maxBufferSize,
388+
bool autoInitialize,
389+
TimeSpan connectionTimeout,
390+
IsolationLevel readIsolationLevel,
391+
IsolationLevel writeIsolationLevel,
392+
CircuitBreakerSettings circuitBreakerSettings,
393+
ReplayFilterSettings replayFilterSettings,
394+
QueryConfiguration namingConventions,
395+
string defaultSerializer)
327396
{
328397
ConnectionString = connectionString;
329398
MaxConcurrentOperations = maxConcurrentOperations;
330399
MaxBatchSize = maxBatchSize;
331400
MaxBufferSize = maxBufferSize;
332401
AutoInitialize = autoInitialize;
333402
ConnectionTimeout = connectionTimeout;
334-
IsolationLevel = isolationLevel;
403+
WriteIsolationLevel = writeIsolationLevel;
404+
ReadIsolationLevel = readIsolationLevel;
405+
ReadIsolationLevel = writeIsolationLevel;
335406
CircuitBreakerSettings = circuitBreakerSettings;
336407
ReplayFilterSettings = replayFilterSettings;
337408
NamingConventions = namingConventions;
409+
#pragma warning disable CS0618
338410
DefaultSerializer = defaultSerializer;
411+
#pragma warning restore CS0618
339412
}
340413
}
341414

@@ -547,6 +620,8 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
547620

548621
private readonly Akka.Serialization.Serialization _serialization;
549622
private readonly CircuitBreaker _circuitBreaker;
623+
private readonly IsolationLevel _writeIsolationLevel;
624+
private readonly IsolationLevel _readIsolationLevel;
550625
private int _remainingOperations;
551626

552627
/// <summary>
@@ -574,6 +649,9 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
574649
maxFailures: Setup.CircuitBreakerSettings.MaxFailures,
575650
callTimeout: Setup.CircuitBreakerSettings.CallTimeout,
576651
resetTimeout: Setup.CircuitBreakerSettings.ResetTimeout);
652+
653+
_writeIsolationLevel = Setup.WriteIsolationLevel;
654+
_readIsolationLevel = Setup.ReadIsolationLevel;
577655

578656
var conventions = Setup.NamingConventions;
579657

@@ -862,25 +940,24 @@ private void TryProcess()
862940
{
863941
_remainingOperations--;
864942

865-
var chunk = DequeueChunk(_remainingOperations);
943+
var (chunk, isWrite) = DequeueChunk(_remainingOperations);
866944
var context = Context;
867-
_circuitBreaker.WithCircuitBreaker(() => ExecuteChunk(chunk, context))
945+
_circuitBreaker.WithCircuitBreaker(() => ExecuteChunk(chunk, context, isWrite))
868946
.PipeTo(Self, failure: ex => new ChunkExecutionFailure(ex, chunk.Requests, chunk.ChunkId));
869947
}
870948
}
871949

872-
private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext context)
950+
private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext context, bool isWriteOperation)
873951
{
874952
var writeResults = new Queue<WriteMessagesResult>();
875-
var isWriteOperation = false;
876953
var stopwatch = new Stopwatch();
877954
using (var connection = CreateConnection(Setup.ConnectionString))
878955
{
879956
await connection.OpenAsync();
880957

881958
// In the grand scheme of thing, using a transaction in an all read batch operation
882959
// should not hurt performance by much, because it is done only once at the start.
883-
using (var tx = connection.BeginTransaction(Setup.IsolationLevel))
960+
using (var tx = connection.BeginTransaction(isWriteOperation ? _writeIsolationLevel : _readIsolationLevel))
884961
using (var command = (TCommand)connection.CreateCommand())
885962
{
886963
command.CommandTimeout = (int)Setup.ConnectionTimeout.TotalSeconds;
@@ -895,11 +972,9 @@ private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext
895972
switch (req)
896973
{
897974
case WriteMessages msg:
898-
isWriteOperation = true;
899975
writeResults.Enqueue(await HandleWriteMessages(msg, command));
900976
break;
901977
case DeleteMessagesTo msg:
902-
isWriteOperation = true;
903978
await HandleDeleteMessagesTo(msg, command);
904979
break;
905980
case ReplayMessages msg:
@@ -1331,7 +1406,7 @@ protected virtual void PreAddParameterToCommand(TCommand command, DbParameter pa
13311406
/// Select the buffer that has the smallest id on its first item, retrieve a maximum Setup.MaxBatchSize
13321407
/// items from it, and return it as a chunk that needs to be batched
13331408
/// </summary>
1334-
private RequestChunk DequeueChunk(int chunkId)
1409+
private (RequestChunk chunk, bool isWrite) DequeueChunk(int chunkId)
13351410
{
13361411
var currentBuffer = _buffers
13371412
.Where(q => q.Count > 0)
@@ -1350,18 +1425,16 @@ private RequestChunk DequeueChunk(int chunkId)
13501425
if (operations.Count == Setup.MaxBatchSize)
13511426
break;
13521427
}
1428+
return (new RequestChunk(chunkId, operations.ToArray()), true);
13531429
}
1354-
else
1430+
1431+
while(currentBuffer.Count > 0)
13551432
{
1356-
while(currentBuffer.Count > 0)
1357-
{
1358-
operations.Add(currentBuffer.Dequeue().request);
1359-
if (operations.Count == Setup.MaxBatchSize)
1360-
break;
1361-
}
1433+
operations.Add(currentBuffer.Dequeue().request);
1434+
if (operations.Count == Setup.MaxBatchSize)
1435+
break;
13621436
}
1363-
1364-
return new RequestChunk(chunkId, operations.ToArray());
1437+
return (new RequestChunk(chunkId, operations.ToArray()), false);
13651438
}
13661439

13671440
private void CompleteBatch(BatchComplete msg)

0 commit comments

Comments
 (0)