Skip to content

Commit 5636752

Browse files
authored
[61-74] QueueSinkSpec (#6609)
* [61-74] `QueueSinkSpec` * Changes to `async/await`
1 parent 5159b71 commit 5636752

File tree

1 file changed

+44
-53
lines changed

1 file changed

+44
-53
lines changed

src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs

+44-53
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,9 @@ public QueueSinkSpec(ITestOutputHelper output) : base(output)
3838
}
3939

4040
[Fact]
41-
public void QueueSink_should_send_the_elements_as_result_of_future()
41+
public async Task QueueSink_should_send_the_elements_as_result_of_future()
4242
{
43-
this.AssertAllStagesStopped(() =>
44-
{
43+
await this.AssertAllStagesStoppedAsync(async() => {
4544
var expected = new List<Option<int>>
4645
{
4746
Option<int>.Create(1),
@@ -52,19 +51,18 @@ public void QueueSink_should_send_the_elements_as_result_of_future()
5251
var queue = Source.From(expected.Where(o => o.HasValue).Select(o => o.Value))
5352
.RunWith(Sink.Queue<int>(), _materializer);
5453

55-
expected.ForEach(v =>
54+
foreach(var v in expected)
5655
{
5756
queue.PullAsync().PipeTo(TestActor);
58-
ExpectMsg(v);
59-
});
57+
await ExpectMsgAsync(v);
58+
};
6059
}, _materializer);
6160
}
6261

6362
[Fact]
64-
public void QueueSink_should_allow_to_have_only_one_future_waiting_for_result_in_each_point_in_time()
63+
public async Task QueueSink_should_allow_to_have_only_one_future_waiting_for_result_in_each_point_in_time()
6564
{
66-
this.AssertAllStagesStopped(() =>
67-
{
65+
await this.AssertAllStagesStoppedAsync(async() => {
6866
var probe = this.CreateManualPublisherProbe<int>();
6967
var queue = Source.FromPublisher(probe).RunWith(Sink.Queue<int>(), _materializer);
7068
var sub = probe.ExpectSubscription();
@@ -74,58 +72,55 @@ public void QueueSink_should_allow_to_have_only_one_future_waiting_for_result_in
7472

7573
sub.SendNext(1);
7674
future.PipeTo(TestActor);
77-
ExpectMsg(Option<int>.Create(1));
75+
await ExpectMsgAsync(Option<int>.Create(1));
7876

7977
sub.SendComplete();
80-
queue.PullAsync();
78+
await queue.PullAsync();
8179
}, _materializer);
8280
}
8381

8482
[Fact]
85-
public void QueueSink_should_wait_for_next_element_from_upstream()
83+
public async Task QueueSink_should_wait_for_next_element_from_upstream()
8684
{
87-
this.AssertAllStagesStopped(() =>
88-
{
85+
await this.AssertAllStagesStoppedAsync(async() => {
8986
var probe = this.CreateManualPublisherProbe<int>();
9087
var queue = Source.FromPublisher(probe).RunWith(Sink.Queue<int>(), _materializer);
9188
var sub = probe.ExpectSubscription();
9289

9390
queue.PullAsync().PipeTo(TestActor);
94-
ExpectNoMsg(_pause);
91+
await ExpectNoMsgAsync(_pause);
9592

9693
sub.SendNext(1);
97-
ExpectMsg(Option<int>.Create(1));
94+
await ExpectMsgAsync(Option<int>.Create(1));
9895
sub.SendComplete();
99-
queue.PullAsync();
96+
await queue.PullAsync();
10097
}, _materializer);
10198
}
10299

103100
[Fact]
104-
public void QueueSink_should_fail_future_on_stream_failure()
101+
public async Task QueueSink_should_fail_future_on_stream_failure()
105102
{
106-
this.AssertAllStagesStopped(() =>
107-
{
103+
await this.AssertAllStagesStoppedAsync(async() => {
108104
var probe = this.CreateManualPublisherProbe<int>();
109105
var queue = Source.FromPublisher(probe).RunWith(Sink.Queue<int>(), _materializer);
110-
var sub = probe.ExpectSubscription();
106+
var sub = await probe.ExpectSubscriptionAsync();
111107

112108
queue.PullAsync().PipeTo(TestActor);
113-
ExpectNoMsg(_pause);
109+
await ExpectNoMsgAsync(_pause);
114110

115111
sub.SendError(TestException());
116-
ExpectMsg<Status.Failure>(
112+
await ExpectMsgAsync<Status.Failure>(
117113
f => f.Cause.Equals(TestException()));
118114
}, _materializer);
119115
}
120116

121117
[Fact]
122-
public void QueueSink_should_fail_future_when_stream_failed()
118+
public async Task QueueSink_should_fail_future_when_stream_failed()
123119
{
124-
this.AssertAllStagesStopped(() =>
125-
{
120+
await this.AssertAllStagesStoppedAsync(async() => {
126121
var probe = this.CreateManualPublisherProbe<int>();
127122
var queue = Source.FromPublisher(probe).RunWith(Sink.Queue<int>(), _materializer);
128-
var sub = probe.ExpectSubscription();
123+
var sub = await probe.ExpectSubscriptionAsync();
129124

130125
sub.SendError(TestException());
131126
queue.Invoking(q => q.PullAsync().Wait(RemainingOrDefault))
@@ -134,51 +129,48 @@ public void QueueSink_should_fail_future_when_stream_failed()
134129
}
135130

136131
[Fact]
137-
public void QueueSink_should_timeout_future_when_stream_cannot_provide_data()
132+
public async Task QueueSink_should_timeout_future_when_stream_cannot_provide_data()
138133
{
139-
this.AssertAllStagesStopped(() =>
140-
{
134+
await this.AssertAllStagesStoppedAsync(async() => {
141135
var probe = this.CreateManualPublisherProbe<int>();
142136
var queue = Source.FromPublisher(probe).RunWith(Sink.Queue<int>(), _materializer);
143-
var sub = probe.ExpectSubscription();
137+
var sub = await probe.ExpectSubscriptionAsync();
144138

145139
queue.PullAsync().PipeTo(TestActor);
146-
ExpectNoMsg(_pause);
140+
await ExpectNoMsgAsync(_pause);
147141

148142
sub.SendNext(1);
149-
ExpectMsg(Option<int>.Create(1));
143+
await ExpectMsgAsync(Option<int>.Create(1));
150144
sub.SendComplete();
151-
queue.PullAsync();
145+
await queue.PullAsync();
152146
}, _materializer);
153147
}
154148

155149
[Fact]
156-
public void QueueSink_should_fail_pull_future_when_stream_is_completed()
150+
public async Task QueueSink_should_fail_pull_future_when_stream_is_completed()
157151
{
158-
this.AssertAllStagesStopped(() =>
159-
{
152+
await this.AssertAllStagesStoppedAsync(async() => {
160153
var probe = this.CreateManualPublisherProbe<int>();
161154
var queue = Source.FromPublisher(probe).RunWith(Sink.Queue<int>(), _materializer);
162-
var sub = probe.ExpectSubscription();
155+
var sub = await probe.ExpectSubscriptionAsync();
163156

164157
queue.PullAsync().PipeTo(TestActor);
165158
sub.SendNext(1);
166-
ExpectMsg(Option<int>.Create(1));
159+
await ExpectMsgAsync(Option<int>.Create(1));
167160

168161
sub.SendComplete();
169-
var result = queue.PullAsync().Result;
162+
var result = await queue.PullAsync();
170163
result.Should().Be(Option<int>.None);
171164

172-
var exception = Record.ExceptionAsync(async () => await queue.PullAsync()).Result;
165+
var exception = await Record.ExceptionAsync(async () => await queue.PullAsync());
173166
exception.Should().BeOfType<StreamDetachedException>();
174167
}, _materializer);
175168
}
176169

177170
[Fact]
178-
public void QueueSink_should_keep_on_sending_even_after_the_buffer_has_been_full()
171+
public async Task QueueSink_should_keep_on_sending_even_after_the_buffer_has_been_full()
179172
{
180-
this.AssertAllStagesStopped(() =>
181-
{
173+
await this.AssertAllStagesStoppedAsync(async() => {
182174
const int bufferSize = 16;
183175
const int streamElementCount = bufferSize + 4;
184176
var sink = Sink.Queue<int>().WithAttributes(Attributes.CreateInputBuffer(bufferSize, bufferSize));
@@ -195,31 +187,30 @@ public void QueueSink_should_keep_on_sending_even_after_the_buffer_has_been_full
195187
for (var i = 1; i <= streamElementCount; i++)
196188
{
197189
queue.PullAsync().PipeTo(TestActor);
198-
ExpectMsg(Option<int>.Create(i));
190+
await ExpectMsgAsync(Option<int>.Create(i));
199191
}
200192
queue.PullAsync().PipeTo(TestActor);
201-
ExpectMsg(Option<int>.None);
193+
await ExpectMsgAsync(Option<int>.None);
202194
}, _materializer);
203195
}
204196

205197
[Fact]
206-
public void QueueSink_should_work_with_one_element_buffer()
198+
public async Task QueueSink_should_work_with_one_element_buffer()
207199
{
208-
this.AssertAllStagesStopped(() =>
209-
{
200+
await this.AssertAllStagesStoppedAsync(async() => {
210201
var sink = Sink.Queue<int>().WithAttributes(Attributes.CreateInputBuffer(1, 1));
211202
var probe = this.CreateManualPublisherProbe<int>();
212203
var queue = Source.FromPublisher(probe).RunWith(sink, _materializer);
213-
var sub = probe.ExpectSubscription();
204+
var sub = await probe.ExpectSubscriptionAsync();
214205

215206
queue.PullAsync().PipeTo(TestActor);
216207
sub.SendNext(1); // should pull next element
217-
ExpectMsg(Option<int>.Create(1));
208+
await ExpectMsgAsync(Option<int>.Create(1));
218209

219210
queue.PullAsync().PipeTo(TestActor);
220-
ExpectNoMsg(); // element requested but buffer empty
211+
await ExpectNoMsgAsync(); // element requested but buffer empty
221212
sub.SendNext(2);
222-
ExpectMsg(Option<int>.Create(2));
213+
await ExpectMsgAsync(Option<int>.Create(2));
223214

224215
sub.SendComplete();
225216
var future = queue.PullAsync();

0 commit comments

Comments
 (0)