@@ -385,13 +385,29 @@ public async Task<IEnumerable<Message>> SendMessageWithReplyAsync(
385
385
_logger . Error ( e , logMsg , peer . Address , reqId , e . ExpectedVersion , e . ActualVersion ) ;
386
386
throw ;
387
387
}
388
+ catch ( ArgumentException e )
389
+ {
390
+ // This exception is thrown on .NET framework when the given peer is invalid.
391
+ _logger . Error (
392
+ e ,
393
+ "ArgumentException occurred during {FName} to {RequestId}. {E}" ,
394
+ nameof ( SendMessageWithReplyAsync ) ,
395
+ reqId ,
396
+ e ) ;
397
+
398
+ // To match with previous implementation, throws TimeoutException when it failed to
399
+ // find peer to send message.
400
+ throw new TimeoutException ( $ "Cannot find peer { peer } .", e ) ;
401
+ }
388
402
catch ( SocketException e )
389
403
{
404
+ // This exception is thrown on .NET core when the given peer is invalid.
390
405
_logger . Error (
391
406
e ,
392
- "SocketException occurred during {FName} to {RequestId}." ,
407
+ "SocketException occurred during {FName} to {RequestId}. {E} " ,
393
408
nameof ( SendMessageWithReplyAsync ) ,
394
- reqId ) ;
409
+ reqId ,
410
+ e ) ;
395
411
396
412
// To match with previous implementation, throws TimeoutException when it failed to
397
413
// find peer to send message.
@@ -453,7 +469,25 @@ internal async Task WriteMessageAsync(
453
469
BitConverter . GetBytes ( length ) . CopyTo ( buffer , 0 ) ;
454
470
serialized . CopyTo ( buffer , 4 ) ;
455
471
NetworkStream stream = client . GetStream ( ) ;
456
- await stream . WriteAsync ( buffer , 0 , buffer . Length , cancellationToken ) ;
472
+
473
+ // NOTE: Stream is forced to be closed because NetStream.WriteAsync()'s
474
+ // cancellation token never works.
475
+ using ( cancellationToken . Register ( ( ) => stream . Close ( ) ) )
476
+ {
477
+ try
478
+ {
479
+ await stream . WriteAsync ( buffer , 0 , buffer . Length , default ) ;
480
+ }
481
+ catch ( Exception )
482
+ {
483
+ if ( cancellationToken . IsCancellationRequested )
484
+ {
485
+ throw new TaskCanceledException ( ) ;
486
+ }
487
+
488
+ throw ;
489
+ }
490
+ }
457
491
}
458
492
459
493
internal async Task < Message [ ] > ReadMessageAsync (
@@ -463,13 +497,24 @@ internal async Task<Message[]> ReadMessageAsync(
463
497
byte [ ] buffer = new byte [ 1000000 ] ;
464
498
NetworkStream stream = client . GetStream ( ) ;
465
499
int bytesRead = 0 ;
466
- try
467
- {
468
- bytesRead = await stream . ReadAsync ( buffer , 0 , buffer . Length , cancellationToken ) ;
469
- }
470
- catch ( OperationCanceledException )
500
+
501
+ // NOTE: Stream is forced to be closed because NetStream.ReadAsync()'s
502
+ // cancellation token never works.
503
+ using ( cancellationToken . Register ( ( ) => stream . Close ( ) ) )
471
504
{
472
- throw new TaskCanceledException ( ) ;
505
+ try
506
+ {
507
+ bytesRead = await stream . ReadAsync ( buffer , 0 , buffer . Length , default ) ;
508
+ }
509
+ catch ( Exception )
510
+ {
511
+ if ( cancellationToken . IsCancellationRequested )
512
+ {
513
+ throw new TaskCanceledException ( ) ;
514
+ }
515
+
516
+ throw ;
517
+ }
473
518
}
474
519
475
520
_logger . Verbose ( "Received {Bytes} bytes from network stream." , bytesRead ) ;
0 commit comments