Skip to content

Commit b888b40

Browse files
authored
A couple of fixes affecting the FileSubscriber (#5035)
* Fail materialized Task of IO stages when stream fails * Fail FileSubscriber's Task if it can't open the file
1 parent 431b69e commit b888b40

File tree

6 files changed

+130
-21
lines changed

6 files changed

+130
-21
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2564,6 +2564,11 @@ namespace Akka.Streams.Extra
25642564
}
25652565
namespace Akka.Streams.IO
25662566
{
2567+
public sealed class AbruptIOTerminationException : System.Exception
2568+
{
2569+
public AbruptIOTerminationException(Akka.Streams.IO.IOResult ioResult, System.Exception cause) { }
2570+
public Akka.Streams.IO.IOResult IoResult { get; }
2571+
}
25672572
public struct IOResult
25682573
{
25692574
public readonly long Count;

src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,37 @@ public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sin
329329
});
330330
}
331331

332+
[Fact]
333+
public void SynchronousFileSink_should_complete_materialized_task_with_an_exception_when_upstream_fails()
334+
{
335+
TargetFile(f =>
336+
{
337+
var completion = Source.From(_testByteStrings)
338+
.Select(bytes =>
339+
{
340+
if (bytes.Contains(Convert.ToByte('b'))) throw new TestException("bees!");
341+
return bytes;
342+
})
343+
.RunWith(FileIO.ToFile(f), _materializer);
344+
345+
var ex = Intercept<AbruptIOTerminationException>(() => completion.Wait(TimeSpan.FromSeconds(3)));
346+
ex.IoResult.Count.ShouldBe(1001);
347+
CheckFileContent(f, string.Join("", _testLines.TakeWhile(s => !s.Contains('b'))));
348+
}, _materializer);
349+
}
350+
351+
[Fact]
352+
public void SynchronousFileSink_should_complete_with_failure_when_file_cannot_be_open()
353+
{
354+
TargetFile(f =>
355+
{
356+
var completion = Source.Single(ByteString.FromString("42"))
357+
.RunWith(FileIO.ToFile(new FileInfo("I-hope-this-file-doesnt-exist.txt"), FileMode.Open), _materializer);
358+
359+
AssertThrows<FileNotFoundException>(completion.Wait);
360+
}, _materializer);
361+
}
362+
332363
[Fact]
333364
public void SynchronousFileSink_should_write_each_element_if_auto_flush_is_set()
334365
{

src/core/Akka.Streams.Tests/IO/OutputStreamSinkSpec.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using Akka.Actor;
1212
using Akka.IO;
1313
using Akka.Streams.Dsl;
14+
using Akka.Streams.IO;
1415
using Akka.Streams.TestKit.Tests;
1516
using Akka.TestKit;
1617
using Xunit;
@@ -152,6 +153,40 @@ protected override void Dispose(bool disposing)
152153
public override long Length { get; }
153154
public override long Position { get; set; }
154155
}
156+
157+
private sealed class OutputStream : Stream
158+
{
159+
public override void Flush()
160+
{
161+
throw new NotImplementedException();
162+
}
163+
164+
public override long Seek(long offset, SeekOrigin origin)
165+
{
166+
throw new NotImplementedException();
167+
}
168+
169+
public override void SetLength(long value)
170+
{
171+
throw new NotImplementedException();
172+
}
173+
174+
public override int Read(byte[] buffer, int offset, int count)
175+
{
176+
throw new NotImplementedException();
177+
}
178+
179+
public override void Write(byte[] buffer, int offset, int count)
180+
{
181+
}
182+
183+
public override bool CanRead { get; }
184+
public override bool CanSeek { get; }
185+
public override bool CanWrite => true;
186+
public override long Length { get; }
187+
public override long Position { get; set; }
188+
}
189+
155190
#endregion
156191

157192
private readonly ActorMaterializer _materializer;
@@ -199,6 +234,18 @@ public void OutputStreamSink_must_close_underlying_stream_when_error_received()
199234
}, _materializer);
200235
}
201236

237+
[Fact]
238+
public void OutputStreamSink_must_complete_materialized_value_with_the_error()
239+
{
240+
this.AssertAllStagesStopped(() =>
241+
{
242+
var completion = Source.Failed<ByteString>(new Exception("Boom!"))
243+
.RunWith(StreamConverters.FromOutputStream(() => new OutputStream()), _materializer);
244+
245+
AssertThrows<AbruptIOTerminationException>(completion.Wait);
246+
}, _materializer);
247+
}
248+
202249
[Fact]
203250
public void OutputStreamSink_must_close_underlying_stream_when_completion_received()
204251
{

src/core/Akka.Streams/IO/IOResult.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,27 @@ public Exception Error
7171
public static IOResult Failed(long count, Exception reason)
7272
=> new IOResult(count, Result.Failure<NotUsed>(reason));
7373
}
74+
75+
/// <summary>
76+
/// This exception signals that a stream has been completed by an onError signal while there was still IO operations in progress.
77+
/// </summary>
78+
public sealed class AbruptIOTerminationException : Exception
79+
{
80+
/// <summary>
81+
/// The number of bytes read/written up until the error
82+
/// </summary>
83+
public IOResult IoResult { get; }
84+
85+
/// <summary>
86+
/// Initializes a new instance of the <see cref="AbruptIOTerminationException"/> class with the result of the IO operation
87+
/// until the error and a reference to the inner exception that is the cause of this exception.
88+
/// </summary>
89+
/// <param name="ioResult">The result of the IO operation until the error</param>
90+
/// <param name="cause">The exception that is the cause of the current exception</param>
91+
public AbruptIOTerminationException(IOResult ioResult, Exception cause)
92+
: base("Stream terminated without completing IO operation.", cause)
93+
{
94+
IoResult = ioResult;
95+
}
96+
}
7497
}

src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Akka.IO;
1414
using Akka.Streams.Actors;
1515
using Akka.Streams.IO;
16+
using Akka.Util;
1617

1718
namespace Akka.Streams.Implementation.IO
1819
{
@@ -110,7 +111,7 @@ protected override void PreStart()
110111
}
111112
catch (Exception ex)
112113
{
113-
CloseAndComplete(IOResult.Failed(_bytesWritten, ex));
114+
CloseAndComplete(new Try<IOResult>(ex));
114115
Cancel();
115116
}
116117
}
@@ -143,7 +144,7 @@ protected override bool Receive(object message)
143144

144145
case OnError error:
145146
_log.Error(error.Cause, "Tearing down FileSink({0}) due to upstream error", _f.FullName);
146-
CloseAndComplete(IOResult.Failed(_bytesWritten, error.Cause));
147+
CloseAndComplete(new Try<IOResult>(new AbruptIOTerminationException(IOResult.Success(_bytesWritten), error.Cause)));
147148
Context.Stop(Self);
148149
return true;
149150

@@ -185,19 +186,23 @@ protected override void PostStop()
185186
base.PostStop();
186187
}
187188

188-
private void CloseAndComplete(IOResult result)
189+
private void CloseAndComplete(Try<IOResult> result)
189190
{
190191
try
191192
{
192193
// close the channel/file before completing the promise, allowing the
193194
// file to be deleted, which would not work (on some systems) if the
194195
// file is still open for writing
195196
_chan?.Dispose();
196-
_completionPromise.TrySetResult(result);
197+
198+
if (result.IsSuccess)
199+
_completionPromise.SetResult(result.Success.Value);
200+
else
201+
_completionPromise.SetException(result.Failure.Value);
197202
}
198203
catch (Exception ex)
199204
{
200-
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
205+
_completionPromise.TrySetException(ex);
201206
}
202207
}
203208
}

src/core/Akka.Streams/Implementation/IO/OutputStreamSubscriber.cs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
using Akka.IO;
1414
using Akka.Streams.Actors;
1515
using Akka.Streams.IO;
16-
using Akka.Util;
1716

1817
namespace Akka.Streams.Implementation.IO
1918
{
@@ -75,12 +74,12 @@ public OutputStreamSubscriber(Stream outputStream, TaskCompletionSource<IOResult
7574
/// <returns>TBD</returns>
7675
protected override bool Receive(object message)
7776
{
78-
return message.Match()
79-
.With<OnNext>(next =>
80-
{
77+
switch (message)
78+
{
79+
case OnNext next:
8180
try
8281
{
83-
var bytes = next.Element as ByteString;
82+
var bytes = (ByteString)next.Element;
8483
//blocking write
8584
_outputStream.Write(bytes.ToArray(), 0, bytes.Count);
8685
_bytesWritten += bytes.Count;
@@ -92,20 +91,19 @@ protected override bool Receive(object message)
9291
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
9392
Cancel();
9493
}
95-
})
96-
.With<OnError>(error =>
97-
{
98-
_log.Error(error.Cause,
99-
$"Tearing down OutputStreamSink due to upstream error, wrote bytes: {_bytesWritten}");
100-
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, error.Cause));
94+
return true;
95+
case OnError error:
96+
_log.Error(error.Cause, "Tearing down OutputStreamSink due to upstream error, wrote bytes: {0}", _bytesWritten);
97+
_completionPromise.TrySetException(new AbruptIOTerminationException(IOResult.Success(_bytesWritten), error.Cause));
10198
Context.Stop(Self);
102-
})
103-
.With<OnComplete>(() =>
104-
{
99+
return true;
100+
case OnComplete _:
105101
Context.Stop(Self);
106102
_outputStream.Flush();
107-
})
108-
.WasHandled;
103+
return true;
104+
}
105+
106+
return false;
109107
}
110108

111109
/// <summary>

0 commit comments

Comments
 (0)