Skip to content

Commit 35e7161

Browse files
vazois and TalZaccai authored
Support for Multiple Endpoints (#1083)
* expose command line parameter for endpoints
* return list of endpoints from hostname value
* change endpoint to endpoints in Options.cs
* switch back to string and do explicit split during address validation
* create multiple server tcp instances to support multiple endpoints
* add test for multi endpoints
* fix testUtils regression
* add loopback test
* fix typo
* log exception message instead of stack trace
* allow unix and tcp socket use
* calculate ip to advertise from endpoint list
* remove ip address logging
* check that at least one endpoint is TCP for cluster
* first round of comments
* remove empty entries
* add multiple socket test
* nit: rename
* add better comments
* handle dash in addresses from bind option
* add test for checking parsing of multiple addresses
* remove unused code
* Fix cluster BDN and add ClusterAnnounce options
* sigh! add comma separator
* remove trim
* log config ip
---------
Co-authored-by: Tal Zaccai <[email protected]>
1 parent 490b055 commit 35e7161

21 files changed

+312
-199
lines changed

benchmark/BDN.benchmark/Cluster/ClusterContext.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ public void SetupSingleInstance(bool disableSlotVerification = false)
3535
{
3636
QuietMode = true,
3737
EnableCluster = !disableSlotVerification,
38-
EndPoint = new IPEndPoint(IPAddress.Loopback, port),
38+
EndPoints = [new IPEndPoint(IPAddress.Loopback, port)],
3939
CleanClusterConfig = true,
40+
ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port)
4041
};
4142
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
4243
opt.CheckpointDir = "/tmp";

benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ internal sealed class EmbeddedRespServer : GarnetServer
2121
/// <param name="opts">Server options to configure the base GarnetServer instance</param>
2222
/// <param name="loggerFactory">Logger factory to configure the base GarnetServer instance</param>
2323
/// <param name="server">Server network</param>
24-
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, server)
24+
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, server == null ? null : [server])
2525
{
2626
this.garnetServerEmbedded = server;
2727
this.subscribeBroker = opts.DisablePubSub ? null :

libs/cluster/Server/ClusterManager.cs

+12-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,18 @@ public unsafe ClusterManager(ClusterProvider clusterProvider, ILogger logger = n
5050
clusterConfigDevice = deviceFactory.Get(new FileDescriptor(directoryName: "", fileName: "nodes.conf"));
5151
pool = new(1, (int)clusterConfigDevice.SectorSize);
5252

53-
if (opts.EndPoint is not IPEndPoint endpoint)
54-
throw new NotImplementedException("Cluster mode for unix domain sockets has not been implemented.");
53+
IPEndPoint endpoint = null;
54+
foreach (var endPoint in opts.EndPoints)
55+
{
56+
if (endPoint is IPEndPoint _endpoint)
57+
{
58+
endpoint = _endpoint;
59+
break;
60+
}
61+
}
62+
63+
if (endpoint == null)
64+
throw new GarnetException("No valid IPEndPoint found in endPoint list");
5565

5666
var address = clusterProvider.storeWrapper.GetIp();
5767

libs/cluster/Server/ClusterProvider.cs

+15-13
Original file line numberDiff line numberDiff line change
@@ -439,23 +439,25 @@ internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(Stor
439439
/// <returns></returns>
440440
internal bool BumpAndWaitForEpochTransition()
441441
{
442-
var server = storeWrapper.TcpServer;
443442
BumpCurrentEpoch();
444-
while (true)
443+
foreach (var server in storeWrapper.TcpServer)
445444
{
446-
retry:
447-
Thread.Yield();
448-
// Acquire latest bumped epoch
449-
var currentEpoch = GarnetCurrentEpoch;
450-
var sessions = server.ActiveClusterSessions();
451-
foreach (var s in sessions)
445+
while (true)
452446
{
453-
var entryEpoch = s.LocalCurrentEpoch;
454-
// Retry if at least one session has not yet caught up to the current epoch.
455-
if (entryEpoch != 0 && entryEpoch < currentEpoch)
456-
goto retry;
447+
retry:
448+
Thread.Yield();
449+
// Acquire latest bumped epoch
450+
var currentEpoch = GarnetCurrentEpoch;
451+
var sessions = ((GarnetServerTcp)server).ActiveClusterSessions();
452+
foreach (var s in sessions)
453+
{
454+
var entryEpoch = s.LocalCurrentEpoch;
455+
// Retry if at least one session has not yet caught up to the current epoch.
456+
if (entryEpoch != 0 && entryEpoch < currentEpoch)
457+
goto retry;
458+
}
459+
break;
457460
}
458-
break;
459461
}
460462
return true;
461463
}

libs/cluster/Server/Gossip.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,12 @@ public async Task TryMeetAsync(string address, int port, bool acquireLock = true
161161

162162
if (gsn == null)
163163
{
164-
var endpoint = await Format.TryCreateEndpoint(address, port, useForBind: true, logger: logger);
165-
if (endpoint == null)
164+
var endpoints = await Format.TryCreateEndpoint(address, port, tryConnect: true, logger: logger);
165+
if (endpoints == null)
166166
{
167-
logger?.LogError("Could not parse endpoint {address} {port}", address, port);
167+
logger?.LogError("Invalid CLUSTER MEET endpoint!");
168168
}
169-
gsn = new GarnetServerNode(clusterProvider, endpoint, tlsOptions?.TlsClientOptions, logger: logger);
169+
gsn = new GarnetServerNode(clusterProvider, endpoints[0], tlsOptions?.TlsClientOptions, logger: logger);
170170
created = true;
171171
}
172172

libs/cluster/Session/RespClusterBasicCommands.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private bool NetworkClusterMeet(out bool invalidParameters)
168168
return true;
169169
}
170170

171-
logger?.LogTrace("CLUSTER MEET {ipaddressStr} {port}", ipAddress, port);
171+
logger?.LogTrace("CLUSTER MEET");
172172
clusterProvider.clusterManager.RunMeetTask(ipAddress, port);
173173
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
174174
SendAndReset();

libs/common/Format.cs

+70-77
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// Licensed under the MIT license.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Diagnostics.CodeAnalysis;
7+
using System.Linq;
68
using System.Net;
79
using System.Net.Sockets;
810
using System.Threading.Tasks;
@@ -31,116 +33,109 @@ internal static bool IsNullOrWhiteSpace([NotNullWhen(false)] this string s) =>
3133
#pragma warning disable format
3234
public static class Format
3335
{
36+
/// <summary>
37+
/// Parse address list string containing addresses or hostnames separated by spaces or commas
38+
/// </summary>
39+
/// <param name="addressList">Space or comma separated string of IP addresses or hostnames</param>
40+
/// <param name="port">Endpoint Port</param>
41+
/// <param name="endpoints">List of endpoints generated from the input IPs</param>
42+
/// <param name="errorHostnameOrAddress">Output error if any</param>
43+
/// <param name="logger">Logger</param>
44+
/// <returns>True if parse and address validation was successful, otherwise false</returns>
45+
public static bool TryParseAddressList(string addressList, int port, out EndPoint[] endpoints, out string errorHostnameOrAddress, ILogger logger = null)
46+
{
47+
endpoints = null;
48+
errorHostnameOrAddress = null;
49+
// Check if input null or empty
50+
if (string.IsNullOrEmpty(addressList) || string.IsNullOrWhiteSpace(addressList))
51+
{
52+
endpoints = [new IPEndPoint(IPAddress.Any, port)];
53+
return true;
54+
}
55+
56+
var addresses = addressList.Split([',',' '], StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
57+
var endpointList = new List<EndPoint>();
58+
// Validate addresses and create endpoints
59+
foreach (var singleAddressOrHostname in addresses)
60+
{
61+
var e = TryCreateEndpoint(singleAddressOrHostname, port, tryConnect: false, logger).Result;
62+
if(e == null)
63+
{
64+
endpoints = null;
65+
errorHostnameOrAddress = singleAddressOrHostname;
66+
return false;
67+
}
68+
endpointList.AddRange(e);
69+
}
70+
endpoints = [.. endpointList];
71+
72+
return true;
73+
}
74+
3475
/// <summary>
3576
/// Try to create an endpoint from address and port
3677
/// </summary>
37-
/// <param name="addressOrHostname">This could be an address or a hostname that the method tries to resolve</param>
38-
/// <param name="port"></param>
39-
/// <param name="useForBind">Binding does not poll connection because is supposed to be called from the server side</param>
40-
/// <param name="logger"></param>
78+
/// <param name="singleAddressOrHostname">This could be an address or a hostname that the method tries to resolve</param>
79+
/// <param name="port">Port number to use for the endpoints</param>
80+
/// <param name="tryConnect">Whether to try to connect to the created endpoints to ensure that it is reachable</param>
81+
/// <param name="logger">Logger</param>
4182
/// <returns></returns>
42-
public static async Task<EndPoint> TryCreateEndpoint(string addressOrHostname, int port, bool useForBind = false, ILogger logger = null)
83+
public static async Task<EndPoint[]> TryCreateEndpoint(string singleAddressOrHostname, int port, bool tryConnect = false, ILogger logger = null)
4384
{
44-
IPEndPoint endpoint = null;
45-
if (string.IsNullOrEmpty(addressOrHostname) || string.IsNullOrWhiteSpace(addressOrHostname))
46-
return new IPEndPoint(IPAddress.Any, port);
85+
if (string.IsNullOrEmpty(singleAddressOrHostname) || string.IsNullOrWhiteSpace(singleAddressOrHostname))
86+
return [new IPEndPoint(IPAddress.Any, port)];
87+
88+
if (singleAddressOrHostname[0] == '-')
89+
singleAddressOrHostname = singleAddressOrHostname.Substring(1);
4790

48-
if (IPAddress.TryParse(addressOrHostname, out var ipAddress))
49-
return new IPEndPoint(ipAddress, port);
91+
if (singleAddressOrHostname.Equals("localhost", StringComparison.CurrentCultureIgnoreCase))
92+
return [new IPEndPoint(IPAddress.Loopback, port)];
93+
94+
if (IPAddress.TryParse(singleAddressOrHostname, out var ipAddress))
95+
return [new IPEndPoint(ipAddress, port)];
5096

5197
// Sanity check, there should be at least one ip address available
5298
try
5399
{
54-
var ipAddresses = Dns.GetHostAddresses(addressOrHostname);
100+
var ipAddresses = Dns.GetHostAddresses(singleAddressOrHostname);
55101
if (ipAddresses.Length == 0)
56102
{
57-
logger?.LogError("No IP address found for hostname:{hostname}", addressOrHostname);
103+
logger?.LogError("No IP address found for hostname:{hostname}", singleAddressOrHostname);
58104
return null;
59105
}
60106

61-
if (useForBind)
107+
if (tryConnect)
62108
{
63109
foreach (var entry in ipAddresses)
64110
{
65-
endpoint = new IPEndPoint(entry, port);
66-
var IsListening = await IsReachable(endpoint);
67-
if (IsListening) break;
111+
var endpoint = new IPEndPoint(entry, port);
112+
var IsListening = await TryConnect(endpoint);
113+
if (IsListening) return [endpoint];
68114
}
69115
}
70116
else
71117
{
72118
var machineHostname = GetHostName();
73119

74-
// Hostname does match the one acquired from machine name
75-
if (!addressOrHostname.Equals(machineHostname, StringComparison.OrdinalIgnoreCase))
120+
// User-provided hostname does not match the machine hostname
121+
if (!singleAddressOrHostname.Equals(machineHostname, StringComparison.OrdinalIgnoreCase))
76122
{
77-
logger?.LogError("Provided hostname does not much acquired machine name {addressOrHostname} {machineHostname}!", addressOrHostname, machineHostname);
78-
return null;
79-
}
80-
81-
if (ipAddresses.Length > 1) {
82-
logger?.LogError("Error hostname resolved to multiple endpoints. Garnet does not support multiple endpoints!");
123+
logger?.LogError("Provided hostname does not much acquired machine name {addressOrHostname} {machineHostname}!", singleAddressOrHostname, machineHostname);
83124
return null;
84125
}
85126

86-
return new IPEndPoint(ipAddresses[0], port);
127+
return ipAddresses.Select(ip => new IPEndPoint(ip, port)).ToArray();
87128
}
88-
logger?.LogError("No reachable IP address found for hostname:{hostname}", addressOrHostname);
129+
logger?.LogError("No reachable IP address found for hostname:{hostname}", singleAddressOrHostname);
89130
}
90131
catch (Exception ex)
91132
{
92-
logger?.LogError(ex, "Error while trying to resolve hostname:{hostname}", addressOrHostname);
93-
}
94-
95-
return endpoint;
96-
97-
async Task<bool> IsReachable(IPEndPoint endpoint)
98-
{
99-
using (var tcpClient = new TcpClient())
100-
{
101-
try
102-
{
103-
await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
104-
logger?.LogTrace("Reachable {ip} {port}", endpoint.Address, endpoint.Port);
105-
return true;
106-
}
107-
catch
108-
{
109-
logger?.LogTrace("Unreachable {ip} {port}", endpoint.Address, endpoint.Port);
110-
return false;
111-
}
112-
}
133+
logger?.LogError("Error while trying to resolve hostname: {exMessage} [{hostname}]", ex.Message, singleAddressOrHostname);
113134
}
114-
}
115135

116-
/// <summary>
117-
/// Try to
118-
/// </summary>
119-
/// <param name="address"></param>
120-
/// <param name="port"></param>
121-
/// <param name="logger"></param>
122-
/// <returns></returns>
123-
public static async Task<IPEndPoint> TryValidateAndConnectAddress2(string address, int port, ILogger logger = null)
124-
{
125-
IPEndPoint endpoint = null;
126-
if (!IPAddress.TryParse(address, out var ipAddress))
127-
{
128-
// Try to identify reachable IP address from hostname
129-
var hostEntry = Dns.GetHostEntry(address);
130-
foreach (var entry in hostEntry.AddressList)
131-
{
132-
endpoint = new IPEndPoint(entry, port);
133-
var IsListening = await IsReachable(endpoint);
134-
if (IsListening) break;
135-
}
136-
}
137-
else
138-
{
139-
// If address is valid create endpoint
140-
endpoint = new IPEndPoint(ipAddress, port);
141-
}
136+
return null;
142137

143-
async Task<bool> IsReachable(IPEndPoint endpoint)
138+
async Task<bool> TryConnect(IPEndPoint endpoint)
144139
{
145140
using (var tcpClient = new TcpClient())
146141
{
@@ -157,8 +152,6 @@ async Task<bool> IsReachable(IPEndPoint endpoint)
157152
}
158153
}
159154
}
160-
161-
return endpoint;
162155
}
163156

164157
/// <summary>

libs/host/Configuration/Options.cs

+23-14
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,17 @@ internal sealed class Options
4040
public int Port { get; set; }
4141

4242
[IpAddressValidation(false)]
43-
[Option("bind", Required = false, HelpText = "IP address to bind server to (default: any)")]
43+
[Option("bind", Required = false, HelpText = "Whitespace or comma separated string of IP addresses to bind server to (default: any)")]
4444
public string Address { get; set; }
4545

46+
[IntRangeValidation(0, 65535)]
47+
[Option("cluster-announce-port", Required = false, HelpText = "Port that this node advertises to other nodes to connect to for gossiping.")]
48+
public int ClusterAnnouncePort { get; set; }
49+
50+
[IpAddressValidation(false)]
51+
[Option("cluster-announce-ip", Required = false, HelpText = "IP address that this node advertises to other nodes to connect to for gossiping.")]
52+
public string ClusterAnnounceIp { get; set; }
53+
4654
[MemorySizeValidation]
4755
[Option('m', "memory", Required = false, HelpText = "Total log memory used in bytes (rounds down to power of 2)")]
4856
public string MemorySize { get; set; }
@@ -617,9 +625,6 @@ public bool IsValid(out List<string> invalidOptions, ILogger logger = null)
617625
this.runtimeLogger = logger;
618626
foreach (var prop in typeof(Options).GetProperties())
619627
{
620-
if (prop.Name.Equals("runtimeLogger"))
621-
continue;
622-
623628
// Ignore if property is not decorated with the OptionsAttribute or the ValidationAttribute
624629
var validationAttr = prop.GetCustomAttributes(typeof(ValidationAttribute)).FirstOrDefault();
625630
if (!Attribute.IsDefined(prop, typeof(OptionAttribute)) || validationAttr == null)
@@ -659,18 +664,21 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
659664
var checkpointDir = CheckpointDir;
660665
if (!useAzureStorage) checkpointDir = new DirectoryInfo(string.IsNullOrEmpty(checkpointDir) ? (string.IsNullOrEmpty(logDir) ? "." : logDir) : checkpointDir).FullName;
661666

662-
EndPoint endpoint;
663-
if (!string.IsNullOrEmpty(UnixSocketPath))
664-
{
665-
endpoint = new UnixDomainSocketEndPoint(UnixSocketPath);
666-
}
667-
else
667+
if (!Format.TryParseAddressList(Address, Port, out var endpoints, out _) || endpoints.Length == 0)
668+
throw new GarnetException($"Invalid endpoint format {Address} {Port}.");
669+
670+
EndPoint[] clusterAnnounceEndpoint = null;
671+
if (ClusterAnnounceIp != null)
668672
{
669-
endpoint = Format.TryCreateEndpoint(Address, Port, useForBind: false).Result;
670-
if (endpoint == null)
671-
throw new GarnetException($"Invalid endpoint format {Address} {Port}.");
673+
ClusterAnnouncePort = ClusterAnnouncePort == 0 ? Port : ClusterAnnouncePort;
674+
clusterAnnounceEndpoint = Format.TryCreateEndpoint(ClusterAnnounceIp, ClusterAnnouncePort, tryConnect: false, logger: logger).GetAwaiter().GetResult();
675+
if (clusterAnnounceEndpoint == null || !endpoints.Any(endpoint => endpoint.Equals(clusterAnnounceEndpoint[0])))
676+
throw new GarnetException("Cluster announce endpoint does not match list of listen endpoints provided!");
672677
}
673678

679+
if (!string.IsNullOrEmpty(UnixSocketPath))
680+
endpoints = [.. endpoints, new UnixDomainSocketEndPoint(UnixSocketPath)];
681+
674682
// Unix file permission octal to UnixFileMode
675683
var unixSocketPermissions = (UnixFileMode)Convert.ToInt32(UnixSocketPermission.ToString(), 8);
676684

@@ -723,7 +731,8 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
723731
}
724732
return new GarnetServerOptions(logger)
725733
{
726-
EndPoint = endpoint,
734+
EndPoints = endpoints,
735+
ClusterAnnounceEndpoint = clusterAnnounceEndpoint?[0],
727736
MemorySize = MemorySize,
728737
PageSize = PageSize,
729738
SegmentSize = SegmentSize,

0 commit comments

Comments
 (0)