Skip to content

Commit 3a75d0c

Browse files
eabaAaronontheweb
andauthored
[73-74] InputStreamSinkSpec (#6621)
Co-authored-by: Aaron Stannard <[email protected]>
1 parent 5da08f7 commit 3a75d0c

File tree

1 file changed

+55
-59
lines changed

1 file changed

+55
-59
lines changed

src/core/Akka.Streams.Tests/IO/InputStreamSinkSpec.cs

+55-59
Original file line numberDiff line numberDiff line change
@@ -37,30 +37,28 @@ public InputStreamSinkSpec(ITestOutputHelper helper) : base(Utils.UnboundedMailb
3737
}
3838

3939
[Fact]
40-
public void InputStreamSink_should_read_bytes_from_input_stream()
40+
public async Task InputStreamSink_should_read_bytes_from_input_stream()
4141
{
42-
this.AssertAllStagesStopped(() =>
43-
{
42+
await this.AssertAllStagesStoppedAsync(() => {
4443
var inputStream = Source.Single(_byteString).RunWith(StreamConverters.AsInputStream(), _materializer);
4544
var result = ReadN(inputStream, _byteString.Count);
4645
inputStream.Dispose();
4746
result.Item1.Should().Be(_byteString.Count);
4847
result.Item2.Should().BeEquivalentTo(_byteString);
49-
48+
return Task.CompletedTask;
5049
}, _materializer);
5150
}
5251

5352
[Fact]
54-
public void InputStreamSink_should_read_bytes_correctly_if_requested_by_input_stream_not_in_chunk_size()
53+
public async Task InputStreamSink_should_read_bytes_correctly_if_requested_by_input_stream_not_in_chunk_size()
5554
{
56-
this.AssertAllStagesStopped(() =>
57-
{
55+
await this.AssertAllStagesStoppedAsync(() => {
5856
var sinkProbe = CreateTestProbe();
5957
var byteString2 = RandomByteString(3);
6058
var inputStream = Source.From(new[] { _byteString, byteString2, null })
6159
.RunWith(TestSink(sinkProbe), _materializer);
6260

63-
sinkProbe.ExpectMsgAllOf(new []{ GraphStageMessages.Push.Instance, GraphStageMessages.Push.Instance });
61+
sinkProbe.ExpectMsgAllOf(new[] { GraphStageMessages.Push.Instance, GraphStageMessages.Push.Instance });
6462

6563
var result = ReadN(inputStream, 2);
6664
result.Item1.Should().Be(2);
@@ -75,34 +73,32 @@ public void InputStreamSink_should_read_bytes_correctly_if_requested_by_input_st
7573
result.Item2.Should().BeEquivalentTo(byteString2.Slice(1));
7674

7775
inputStream.Dispose();
78-
76+
return Task.CompletedTask;
7977
}, _materializer);
8078
}
8179

8280
[Fact]
83-
public void InputStreamSink_should_return_less_than_was_expected_when_data_source_has_provided_some_but_not_enough_data()
81+
public async Task InputStreamSink_should_return_less_than_was_expected_when_data_source_has_provided_some_but_not_enough_data()
8482
{
85-
this.AssertAllStagesStopped(() =>
86-
{
83+
await this.AssertAllStagesStoppedAsync(() => {
8784
var inputStream = Source.Single(_byteString).RunWith(StreamConverters.AsInputStream(), _materializer);
8885

8986
var arr = new byte[_byteString.Count + 1];
9087
inputStream.Read(arr, 0, arr.Length).Should().Be(arr.Length - 1);
9188
inputStream.Dispose();
9289
ByteString.FromBytes(arr).Should().BeEquivalentTo(Enumerable.Concat(_byteString, ByteString.FromBytes(new byte[] { 0 })));
93-
90+
return Task.CompletedTask;
9491
}, _materializer);
9592
}
9693

9794
[WindowsFact(Skip ="Racy in Linux")]
98-
public void InputStreamSink_should_block_read_until_get_requested_number_of_bytes_from_upstream()
95+
public async Task InputStreamSink_should_block_read_until_get_requested_number_of_bytes_from_upstream()
9996
{
100-
this.AssertAllStagesStopped(() =>
101-
{
102-
var run =
103-
this.SourceProbe<ByteString>()
104-
.ToMaterialized(StreamConverters.AsInputStream(), Keep.Both)
105-
.Run(_materializer);
97+
await this.AssertAllStagesStoppedAsync(() => {
98+
var run =
99+
this.SourceProbe<ByteString>()
100+
.ToMaterialized(StreamConverters.AsInputStream(), Keep.Both)
101+
.Run(_materializer);
106102
var probe = run.Item1;
107103
var inputStream = run.Item2;
108104
var f = Task.Run(() => inputStream.Read(new byte[_byteString.Count], 0, _byteString.Count));
@@ -116,18 +112,17 @@ public void InputStreamSink_should_block_read_until_get_requested_number_of_byte
116112
probe.SendComplete();
117113
inputStream.ReadByte().Should().Be(-1);
118114
inputStream.Dispose();
119-
115+
return Task.CompletedTask;
120116
}, _materializer);
121117
}
122118

123119
[Fact]
124-
public void InputStreamSink_should_throw_error_when_reactive_stream_is_closed()
120+
public async Task InputStreamSink_should_throw_error_when_reactive_stream_is_closed()
125121
{
126-
this.AssertAllStagesStopped(() =>
127-
{
128-
var t = this.SourceProbe<ByteString>()
129-
.ToMaterialized(StreamConverters.AsInputStream(), Keep.Both)
130-
.Run(_materializer);
122+
await this.AssertAllStagesStoppedAsync(() => {
123+
var t = this.SourceProbe<ByteString>()
124+
.ToMaterialized(StreamConverters.AsInputStream(), Keep.Both)
125+
.Run(_materializer);
131126
var probe = t.Item1;
132127
var inputStream = t.Item2;
133128

@@ -137,14 +132,14 @@ public void InputStreamSink_should_throw_error_when_reactive_stream_is_closed()
137132

138133
Action block = () => inputStream.Read(new byte[1], 0, 1);
139134
block.Should().Throw<IOException>();
135+
return Task.CompletedTask;
140136
}, _materializer);
141137
}
142138

143139
[Fact]
144-
public void InputStreamSink_should_return_all_data_when_upstream_is_completed()
140+
public async Task InputStreamSink_should_return_all_data_when_upstream_is_completed()
145141
{
146-
this.AssertAllStagesStopped(() =>
147-
{
142+
await this.AssertAllStagesStoppedAsync(() => {
148143
var sinkProbe = CreateTestProbe();
149144
var t = this.SourceProbe<ByteString>().ToMaterialized(TestSink(sinkProbe), Keep.Both).Run(_materializer);
150145
var probe = t.Item1;
@@ -160,14 +155,14 @@ public void InputStreamSink_should_return_all_data_when_upstream_is_completed()
160155
var result = ReadN(inputStream, 3);
161156
result.Item1.Should().Be(1);
162157
result.Item2.Should().BeEquivalentTo(bytes);
158+
return Task.CompletedTask;
163159
}, _materializer);
164160
}
165161

166162
[Fact]
167-
public void InputStreamSink_should_work_when_read_chunks_smaller_then_stream_chunks()
163+
public async Task InputStreamSink_should_work_when_read_chunks_smaller_then_stream_chunks()
168164
{
169-
this.AssertAllStagesStopped(() =>
170-
{
165+
await this.AssertAllStagesStoppedAsync(() => {
171166
var bytes = RandomByteString(10);
172167
var inputStream = Source.Single(bytes).RunWith(StreamConverters.AsInputStream(), _materializer);
173168

@@ -182,31 +177,31 @@ public void InputStreamSink_should_work_when_read_chunks_smaller_then_stream_chu
182177
}
183178

184179
inputStream.Dispose();
180+
return Task.CompletedTask;
185181
}, _materializer);
186182
}
187183

188184
[Fact]
189-
public void InputStreamSink_should_throw_exception_when_call_read_With_wrong_parameters()
185+
public async Task InputStreamSink_should_throw_exception_when_call_read_With_wrong_parameters()
190186
{
191-
this.AssertAllStagesStopped(() =>
192-
{
187+
await this.AssertAllStagesStoppedAsync(() => {
193188
var inputStream = Source.Single(_byteString).RunWith(StreamConverters.AsInputStream(), _materializer);
194189
var buf = new byte[3];
195190

196191
Action(() => inputStream.Read(buf, -1, 2)).Should().Throw<ArgumentException>();
197192
Action(() => inputStream.Read(buf, 0, 5)).Should().Throw<ArgumentException>();
198193
Action(() => inputStream.Read(new byte[0], 0, 1)).Should().Throw<ArgumentException>();
199194
Action(() => inputStream.Read(buf, 0, 0)).Should().Throw<ArgumentException>();
195+
return Task.CompletedTask;
200196
}, _materializer);
201197
}
202198

203199
private Action Action(Action a) => a;
204200

205201
[Fact]
206-
public void InputStreamSink_should_successfully_read_several_chunks_at_once()
202+
public async Task InputStreamSink_should_successfully_read_several_chunks_at_once()
207203
{
208-
this.AssertAllStagesStopped(() =>
209-
{
204+
await this.AssertAllStagesStoppedAsync(() => {
210205
var bytes = Enumerable.Range(1, 4).Select(_ => RandomByteString(4)).ToList();
211206
var sinkProbe = CreateTestProbe();
212207
var inputStream = Source.From(bytes).RunWith(TestSink(sinkProbe), _materializer);
@@ -222,21 +217,21 @@ public void InputStreamSink_should_successfully_read_several_chunks_at_once()
222217
}
223218

224219
inputStream.Dispose();
220+
return Task.CompletedTask;
225221
}, _materializer);
226222
}
227223

228224
[Fact]
229-
public void InputStreamSink_should_work_when_read_chunks_bigger_than_stream_chunks()
225+
public async Task InputStreamSink_should_work_when_read_chunks_bigger_than_stream_chunks()
230226
{
231-
this.AssertAllStagesStopped(() =>
232-
{
227+
await this.AssertAllStagesStoppedAsync(() => {
233228
var bytes1 = RandomByteString(10);
234229
var bytes2 = RandomByteString(10);
235230
var sinkProbe = CreateTestProbe();
236231
var inputStream = Source.From(new[] { bytes1, bytes2, null }).RunWith(TestSink(sinkProbe), _materializer);
237232

238233
//need to wait while both elements arrive to sink
239-
sinkProbe.ExpectMsgAllOf(new []{ GraphStageMessages.Push.Instance, GraphStageMessages.Push.Instance });
234+
sinkProbe.ExpectMsgAllOf(new[] { GraphStageMessages.Push.Instance, GraphStageMessages.Push.Instance });
240235

241236
var r1 = ReadN(inputStream, 15);
242237
r1.Item1.Should().Be(15);
@@ -247,14 +242,14 @@ public void InputStreamSink_should_work_when_read_chunks_bigger_than_stream_chun
247242
r2.Item2.Should().BeEquivalentTo(bytes2.Slice(5));
248243

249244
inputStream.Dispose();
245+
return Task.CompletedTask;
250246
}, _materializer);
251247
}
252248

253249
[Fact]
254-
public void InputStreamSink_should_return_minus_1_when_read_after_stream_is_completed()
250+
public async Task InputStreamSink_should_return_minus_1_when_read_after_stream_is_completed()
255251
{
256-
this.AssertAllStagesStopped(() =>
257-
{
252+
await this.AssertAllStagesStoppedAsync(() => {
258253
var inputStream = Source.Single(_byteString).RunWith(StreamConverters.AsInputStream(), _materializer);
259254

260255
var r = ReadN(inputStream, _byteString.Count);
@@ -263,14 +258,14 @@ public void InputStreamSink_should_return_minus_1_when_read_after_stream_is_comp
263258

264259
inputStream.ReadByte().Should().Be(-1);
265260
inputStream.Dispose();
261+
return Task.CompletedTask;
266262
}, _materializer);
267263
}
268264

269265
[Fact]
270-
public void InputStreamSink_should_return_Exception_when_stream_is_failed()
266+
public async Task InputStreamSink_should_return_Exception_when_stream_is_failed()
271267
{
272-
this.AssertAllStagesStopped(() =>
273-
{
268+
await this.AssertAllStagesStoppedAsync(() => {
274269
var sinkProbe = CreateTestProbe();
275270
var t = this.SourceProbe<ByteString>().ToMaterialized(TestSink(sinkProbe), Keep.Both).Run(_materializer);
276271
var probe = t.Item1;
@@ -293,15 +288,14 @@ public void InputStreamSink_should_return_Exception_when_stream_is_failed()
293288
block.Should().Throw<Exception>();
294289

295290
task.Exception.InnerException.Should().Be(ex);
296-
291+
return Task.CompletedTask;
297292
}, _materializer);
298293
}
299294

300295
[Fact]
301-
public void InputStreamSink_should_use_dedicated_default_blocking_io_dispatcher_by_default()
296+
public async Task InputStreamSink_should_use_dedicated_default_blocking_io_dispatcher_by_default()
302297
{
303-
this.AssertAllStagesStopped(() =>
304-
{
298+
await this.AssertAllStagesStoppedAsync(() => {
305299
var sys = ActorSystem.Create("InputStreamSink-testing", Utils.UnboundedMailboxConfig);
306300
var materializer = ActorMaterializer.Create(sys);
307301
try
@@ -316,30 +310,31 @@ public void InputStreamSink_should_use_dedicated_default_blocking_io_dispatcher_
316310
{
317311
Shutdown(sys);
318312
}
313+
314+
return Task.CompletedTask;
319315
}, _materializer);
320316
}
321317

322318
[Fact]
323-
public void InputStreamSink_should_work_when_more_bytes_pulled_from_input_stream_than_available()
319+
public async Task InputStreamSink_should_work_when_more_bytes_pulled_from_input_stream_than_available()
324320
{
325-
this.AssertAllStagesStopped(() =>
326-
{
321+
await this.AssertAllStagesStoppedAsync(() => {
327322
var inputStream = Source.Single(_byteString).RunWith(StreamConverters.AsInputStream(), _materializer);
328323

329324
var r = ReadN(inputStream, _byteString.Count * 2);
330325
r.Item1.Should().Be(_byteString.Count);
331326
r.Item2.Should().BeEquivalentTo(_byteString);
332327

333328
inputStream.Dispose();
329+
return Task.CompletedTask;
334330
}, _materializer);
335331
}
336332

337333

338334
[Fact]
339-
public void InputStreamSink_should_read_next_byte_as_an_int_from_InputStream()
335+
public async Task InputStreamSink_should_read_next_byte_as_an_int_from_InputStream()
340336
{
341-
this.AssertAllStagesStopped(() =>
342-
{
337+
await this.AssertAllStagesStoppedAsync(() => {
343338
var bytes = ByteString.CopyFrom(new byte[] { 0, 100, 200, 255 });
344339
var inputStream = Source.Single(bytes).RunWith(StreamConverters.AsInputStream(), _materializer);
345340

@@ -348,6 +343,7 @@ public void InputStreamSink_should_read_next_byte_as_an_int_from_InputStream()
348343
.Should().BeEquivalentTo(new[] { 0, 100, 200, 255, -1 });
349344

350345
inputStream.Dispose();
346+
return Task.CompletedTask;
351347
}, _materializer);
352348
}
353349

0 commit comments

Comments
 (0)