Skip to content

Commit af943a9

Browse files
eabaAaronontheweb
andauthored
[72-74] FileSourceSpec (#6620)
Co-authored-by: Aaron Stannard <[email protected]>
1 parent 8d72fe4 commit af943a9

File tree

1 file changed

+22
-22
lines changed

1 file changed

+22
-22
lines changed

src/core/Akka.Streams.Tests/IO/FileSourceSpec.cs

+22-22
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,9 @@ public FileSourceSpec(ITestOutputHelper helper) : base(Utils.UnboundedMailboxCon
4949
}
5050

5151
[Fact]
52-
public void FileSource_should_read_contents_from_a_file()
52+
public async Task FileSource_should_read_contents_from_a_file()
5353
{
54-
this.AssertAllStagesStopped(() =>
55-
{
54+
await this.AssertAllStagesStoppedAsync(() => {
5655
var chunkSize = 512;
5756
var bufferAttributes = Attributes.CreateInputBuffer(1, 2);
5857

@@ -99,14 +98,14 @@ public void FileSource_should_read_contents_from_a_file()
9998
}
10099
sub.Request(1);
101100
c.ExpectComplete();
101+
return Task.CompletedTask;
102102
}, _materializer);
103103
}
104104

105105
[Fact]
106-
public void Filesource_could_read_partial_contents_from_a_file()
106+
public async Task Filesource_could_read_partial_contents_from_a_file()
107107
{
108-
this.AssertAllStagesStopped(() =>
109-
{
108+
await this.AssertAllStagesStoppedAsync(() => {
110109
var chunkSize = 512;
111110
var startPosition = 1000;
112111
var bufferAttributes = Attributes.CreateInputBuffer(1, 2);
@@ -121,7 +120,8 @@ public void Filesource_could_read_partial_contents_from_a_file()
121120

122121
var remaining = _testText.Substring(1000);
123122

124-
var nextChunk = new Func<string>(() => {
123+
var nextChunk = new Func<string>(() =>
124+
{
125125
string chunks;
126126

127127
if (remaining.Length <= chunkSize)
@@ -141,21 +141,20 @@ public void Filesource_could_read_partial_contents_from_a_file()
141141
sub.Request(5000);
142142

143143
var expectedChunk = nextChunk();
144-
for(int i=0; i<10; ++i)
144+
for (int i = 0; i < 10; ++i)
145145
{
146146
c.ExpectNext().ToString().Should().Be(expectedChunk);
147147
expectedChunk = nextChunk();
148148
}
149149
c.ExpectComplete();
150-
150+
return Task.CompletedTask;
151151
}, _materializer);
152152
}
153153

154154
[Fact]
155-
public void FileSource_should_complete_only_when_all_contents_of_a_file_have_been_signalled()
155+
public async Task FileSource_should_complete_only_when_all_contents_of_a_file_have_been_signalled()
156156
{
157-
this.AssertAllStagesStopped(() =>
158-
{
157+
await this.AssertAllStagesStoppedAsync(() => {
159158
var chunkSize = 512;
160159
var bufferAttributes = Attributes.CreateInputBuffer(1, 2);
161160
var demandAllButOnechunks = _testText.Length / chunkSize - 1;
@@ -199,18 +198,18 @@ public void FileSource_should_complete_only_when_all_contents_of_a_file_have_bee
199198
sub.Request(1);
200199
c.ExpectNext().ToString(Encoding.UTF8).Should().Be(nextChunk());
201200
c.ExpectComplete();
201+
return Task.CompletedTask;
202202
}, _materializer);
203203
}
204204

205205
[Fact]
206-
public void FileSource_should_open_file_in_shared_mode_for_reading_multiple_times()
206+
public async Task FileSource_should_open_file_in_shared_mode_for_reading_multiple_times()
207207
{
208-
this.AssertAllStagesStopped(() =>
209-
{
208+
await this.AssertAllStagesStoppedAsync(() => {
210209
var testFile = TestFile();
211210
var p1 = FileIO.FromFile(testFile).RunWith(Sink.AsPublisher<ByteString>(false), _materializer);
212211
var p2 = FileIO.FromFile(testFile).RunWith(Sink.AsPublisher<ByteString>(false), _materializer);
213-
212+
214213
var c1 = this.CreateManualSubscriberProbe<ByteString>();
215214
var c2 = this.CreateManualSubscriberProbe<ByteString>();
216215
p1.Subscribe(c1);
@@ -223,14 +222,14 @@ public void FileSource_should_open_file_in_shared_mode_for_reading_multiple_time
223222

224223
c1.ExpectNext();
225224
c2.ExpectNext();
226-
225+
return Task.CompletedTask;
227226
}, _materializer);
228227
}
229228

230229
[Fact]
231-
public void FileSource_should_onError_with_failure_and_return_a_failed_IOResult_when_trying_to_read_from_file_which_does_not_exist()
230+
public async Task FileSource_should_onError_with_failure_and_return_a_failed_IOResult_when_trying_to_read_from_file_which_does_not_exist()
232231
{
233-
this.AssertAllStagesStopped(async() =>
232+
await this.AssertAllStagesStoppedAsync(async() =>
234233
{
235234
var t = FileIO.FromFile(NotExistingFile())
236235
.ToMaterialized(Sink.AsPublisher<ByteString>(false), Keep.Both)
@@ -267,10 +266,9 @@ public void FileSource_should_count_lines_in_a_real_file(int chunkSize, int read
267266
}
268267

269268
[Fact]
270-
public void FileSource_should_use_dedicated_blocking_io_dispatcher_by_default()
269+
public async Task FileSource_should_use_dedicated_blocking_io_dispatcher_by_default()
271270
{
272-
this.AssertAllStagesStopped(() =>
273-
{
271+
await this.AssertAllStagesStoppedAsync(() => {
274272
var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig);
275273
var materializer = sys.Materializer();
276274

@@ -293,6 +291,8 @@ public void FileSource_should_use_dedicated_blocking_io_dispatcher_by_default()
293291
{
294292
Shutdown(sys);
295293
}
294+
295+
return Task.CompletedTask;
296296
}, _materializer);
297297
}
298298

0 commit comments

Comments
 (0)