Skip to content

Commit e4bd22d

Browse files
Fix learning competing consumers bug (#7324)
* AsyncFile.Move returns true only if the file was read (i.e. exists and not locked) * Clean up the .pending directory if unable to move and unlock the file Co-authored-by: Daniel Marbach <[email protected]>
1 parent 6411f65 commit e4bd22d

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

src/NServiceBus.Core/Transports/Learning/AsyncFile.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,15 @@ public static async Task<bool> Move(string sourcePath, string targetPath, Cancel
121121

122122
if (!IsFileLocked(targetPath))
123123
{
124-
break;
124+
return true;
125125
}
126126

127127
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
128128

129129
count++;
130130
}
131131

132-
return true;
132+
return false;
133133
}
134134

135135
static bool IsFileLocked(string filePath)

src/NServiceBus.Core/Transports/Learning/DirectoryBasedTransaction.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,19 @@ public DirectoryBasedTransaction(string basePath, string pendingDirName, string
1919

2020
public string FileToProcess { get; private set; }
2121

22-
public Task<bool> BeginTransaction(string incomingFilePath, CancellationToken cancellationToken = default)
22+
public async Task<bool> BeginTransaction(string incomingFilePath, CancellationToken cancellationToken = default)
2323
{
2424
Directory.CreateDirectory(transactionDir);
2525
FileToProcess = Path.Combine(transactionDir, Path.GetFileName(incomingFilePath));
2626

27-
return AsyncFile.Move(incomingFilePath, FileToProcess, cancellationToken);
27+
var succeeded = await AsyncFile.Move(incomingFilePath, FileToProcess, cancellationToken).ConfigureAwait(false);
28+
if (succeeded)
29+
{
30+
return true;
31+
}
32+
33+
Directory.Delete(transactionDir, true);
34+
return false;
2835
}
2936

3037
public async Task Commit(CancellationToken cancellationToken = default)

0 commit comments

Comments
 (0)