Skip to content

Commit efc6069

Browse files
committed
Report cause for Akka/IO TCP CommandFailed events
1 parent cdc2b85 commit efc6069

7 files changed

+90
-28
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3311,6 +3311,11 @@ namespace Akka.IO
33113311
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
33123312
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
33133313
}
3314+
[Akka.Annotations.InternalApiAttribute()]
3315+
public class ConnectException : System.Exception
3316+
{
3317+
public ConnectException(string message) { }
3318+
}
33143319
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
33153320
{
33163321
public static readonly Akka.IO.Dns Instance;
@@ -3525,8 +3530,13 @@ namespace Akka.IO
35253530
public sealed class CommandFailed : Akka.IO.Tcp.Event
35263531
{
35273532
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
3533+
public Akka.Util.Option<System.Exception> Cause { get; }
3534+
[Akka.Annotations.InternalApiAttribute()]
3535+
public string CauseString { get; }
35283536
public Akka.IO.Tcp.Command Cmd { get; }
35293537
public override string ToString() { }
3538+
[Akka.Annotations.InternalApiAttribute()]
3539+
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
35303540
}
35313541
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
35323542
{

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3318,6 +3318,11 @@ namespace Akka.IO
33183318
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
33193319
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
33203320
}
3321+
[Akka.Annotations.InternalApiAttribute()]
3322+
public class ConnectException : System.Exception
3323+
{
3324+
public ConnectException(string message) { }
3325+
}
33213326
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
33223327
{
33233328
public static readonly Akka.IO.Dns Instance;
@@ -3532,8 +3537,13 @@ namespace Akka.IO
35323537
public sealed class CommandFailed : Akka.IO.Tcp.Event
35333538
{
35343539
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
3540+
public Akka.Util.Option<System.Exception> Cause { get; }
3541+
[Akka.Annotations.InternalApiAttribute()]
3542+
public string CauseString { get; }
35353543
public Akka.IO.Tcp.Command Cmd { get; }
35363544
public override string ToString() { }
3545+
[Akka.Annotations.InternalApiAttribute()]
3546+
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
35373547
}
35383548
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
35393549
{

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3311,6 +3311,11 @@ namespace Akka.IO
33113311
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
33123312
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
33133313
}
3314+
[Akka.Annotations.InternalApiAttribute()]
3315+
public class ConnectException : System.Exception
3316+
{
3317+
public ConnectException(string message) { }
3318+
}
33143319
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
33153320
{
33163321
public static readonly Akka.IO.Dns Instance;
@@ -3525,8 +3530,13 @@ namespace Akka.IO
35253530
public sealed class CommandFailed : Akka.IO.Tcp.Event
35263531
{
35273532
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
3533+
public Akka.Util.Option<System.Exception> Cause { get; }
3534+
[Akka.Annotations.InternalApiAttribute()]
3535+
public string CauseString { get; }
35283536
public Akka.IO.Tcp.Command Cmd { get; }
35293537
public override string ToString() { }
3538+
[Akka.Annotations.InternalApiAttribute()]
3539+
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
35303540
}
35313541
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
35323542
{

src/core/Akka/IO/Tcp.cs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
using System.Linq;
1212
using System.Net;
1313
using Akka.Actor;
14+
using Akka.Annotations;
1415
using Akka.Configuration;
1516
using Akka.Dispatch;
1617
using Akka.Event;
1718
using Akka.IO.Buffers;
19+
using Akka.Util;
1820

1921
namespace Akka.IO
2022
{
@@ -85,7 +87,7 @@ private SocketConnected() { }
8587
public class Message : INoSerializationVerificationNeeded { }
8688

8789
#region user commands
88-
90+
8991
// COMMANDS
9092
/// <summary>
9193
/// TBD
@@ -733,7 +735,7 @@ public override string ToString() =>
733735
#endregion
734736

735737
#region user events
736-
738+
737739
/// <summary>
738740
/// Common interface for all events generated by the TCP layer actors.
739741
/// </summary>
@@ -808,18 +810,32 @@ public sealed class CommandFailed : Event
808810
/// TBD
809811
/// </summary>
810812
/// <param name="cmd">TBD</param>
811-
public CommandFailed(Command cmd)
812-
{
813-
Cmd = cmd;
814-
}
813+
public CommandFailed(Command cmd) => Cmd = cmd;
815814

816815
/// <summary>
817816
/// TBD
818817
/// </summary>
819818
public Command Cmd { get; }
820819

821-
public override string ToString() =>
822-
$"CommandFailed({Cmd})";
820+
/// <summary>
821+
/// Optionally contains the cause why the command failed.
822+
/// </summary>
823+
public Option<Exception> Cause { get; private set; } = Option<Exception>.None;
824+
825+
/// <summary>
826+
/// Creates a copy of this object with a new cause set.
827+
/// </summary>
828+
[InternalApi]
829+
public CommandFailed WithCause(Exception cause)
830+
{
831+
// Needs to be added with a mutable property for compatibility reasons
832+
return new CommandFailed(Cmd) { Cause = cause };
833+
}
834+
835+
[InternalApi]
836+
public string CauseString => Cause.HasValue ? $" because of {Cause.Value.Message}" : "";
837+
838+
public override string ToString() => $"CommandFailed({Cmd}){CauseString}";
823839
}
824840

825841
/// <summary>

src/core/Akka/IO/TcpConnection.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ enum ConnectionStatus
104104

105105
private IActorRef _watchedActor = Context.System.DeadLetters;
106106

107+
private readonly IOException droppingWriteBecauseWritingIsSuspendedException =
108+
new IOException("Dropping write because writing is suspended");
109+
110+
private readonly IOException droppingWriteBecauseQueueIsFullException =
111+
new IOException("Dropping write because queue is full");
112+
107113
protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option<int> writeCommandsBufferMaxSize)
108114
{
109115
if (socket == null) throw new ArgumentNullException(nameof(socket));
@@ -328,7 +334,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
328334
if (HasStatus(ConnectionStatus.WritingSuspended))
329335
{
330336
if (_traceLogging) Log.Debug("Dropping write because writing is suspended");
331-
Sender.Tell(write.FailureMessage);
337+
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseWritingIsSuspendedException));
332338
}
333339

334340
if (HasStatus(ConnectionStatus.Sending))
@@ -405,7 +411,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
405411
private void DropWrite(ConnectionInfo info, WriteCommand write)
406412
{
407413
if (_traceLogging) Log.Debug("Dropping write because queue is full");
408-
Sender.Tell(write.FailureMessage);
414+
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseQueueIsFullException));
409415
if (info.UseResumeWriting) SetStatus(ConnectionStatus.WritingSuspended);
410416
}
411417

src/core/Akka/IO/TcpListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private Receive Initializing() => message =>
6262
return true;
6363

6464
case Status.Failure fail:
65-
_bindCommander.Tell(_bind.FailureMessage);
65+
_bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause));
6666
_log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress);
6767
Context.Stop(Self);
6868
_binding = false;

src/core/Akka/IO/TcpOutgoingConnection.cs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
using System.Net.Sockets;
1313
using System.Runtime.CompilerServices;
1414
using Akka.Actor;
15+
using Akka.Annotations;
1516
using Akka.Util;
1617

1718
namespace Akka.IO
1819
{
1920
/// <summary>
20-
/// TBD
21+
/// An actor handling the connection state machine for an outgoing connection
22+
/// to be established.
2123
/// </summary>
2224
internal sealed class TcpOutgoingConnection : TcpConnection
2325
{
@@ -26,6 +28,9 @@ internal sealed class TcpOutgoingConnection : TcpConnection
2628

2729
private SocketAsyncEventArgs _connectArgs;
2830

31+
private readonly ConnectException finishConnectNeverReturnedTrueException =
32+
new ConnectException("Could not establish connection because finishConnect never returned true");
33+
2934
public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
3035
: base(
3136
tcp,
@@ -61,11 +66,11 @@ private void ReleaseConnectionSocketArgs()
6166
}
6267
}
6368

64-
private void Stop()
69+
private void Stop(Exception cause)
6570
{
6671
ReleaseConnectionSocketArgs();
6772

68-
StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage));
73+
StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage.WithCause(cause)));
6974
}
7075

7176
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -77,30 +82,29 @@ private void ReportConnectFailure(Action thunk)
7782
}
7883
catch (Exception e)
7984
{
80-
Log.Error(e, "Could not establish connection to [{0}].", _connect.RemoteAddress);
81-
Stop();
85+
Log.Debug(e, "Could not establish connection to [{0}] due to {1}", _connect.RemoteAddress, e.Message);
86+
Stop(e);
8287
}
8388
}
8489

8590
protected override void PreStart()
8691
{
8792
ReportConnectFailure(() =>
8893
{
89-
if (_connect.RemoteAddress is DnsEndPoint)
94+
if (_connect.RemoteAddress is DnsEndPoint remoteAddress)
9095
{
91-
var remoteAddress = (DnsEndPoint) _connect.RemoteAddress;
9296
Log.Debug("Resolving {0} before connecting", remoteAddress.Host);
9397
var resolved = Dns.ResolveName(remoteAddress.Host, Context.System, Self);
9498
if (resolved == null)
9599
Become(Resolving(remoteAddress));
96-
else if(resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
100+
else if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
97101
Register(new IPEndPoint(resolved.Ipv4.FirstOrDefault(), remoteAddress.Port), new IPEndPoint(resolved.Ipv6.FirstOrDefault(), remoteAddress.Port));
98102
else // one or the other
99103
Register(new IPEndPoint(resolved.Addr, remoteAddress.Port), null);
100104
}
101-
else if(_connect.RemoteAddress is IPEndPoint)
105+
else if (_connect.RemoteAddress is IPEndPoint point)
102106
{
103-
Register((IPEndPoint)_connect.RemoteAddress, null);
107+
Register(point, null);
104108
}
105109
else throw new NotSupportedException($"Couldn't connect to [{_connect.RemoteAddress}]: only IP and DNS-based endpoints are supported");
106110
});
@@ -123,8 +127,7 @@ private Receive Resolving(DnsEndPoint remoteAddress)
123127
{
124128
return message =>
125129
{
126-
var resolved = message as Dns.Resolved;
127-
if (resolved != null)
130+
if (message is Dns.Resolved resolved)
128131
{
129132
if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // multiple addresses
130133
{
@@ -144,7 +147,6 @@ private Receive Resolving(DnsEndPoint remoteAddress)
144147
};
145148
}
146149

147-
148150
private void Register(IPEndPoint address, IPEndPoint fallbackAddress)
149151
{
150152
ReportConnectFailure(() =>
@@ -165,7 +167,7 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
165167
{
166168
return message =>
167169
{
168-
if (message is IO.Tcp.SocketConnected)
170+
if (message is Tcp.SocketConnected)
169171
{
170172
if (args.SocketError == SocketError.Success)
171173
{
@@ -202,19 +204,27 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
202204
else
203205
{
204206
Log.Debug("Could not establish connection because finishConnect never returned true (consider increasing akka.io.tcp.finish-connect-retries)");
205-
Stop();
207+
Stop(finishConnectNeverReturnedTrueException);
206208
}
207209
return true;
208210
}
209211
if (message is ReceiveTimeout)
210212
{
211213
if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout
212-
Log.Error("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
213-
Stop();
214+
Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
215+
Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
214216
return true;
215217
}
216218
return false;
217219
};
218220
}
219221
}
222+
223+
[InternalApi]
224+
public class ConnectException : Exception
225+
{
226+
public ConnectException(string message)
227+
: base(message)
228+
{ }
229+
}
220230
}

0 commit comments

Comments
 (0)