Skip to content
This repository was archived by the owner on Feb 12, 2025. It is now read-only.

Introduce configurable retries #298

Merged
merged 6 commits into from
May 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,35 @@ The maximum length of a message is detailed in many RFCs that can be summarized


## Message send settings

- `retry` - settings related to transmission retry:
- `max` - the maximum number of retries to perform after the first attempt failed (default: `-1` i.e. infinite)
- `backoff` - the backoff approach, `Constant` / `Linear` / `Exponential` / `AwsJitteredExponential` / `PollyJitteredExponential`, used to wait before performing a retry (default: `Constant`)
- `constantBackoff` - settings related to constant backoff ([formula](../src/NLog.Targets.Syslog/Settings/ConstantBackoff.cs)):
- `firstDelayZero` - whether the first retry should be performed immediately (default: `false`)
- `baseDelay` - the number of milliseconds used as the base to compute the interval after which a retry is performed (default: `500`)
- `linearBackoff` - settings related to linear backoff ([formula](../src/NLog.Targets.Syslog/Settings/LinearBackoff.cs)):
- `firstDelayZero` - whether the first retry should be performed immediately (default: `false`)
- `baseDelay` - the number of milliseconds used as the base to compute the interval after which a retry is performed (default: `500`)
- `scaleFactor` - the scale factor used to compute the interval after which a retry is performed (default: `1`)
- `exponentialBackoff` - settings related to exponential backoff ([formula](../src/NLog.Targets.Syslog/Settings/ExponentialBackoff.cs)):
- `firstDelayZero` - whether the first retry should be performed immediately (default: `false`)
- `baseDelay` - the number of milliseconds used as the base to compute the interval after which a retry is performed (default: `500`)
- `scaleFactor` - the scale factor used to compute the interval after which a retry is performed (default: `2`)
- `awsJitteredExponentialBackoff` - settings related to the AWS jittered exponential backoff ([formula](../src/NLog.Targets.Syslog/Settings/AwsJitteredExponentialBackoff.cs)):
- `firstDelayZero` - whether the first retry should be performed immediately (default: `false`)
- `baseDelay` - the number of milliseconds used as the base to compute the interval after which a retry is performed (default: `500`)
- `maxDelay` - the maximum number of milliseconds used to compute the interval after which a retry is performed (default: `1500`)
- `pollyJitteredExponentialBackoff` - settings related to the Polly jittered exponential backoff ([formula](../src/NLog.Targets.Syslog/Settings/PollyJitteredExponentialBackoff.cs)):
- `firstDelayZero` - whether the first retry should be performed immediately (default: `false`)
- `baseDelay` - the number of milliseconds used as the base to compute the interval after which a retry is performed (default: `500`)
- `maxDelay` - the maximum number of milliseconds used to compute the interval after which a retry is performed (default: `60000`)
- `protocol` - `udp` or `tcp` (default: `udp`)
- `udp` - settings related to UDP:
- `server` - IP or hostname of the Syslog server (default: `127.0.0.1`)
- `port` - port the Syslog server is listening on (default: `514`)
- `reconnectInterval` - the time interval, in milliseconds, after which a connection is retried (default: `500`)
- `tcp` - settings related to TCP:
- `server` - IP or hostname of the Syslog server (default: `127.0.0.1`)
- `port` - port the Syslog server is listening on (default: `514`)
- `reconnectInterval` - the time interval, in milliseconds, after which a connection is retried (default: `500`)
- `keepAlive` - settings related to keep-alive:
- `enabled` - whether to use keep-alive or not (default: `true`)
- `retryCount` - the number of unacknowledged keep-alive probes to send before considering the connection dead and terminating it (default: `10`)
Expand Down
122 changes: 93 additions & 29 deletions src/NLog.Targets.Syslog.Schema/NLog.Targets.Syslog.xsd

Large diffs are not rendered by default.

23 changes: 9 additions & 14 deletions src/NLog.Targets.Syslog.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.2.32505.173
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution items", "Solution items", "{37F84F7C-A60C-4FCE-A629-8D6829777961}"
ProjectSection(SolutionItems) = preProject
..\LICENSE = ..\LICENSE
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{94A03BF0-4CCF-4CB6-990D-C2126176F55C}"
ProjectSection(SolutionItems) = preProject
..\.github\PULL_REQUEST_TEMPLATE.md = ..\.github\PULL_REQUEST_TEMPLATE.md
Expand Down Expand Up @@ -41,23 +43,16 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{82394A02-1
..\docs\test-bench.md = ..\docs\test-bench.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution items", "Solution items", "{37F84F7C-A60C-4FCE-A629-8D6829777961}"
ProjectSection(SolutionItems) = preProject
..\.travis.yml = ..\.travis.yml
..\appveyor.yml = ..\appveyor.yml
..\LICENSE = ..\LICENSE
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FakeSyslogServer", "FakeSyslogServer\FakeSyslogServer.csproj", "{38722095-C8E7-496F-8126-AF17790B8A6E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NLog.Targets.Syslog", "NLog.Targets.Syslog\NLog.Targets.Syslog.csproj", "{48481129-C37F-4F9B-82FC-2A467FC6EC81}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NLog.Targets.Syslog.Schema", "NLog.Targets.Syslog.Schema\NLog.Targets.Syslog.Schema.csproj", "{7C0389AC-BC08-4C76-97CA-8FC405BDC53A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestAppWithGUI", "TestAppWithGUI\TestAppWithGUI.csproj", "{FEDBC211-287E-4D64-B111-594D1CB7C54B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestAppWithTUI", "TestAppWithTUI\TestAppWithTUI.csproj", "{1321F38E-8905-49B7-8D9A-4ECDD85253FF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NLog.Targets.Syslog.Schema", "NLog.Targets.Syslog.Schema\NLog.Targets.Syslog.Schema.csproj", "{7C0389AC-BC08-4C76-97CA-8FC405BDC53A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -100,11 +95,11 @@ Global
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{94A03BF0-4CCF-4CB6-990D-C2126176F55C} = {37F84F7C-A60C-4FCE-A629-8D6829777961}
{DE911B4D-5A26-45BF-A041-CEF72BEECAA0} = {94A03BF0-4CCF-4CB6-990D-C2126176F55C}
{F729507C-B49B-48BD-9884-E2F056658A63} = {94A03BF0-4CCF-4CB6-990D-C2126176F55C}
{FE11D99F-A094-42A4-9DA4-EFEFA413F674} = {37F84F7C-A60C-4FCE-A629-8D6829777961}
{3089347D-9E13-4707-A3C6-A7D786BE6369} = {FE11D99F-A094-42A4-9DA4-EFEFA413F674}
{82394A02-1436-428F-B29E-5C3AD4CE597B} = {37F84F7C-A60C-4FCE-A629-8D6829777961}
{DE911B4D-5A26-45BF-A041-CEF72BEECAA0} = {94A03BF0-4CCF-4CB6-990D-C2126176F55C}
{F729507C-B49B-48BD-9884-E2F056658A63} = {94A03BF0-4CCF-4CB6-990D-C2126176F55C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2D2B5C8D-A77F-436A-8836-1D45D2BFA3DD}
Expand Down
7 changes: 7 additions & 0 deletions src/NLog.Targets.Syslog/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ namespace NLog.Targets.Syslog.Extensions
{
internal static class TaskExtensions
{
public static Task Then(this Task task, Func<Task, Task> continuationFunction, CancellationToken token)
{
return task
.Then<Task>(continuationFunction, token)
.Unwrap();
}

public static Task<TResult> Then<TResult>(this Task task, Func<Task, TResult> continuationFunction, CancellationToken token)
{
return task
Expand Down
82 changes: 82 additions & 0 deletions src/NLog.Targets.Syslog/MessageSend/BackoffDelayProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed under the BSD license
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Linq;
using NLog.Targets.Syslog.Settings;
using Polly.Contrib.WaitAndRetry;

namespace NLog.Targets.Syslog.MessageSend
{
internal class BackoffDelayProvider
{
private static readonly Dictionary<BackoffType, Func<RetryConfig, BackoffDelayProvider>> BackoffFactory;
private readonly IEnumerable<TimeSpan> delaysEnumerable;
private IEnumerator<TimeSpan> delaysEnumerator;

static BackoffDelayProvider()
{
BackoffFactory = new Dictionary<BackoffType, Func<RetryConfig, BackoffDelayProvider>>
{
{ BackoffType.Constant, retryConfig => new BackoffDelayProvider(PollyMaxRetries(retryConfig), retryConfig.ConstantBackoff) },
{ BackoffType.Linear, retryConfig => new BackoffDelayProvider(PollyMaxRetries(retryConfig), retryConfig.LinearBackoff) },
{ BackoffType.Exponential, retryConfig => new BackoffDelayProvider(PollyMaxRetries(retryConfig), retryConfig.ExponentialBackoff) },
{ BackoffType.AwsJitteredExponential, retryConfig => new BackoffDelayProvider(PollyMaxRetries(retryConfig), retryConfig.AwsJitteredExponentialBackoff) },
{ BackoffType.PollyJitteredExponential, retryConfig => new BackoffDelayProvider(PollyMaxRetries(retryConfig), retryConfig.PollyJitteredExponentialBackoff) },
};
}

public static BackoffDelayProvider FromConfig(RetryConfig retryConfig)
{
return BackoffFactory[retryConfig.Backoff](retryConfig);
}

private static int PollyMaxRetries(RetryConfig retryConfig)
{
return retryConfig.InfiniteRetries ? int.MaxValue : retryConfig.Max;
}

private BackoffDelayProvider(int maxRetries, ConstantBackoffConfig config)
{
var baseDelay = TimeSpan.FromMilliseconds(config.BaseDelay);
delaysEnumerable = Backoff.ConstantBackoff(baseDelay, maxRetries, config.FirstDelayZero);
}

private BackoffDelayProvider(int maxRetries, LinearBackoffConfig config)
{
var baseDelay = TimeSpan.FromMilliseconds(config.BaseDelay);
delaysEnumerable = Backoff.LinearBackoff(baseDelay, maxRetries, config.ScaleFactor, config.FirstDelayZero);
}

private BackoffDelayProvider(int maxRetries, ExponentialBackoffConfig config)
{
var baseDelay = TimeSpan.FromMilliseconds(config.BaseDelay);
delaysEnumerable = Backoff.ExponentialBackoff(baseDelay, maxRetries, config.ScaleFactor, config.FirstDelayZero);
}

private BackoffDelayProvider(int maxRetries, AwsJitteredExponentialBackoffConfig config)
{
var baseDelay = TimeSpan.FromMilliseconds(config.BaseDelay);
var maxDelay = TimeSpan.FromMilliseconds(config.MaxDelay);
delaysEnumerable = Backoff.AwsDecorrelatedJitterBackoff(baseDelay, maxDelay, maxRetries, null, config.FirstDelayZero);
}

private BackoffDelayProvider(int maxRetries, PollyJitteredExponentialBackoffConfig config)
{
var baseDelay = TimeSpan.FromMilliseconds(config.BaseDelay);
var maxDelay = TimeSpan.FromMilliseconds(config.MaxDelay);
delaysEnumerable = Backoff
.DecorrelatedJitterBackoffV2(baseDelay, maxRetries, null, config.FirstDelayZero)
.Select(x => x < maxDelay ? x : maxDelay);
}

public TimeSpan GetDelay(bool isFirstRetry)
{
if (isFirstRetry)
delaysEnumerator = delaysEnumerable.GetEnumerator();
delaysEnumerator.MoveNext(); // result is not checked to allow infinite retries (Polly.Contrib.WaitAndRetry is limited to int.MaxValue retries)
return delaysEnumerator.Current;
}
}
}
85 changes: 48 additions & 37 deletions src/NLog.Targets.Syslog/MessageSend/MessageTransmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ namespace NLog.Targets.Syslog.MessageSend
internal abstract class MessageTransmitter
{
private static readonly Dictionary<ProtocolType, Func<MessageTransmitterConfig, MessageTransmitter>> TransmitterFactory;
protected static readonly TimeSpan ZeroSeconds = TimeSpan.FromSeconds(0);

private volatile bool neverCalledInit;
private volatile bool isReady;
private readonly TimeSpan newInitDelay;
private readonly Predicate<int> canRetry;
private readonly BackoffDelayProvider delayProvider;

protected string Server { get; }

Expand All @@ -34,8 +33,8 @@ static MessageTransmitter()
{
TransmitterFactory = new Dictionary<ProtocolType, Func<MessageTransmitterConfig, MessageTransmitter>>
{
{ ProtocolType.Udp, messageTransmitterConfig => new Udp(messageTransmitterConfig.Udp) },
{ ProtocolType.Tcp, messageTransmitterConfig => new Tcp(messageTransmitterConfig.Tcp) }
{ ProtocolType.Udp, messageTransmitterConfig => new Udp(messageTransmitterConfig.Udp, messageTransmitterConfig.Retry) },
{ ProtocolType.Tcp, messageTransmitterConfig => new Tcp(messageTransmitterConfig.Tcp, messageTransmitterConfig.Retry) }
};
}

Expand All @@ -44,44 +43,74 @@ public static MessageTransmitter FromConfig(MessageTransmitterConfig messageTran
return TransmitterFactory[messageTransmitterConfig.Protocol](messageTransmitterConfig);
}

protected MessageTransmitter(Layout server, int port, int reconnectInterval)
protected MessageTransmitter(Layout server, int port, RetryConfig retryConfig)
{
neverCalledInit = true;
isReady = false;
newInitDelay = TimeSpan.FromMilliseconds(reconnectInterval);
canRetry = retryNumber => retryConfig.InfiniteRetries || retryNumber < retryConfig.Max;
delayProvider = BackoffDelayProvider.FromConfig(retryConfig);
Server = server?.Render(LogEventInfo.CreateNullEvent());
Port = port;
}

public Task SendMessageAsync(ByteArray message, CancellationToken token)
{
return SendMessageAsync(message, 0, token);
}

public void Dispose()
{
TidyUp();
}

protected abstract Task Init(IPEndPoint ipEndPoint);

protected abstract Task SendAsync(ByteArray message, CancellationToken token);

protected abstract void Terminate();

private Task SendMessageAsync(ByteArray message, int retryNumber, CancellationToken token)
{
if (token.IsCancellationRequested)
return Task.FromResult<object>(null);

return PrepareForSendAsync(token)
return PrepareForSendAsync(retryNumber, token)
.Then(_ => SendAsync(message, token), token)
.Unwrap()
.ContinueWith(t =>
{
var exception = t.Exception;
if (token.IsCancellationRequested || t.IsCanceled || exception == null)
var baseException = t.Exception?.GetBaseException();

// Complete with success: cancellation requested or message sent
if (token.IsCancellationRequested || t.IsCanceled || baseException == null)
return Task.FromResult<object>(null);

InternalLogger.Warn(exception?.GetBaseException(), "[Syslog] SendAsync failed");
InternalLogger.Warn(baseException, "[Syslog] SendAsync failed");
TidyUp();
return SendMessageAsync(message, token); // Failures impact on the log entry queue

// Retry: failures can impact on the log entry queue
if (canRetry(retryNumber))
return SendMessageAsync(message, retryNumber < int.MaxValue ? ++retryNumber : retryNumber, token);

// Complete with failure: message not sent and lost
var tcs = new TaskCompletionSource<object>();
tcs.SetException(baseException);
return tcs.Task;
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current)
.Unwrap();
}

public void Dispose()
private Task PrepareForSendAsync(int retryNumber, CancellationToken token)
{
TidyUp();
}
if (isReady)
return Task.FromResult<object>(null);

protected abstract Task Init();
var delay = retryNumber == 0 ? TimeSpan.Zero : delayProvider.GetDelay(retryNumber == 1);
return Task
.Delay(delay, token)
.Then(_ => Init(GetIpEndPoint()), token)
.Then(_ => isReady = true, token);
}

protected IPEndPoint GetIpEndPoint()
private IPEndPoint GetIpEndPoint()
{
return Dns
.GetHostAddresses(Server)
Expand All @@ -92,24 +121,6 @@ protected IPEndPoint GetIpEndPoint()
.FirstOrDefault();
}

protected abstract Task SendAsync(ByteArray message, CancellationToken token);

protected abstract void Terminate();

private Task PrepareForSendAsync(CancellationToken token)
{
if (isReady)
return Task.FromResult<object>(null);

var delay = neverCalledInit ? ZeroSeconds : newInitDelay;
neverCalledInit = false;
return Task
.Delay(delay, token)
.Then(_ => Init(), token)
.Unwrap()
.Then(_ => isReady = true, token);
}

private void TidyUp()
{
try
Expand Down
9 changes: 4 additions & 5 deletions src/NLog.Targets.Syslog/MessageSend/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
Expand All @@ -28,17 +29,16 @@ internal class Tcp : MessageTransmitter
private TcpClient tcp;
private Stream stream;

public Tcp(TcpConfig tcpConfig) : base(tcpConfig.Server, tcpConfig.Port, tcpConfig.ReconnectInterval)
public Tcp(TcpConfig tcpConfig, RetryConfig retryConfig) : base(tcpConfig.Server, tcpConfig.Port, retryConfig)
{
keepAliveConfig = tcpConfig.KeepAlive;
useTls = tcpConfig.Tls.Enabled;
retrieveClientCertificates = tcpConfig.Tls.RetrieveClientCertificates;
framing = tcpConfig.Framing;
}

protected override Task Init()
protected override Task Init(IPEndPoint ipEndPoint)
{
var ipEndPoint = GetIpEndPoint();
tcp = new TcpClient(ipEndPoint.AddressFamily);
SocketInitialization.DisableAddressSharing(tcp.Client);
SocketInitialization.DiscardPendingDataOnClose(tcp.Client);
Expand All @@ -55,8 +55,7 @@ protected override Task SendAsync(ByteArray message, CancellationToken token)
return Task.FromResult<object>(null);

return HandleFramingAsync(message)
.Then(_ => stream.WriteAsync(message, 0, message.Length, token), token)
.Unwrap();
.Then(_ => stream.WriteAsync(message, 0, message.Length, token), token);
}

protected override void Terminate()
Expand Down
5 changes: 2 additions & 3 deletions src/NLog.Targets.Syslog/MessageSend/Udp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ internal class Udp : MessageTransmitter
{
private UdpClient udp;

public Udp(UdpConfig udpConfig) : base(udpConfig.Server, udpConfig.Port, udpConfig.ReconnectInterval)
public Udp(UdpConfig udpConfig, RetryConfig retryConfig) : base(udpConfig.Server, udpConfig.Port, retryConfig)
{
}

protected override Task Init()
protected override Task Init(IPEndPoint ipEndPoint)
{
var ipEndPoint = GetIpEndPoint();
udp = new UdpClient(ipEndPoint.AddressFamily);
udp.Connect(ipEndPoint);
return Task.FromResult<object>(null);
Expand Down
Loading