@@ -94,7 +94,7 @@ protected override bool Receive(object message)
94
94
* We write out code, knowing that the other side will stream the data into it.
95
95
* For them it's a Sink; for us it's a Source.
96
96
*/
97
- var sink = StreamRefs . SinkRef < string > ( ) . To ( Sink . ActorRef < string > ( _probe , "<COMPLETE>" ) )
97
+ var sink = StreamRefs . SinkRef < string > ( ) . To ( Sink . ActorRef < string > ( _probe , "<COMPLETE>" , ex => new Status . Failure ( ex ) ) )
98
98
. Run ( _materializer ) ;
99
99
sink . PipeTo ( Sender ) ;
100
100
return true ;
@@ -109,7 +109,7 @@ protected override bool Receive(object message)
109
109
{
110
110
var sink = StreamRefs . SinkRef < string > ( )
111
111
. WithAttributes ( StreamRefAttributes . CreateSubscriptionTimeout ( TimeSpan . FromMilliseconds ( 500 ) ) )
112
- . To ( Sink . ActorRef < string > ( _probe , "<COMPLETE>" ) )
112
+ . To ( Sink . ActorRef < string > ( _probe , "<COMPLETE>" , ex => new Status . Failure ( ex ) ) )
113
113
. Run ( _materializer ) ;
114
114
sink . PipeTo ( Sender ) ;
115
115
return true ;
@@ -242,7 +242,7 @@ public void SourceRef_must_send_messages_via_remoting()
242
242
_remoteActor . Tell ( "give" ) ;
243
243
var sourceRef = ExpectMsg < ISourceRef < string > > ( ) ;
244
244
245
- sourceRef . Source . RunWith ( Sink . ActorRef < string > ( _probe . Ref , "<COMPLETE>" ) , Materializer ) ;
245
+ sourceRef . Source . RunWith ( Sink . ActorRef < string > ( _probe . Ref , "<COMPLETE>" , ex => new Status . Failure ( ex ) ) , Materializer ) ;
246
246
247
247
_probe . ExpectMsg ( "hello" ) ;
248
248
_probe . ExpectMsg ( "world" ) ;
@@ -255,7 +255,7 @@ public void SourceRef_must_fail_when_remote_source_failed()
255
255
_remoteActor . Tell ( "give-fail" ) ;
256
256
var sourceRef = ExpectMsg < ISourceRef < string > > ( ) ;
257
257
258
- sourceRef . Source . RunWith ( Sink . ActorRef < string > ( _probe . Ref , "<COMPLETE>" ) , Materializer ) ;
258
+ sourceRef . Source . RunWith ( Sink . ActorRef < string > ( _probe . Ref , "<COMPLETE>" , ex => new Status . Failure ( ex ) ) , Materializer ) ;
259
259
260
260
var f = _probe . ExpectMsg < Status . Failure > ( ) ;
261
261
f . Cause . Message . Should ( ) . Contain ( "Remote stream (" ) ;
@@ -269,7 +269,7 @@ public void SourceRef_must_complete_properly_when_remote_source_is_empty()
269
269
_remoteActor . Tell ( "give-complete-asap" ) ;
270
270
var sourceRef = ExpectMsg < ISourceRef < string > > ( ) ;
271
271
272
- sourceRef . Source . RunWith ( Sink . ActorRef < string > ( _probe . Ref , "<COMPLETE>" ) , Materializer ) ;
272
+ sourceRef . Source . RunWith ( Sink . ActorRef < string > ( _probe . Ref , "<COMPLETE>" , ex => new Status . Failure ( ex ) ) , Materializer ) ;
273
273
274
274
_probe . ExpectMsg ( "<COMPLETE>" ) ;
275
275
}
0 commit comments