Skip to content

Support for Multiple Endpoints #1083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6be5555
expose command line parameter for endpoints
vazois Mar 5, 2025
a4d9738
return list of endpoints from hostname value
vazois Mar 5, 2025
8b10e61
change endpoint to endpoints in Options.cs
vazois Mar 6, 2025
a918210
switch back to string and do explicit split during address validation
vazois Mar 7, 2025
8253767
create multiple server tcp instances to support multiple endpoints
vazois Mar 10, 2025
28190dc
add test for multi endpoints
vazois Mar 10, 2025
07a7f2b
fix testUtils regression
vazois Mar 10, 2025
ace8cbe
addloopback test
vazois Mar 10, 2025
e0517a9
fix typo
vazois Mar 11, 2025
19bfb16
logger exception message instead of stack trace
vazois Mar 11, 2025
9f07183
allow unix and tcp socket use
vazois Mar 12, 2025
c61ef2f
calculate ip to advertise from endpoint list
vazois Mar 13, 2025
85cd375
remove ip address logging
vazois Mar 13, 2025
58da5e4
check that at least one endpoint is TCP for cluster
vazois Mar 18, 2025
a03386b
first round of comments
vazois Mar 20, 2025
87a5481
remove empty entries
vazois Mar 21, 2025
94902ff
add multiple socket test
vazois Mar 21, 2025
cdc5ecb
nit: rename
vazois Mar 21, 2025
f86f10b
add better comments
vazois Mar 21, 2025
92c559e
handle dash in addresses from bind optio
vazois Mar 21, 2025
e0b570c
add test for checking parsing of multiple addresses
vazois Mar 21, 2025
5f488f4
remove unused code
vazois Mar 21, 2025
073cebf
Fix cluster BDN and add ClusterAnnounce options
vazois Mar 26, 2025
ae608cd
sigh! add comma separator
vazois Mar 26, 2025
97e7726
remove trim
vazois Mar 26, 2025
7c0f47d
log config ip
vazois Mar 26, 2025
27554f8
Merge branch 'main' into vazois/multi-endpoints
TalZaccai Mar 27, 2025
05cfc97
Merge branch 'main' into vazois/multi-endpoints
vazois Mar 28, 2025
d6c5fde
Merge branch 'main' into vazois/multi-endpoints
vazois Mar 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benchmark/BDN.benchmark/Cluster/ClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public void SetupSingleInstance(bool disableSlotVerification = false)
{
QuietMode = true,
EnableCluster = !disableSlotVerification,
EndPoint = new IPEndPoint(IPAddress.Loopback, port),
EndPoints = [new IPEndPoint(IPAddress.Loopback, port)],
CleanClusterConfig = true,
ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port)
};
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
opt.CheckpointDir = "/tmp";
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal sealed class EmbeddedRespServer : GarnetServer
/// <param name="opts">Server options to configure the base GarnetServer instance</param>
/// <param name="loggerFactory">Logger factory to configure the base GarnetServer instance</param>
/// <param name="server">Server network</param>
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, server)
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, server == null ? null : [server])
{
this.garnetServerEmbedded = server;
this.subscribeBroker = opts.DisablePubSub ? null :
Expand Down
14 changes: 12 additions & 2 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,18 @@ public unsafe ClusterManager(ClusterProvider clusterProvider, ILogger logger = n
clusterConfigDevice = deviceFactory.Get(new FileDescriptor(directoryName: "", fileName: "nodes.conf"));
pool = new(1, (int)clusterConfigDevice.SectorSize);

if (opts.EndPoint is not IPEndPoint endpoint)
throw new NotImplementedException("Cluster mode for unix domain sockets has not been implemented.");
IPEndPoint endpoint = null;
foreach (var endPoint in opts.EndPoints)
{
if (endPoint is IPEndPoint _endpoint)
{
endpoint = _endpoint;
break;
}
}

if (endpoint == null)
throw new GarnetException("No valid IPEndPoint found in endPoint list");

var address = clusterProvider.storeWrapper.GetIp();

Expand Down
28 changes: 15 additions & 13 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,23 +439,25 @@ internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(Stor
/// <returns></returns>
internal bool BumpAndWaitForEpochTransition()
{
var server = storeWrapper.TcpServer;
BumpCurrentEpoch();
while (true)
foreach (var server in storeWrapper.TcpServer)
{
retry:
Thread.Yield();
// Acquire latest bumped epoch
var currentEpoch = GarnetCurrentEpoch;
var sessions = server.ActiveClusterSessions();
foreach (var s in sessions)
while (true)
{
var entryEpoch = s.LocalCurrentEpoch;
// Retry if at least one session has not yet caught up to the current epoch.
if (entryEpoch != 0 && entryEpoch < currentEpoch)
goto retry;
retry:
Thread.Yield();
// Acquire latest bumped epoch
var currentEpoch = GarnetCurrentEpoch;
var sessions = ((GarnetServerTcp)server).ActiveClusterSessions();
foreach (var s in sessions)
{
var entryEpoch = s.LocalCurrentEpoch;
// Retry if at least one session has not yet caught up to the current epoch.
if (entryEpoch != 0 && entryEpoch < currentEpoch)
goto retry;
}
break;
}
break;
}
return true;
}
Expand Down
8 changes: 4 additions & 4 deletions libs/cluster/Server/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ public async Task TryMeetAsync(string address, int port, bool acquireLock = true

if (gsn == null)
{
var endpoint = await Format.TryCreateEndpoint(address, port, useForBind: true, logger: logger);
if (endpoint == null)
var endpoints = await Format.TryCreateEndpoint(address, port, tryConnect: true, logger: logger);
if (endpoints == null)
{
logger?.LogError("Could not parse endpoint {address} {port}", address, port);
logger?.LogError("Invalid CLUSTER MEET endpoint!");
}
gsn = new GarnetServerNode(clusterProvider, endpoint, tlsOptions?.TlsClientOptions, logger: logger);
gsn = new GarnetServerNode(clusterProvider, endpoints[0], tlsOptions?.TlsClientOptions, logger: logger);
created = true;
}

Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/RespClusterBasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private bool NetworkClusterMeet(out bool invalidParameters)
return true;
}

logger?.LogTrace("CLUSTER MEET {ipaddressStr} {port}", ipAddress, port);
logger?.LogTrace("CLUSTER MEET");
clusterProvider.clusterManager.RunMeetTask(ipAddress, port);
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
Expand Down
147 changes: 70 additions & 77 deletions libs/common/Format.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
Expand Down Expand Up @@ -31,116 +33,109 @@ internal static bool IsNullOrWhiteSpace([NotNullWhen(false)] this string s) =>
#pragma warning disable format
public static class Format
{
/// <summary>
/// Parse address list string containing address separated by whitespace
/// </summary>
/// <param name="addressList">Space separated string of IP addresses</param>
/// <param name="port">Endpoint Port</param>
/// <param name="endpoints">List of endpoints generated from the input IPs</param>
/// <param name="errorHostnameOrAddress">Output error if any</param>
/// <param name="logger">Logger</param>
/// <returns>True if parse and address validation was successful, otherwise false</returns>
public static bool TryParseAddressList(string addressList, int port, out EndPoint[] endpoints, out string errorHostnameOrAddress, ILogger logger = null)
{
endpoints = null;
errorHostnameOrAddress = null;
// Check if input null or empty
if (string.IsNullOrEmpty(addressList) || string.IsNullOrWhiteSpace(addressList))
{
endpoints = [new IPEndPoint(IPAddress.Any, port)];
return true;
}

var addresses = addressList.Split([',',' '], StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
var endpointList = new List<EndPoint>();
// Validate addresses and create endpoints
foreach (var singleAddressOrHostname in addresses)
{
var e = TryCreateEndpoint(singleAddressOrHostname, port, tryConnect: false, logger).Result;
if(e == null)
{
endpoints = null;
errorHostnameOrAddress = singleAddressOrHostname;
return false;
}
endpointList.AddRange(e);
}
endpoints = [.. endpointList];

return true;
}

/// <summary>
/// Try to create an endpoint from address and port
/// </summary>
/// <param name="addressOrHostname">This could be an address or a hostname that the method tries to resolve</param>
/// <param name="port"></param>
/// <param name="useForBind">Binding does not poll connection because is supposed to be called from the server side</param>
/// <param name="logger"></param>
/// <param name="singleAddressOrHostname">This could be an address or a hostname that the method tries to resolve</param>
/// <param name="port">Port number to use for the endpoints</param>
/// <param name="tryConnect">Whether to try to connect to the created endpoints to ensure that it is reachable</param>
/// <param name="logger">Logger</param>
/// <returns></returns>
public static async Task<EndPoint> TryCreateEndpoint(string addressOrHostname, int port, bool useForBind = false, ILogger logger = null)
public static async Task<EndPoint[]> TryCreateEndpoint(string singleAddressOrHostname, int port, bool tryConnect = false, ILogger logger = null)
{
IPEndPoint endpoint = null;
if (string.IsNullOrEmpty(addressOrHostname) || string.IsNullOrWhiteSpace(addressOrHostname))
return new IPEndPoint(IPAddress.Any, port);
if (string.IsNullOrEmpty(singleAddressOrHostname) || string.IsNullOrWhiteSpace(singleAddressOrHostname))
return [new IPEndPoint(IPAddress.Any, port)];

if (singleAddressOrHostname[0] == '-')
singleAddressOrHostname = singleAddressOrHostname.Substring(1);

if (IPAddress.TryParse(addressOrHostname, out var ipAddress))
return new IPEndPoint(ipAddress, port);
if (singleAddressOrHostname.Equals("localhost", StringComparison.CurrentCultureIgnoreCase))
return [new IPEndPoint(IPAddress.Loopback, port)];

if (IPAddress.TryParse(singleAddressOrHostname, out var ipAddress))
return [new IPEndPoint(ipAddress, port)];

// Sanity check, there should be at least one ip address available
try
{
var ipAddresses = Dns.GetHostAddresses(addressOrHostname);
var ipAddresses = Dns.GetHostAddresses(singleAddressOrHostname);
if (ipAddresses.Length == 0)
{
logger?.LogError("No IP address found for hostname:{hostname}", addressOrHostname);
logger?.LogError("No IP address found for hostname:{hostname}", singleAddressOrHostname);
return null;
}

if (useForBind)
if (tryConnect)
{
foreach (var entry in ipAddresses)
{
endpoint = new IPEndPoint(entry, port);
var IsListening = await IsReachable(endpoint);
if (IsListening) break;
var endpoint = new IPEndPoint(entry, port);
var IsListening = await TryConnect(endpoint);
if (IsListening) return [endpoint];
}
}
else
{
var machineHostname = GetHostName();

// Hostname does match the one acquired from machine name
if (!addressOrHostname.Equals(machineHostname, StringComparison.OrdinalIgnoreCase))
// User-provided hostname does not match the machine hostname
if (!singleAddressOrHostname.Equals(machineHostname, StringComparison.OrdinalIgnoreCase))
{
logger?.LogError("Provided hostname does not much acquired machine name {addressOrHostname} {machineHostname}!", addressOrHostname, machineHostname);
return null;
}

if (ipAddresses.Length > 1) {
logger?.LogError("Error hostname resolved to multiple endpoints. Garnet does not support multiple endpoints!");
logger?.LogError("Provided hostname does not much acquired machine name {addressOrHostname} {machineHostname}!", singleAddressOrHostname, machineHostname);
return null;
}

return new IPEndPoint(ipAddresses[0], port);
return ipAddresses.Select(ip => new IPEndPoint(ip, port)).ToArray();
}
logger?.LogError("No reachable IP address found for hostname:{hostname}", addressOrHostname);
logger?.LogError("No reachable IP address found for hostname:{hostname}", singleAddressOrHostname);
}
catch (Exception ex)
{
logger?.LogError(ex, "Error while trying to resolve hostname:{hostname}", addressOrHostname);
}

return endpoint;

async Task<bool> IsReachable(IPEndPoint endpoint)
{
using (var tcpClient = new TcpClient())
{
try
{
await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
logger?.LogTrace("Reachable {ip} {port}", endpoint.Address, endpoint.Port);
return true;
}
catch
{
logger?.LogTrace("Unreachable {ip} {port}", endpoint.Address, endpoint.Port);
return false;
}
}
logger?.LogError("Error while trying to resolve hostname: {exMessage} [{hostname}]", ex.Message, singleAddressOrHostname);
}
}

/// <summary>
/// Try to
/// </summary>
/// <param name="address"></param>
/// <param name="port"></param>
/// <param name="logger"></param>
/// <returns></returns>
public static async Task<IPEndPoint> TryValidateAndConnectAddress2(string address, int port, ILogger logger = null)
{
IPEndPoint endpoint = null;
if (!IPAddress.TryParse(address, out var ipAddress))
{
// Try to identify reachable IP address from hostname
var hostEntry = Dns.GetHostEntry(address);
foreach (var entry in hostEntry.AddressList)
{
endpoint = new IPEndPoint(entry, port);
var IsListening = await IsReachable(endpoint);
if (IsListening) break;
}
}
else
{
// If address is valid create endpoint
endpoint = new IPEndPoint(ipAddress, port);
}
return null;

async Task<bool> IsReachable(IPEndPoint endpoint)
async Task<bool> TryConnect(IPEndPoint endpoint)
{
using (var tcpClient = new TcpClient())
{
Expand All @@ -157,8 +152,6 @@ async Task<bool> IsReachable(IPEndPoint endpoint)
}
}
}

return endpoint;
}

/// <summary>
Expand Down
37 changes: 23 additions & 14 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,17 @@ internal sealed class Options
public int Port { get; set; }

[IpAddressValidation(false)]
[Option("bind", Required = false, HelpText = "IP address to bind server to (default: any)")]
[Option("bind", Required = false, HelpText = "Whitespace or comma separated string of IP addresses to bind server to (default: any)")]
public string Address { get; set; }

[IntRangeValidation(0, 65535)]
[Option("cluster-announce-port", Required = false, HelpText = "Port that this node advertises to other nodes to connect to for gossiping.")]
public int ClusterAnnouncePort { get; set; }

[IpAddressValidation(false)]
[Option("cluster-announce-ip", Required = false, HelpText = "IP address that this node advertises to other nodes to connect to for gossiping.")]
public string ClusterAnnounceIp { get; set; }

[MemorySizeValidation]
[Option('m', "memory", Required = false, HelpText = "Total log memory used in bytes (rounds down to power of 2)")]
public string MemorySize { get; set; }
Expand Down Expand Up @@ -617,9 +625,6 @@ public bool IsValid(out List<string> invalidOptions, ILogger logger = null)
this.runtimeLogger = logger;
foreach (var prop in typeof(Options).GetProperties())
{
if (prop.Name.Equals("runtimeLogger"))
continue;

// Ignore if property is not decorated with the OptionsAttribute or the ValidationAttribute
var validationAttr = prop.GetCustomAttributes(typeof(ValidationAttribute)).FirstOrDefault();
if (!Attribute.IsDefined(prop, typeof(OptionAttribute)) || validationAttr == null)
Expand Down Expand Up @@ -659,18 +664,21 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
var checkpointDir = CheckpointDir;
if (!useAzureStorage) checkpointDir = new DirectoryInfo(string.IsNullOrEmpty(checkpointDir) ? (string.IsNullOrEmpty(logDir) ? "." : logDir) : checkpointDir).FullName;

EndPoint endpoint;
if (!string.IsNullOrEmpty(UnixSocketPath))
{
endpoint = new UnixDomainSocketEndPoint(UnixSocketPath);
}
else
if (!Format.TryParseAddressList(Address, Port, out var endpoints, out _) || endpoints.Length == 0)
throw new GarnetException($"Invalid endpoint format {Address} {Port}.");

EndPoint[] clusterAnnounceEndpoint = null;
if (ClusterAnnounceIp != null)
{
endpoint = Format.TryCreateEndpoint(Address, Port, useForBind: false).Result;
if (endpoint == null)
throw new GarnetException($"Invalid endpoint format {Address} {Port}.");
ClusterAnnouncePort = ClusterAnnouncePort == 0 ? Port : ClusterAnnouncePort;
clusterAnnounceEndpoint = Format.TryCreateEndpoint(ClusterAnnounceIp, ClusterAnnouncePort, tryConnect: false, logger: logger).GetAwaiter().GetResult();
if (clusterAnnounceEndpoint == null || !endpoints.Any(endpoint => endpoint.Equals(clusterAnnounceEndpoint[0])))
throw new GarnetException("Cluster announce endpoint does not match list of listen endpoints provided!");
}

if (!string.IsNullOrEmpty(UnixSocketPath))
endpoints = [.. endpoints, new UnixDomainSocketEndPoint(UnixSocketPath)];

// Unix file permission octal to UnixFileMode
var unixSocketPermissions = (UnixFileMode)Convert.ToInt32(UnixSocketPermission.ToString(), 8);

Expand Down Expand Up @@ -723,7 +731,8 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
}
return new GarnetServerOptions(logger)
{
EndPoint = endpoint,
EndPoints = endpoints,
ClusterAnnounceEndpoint = clusterAnnounceEndpoint?[0],
MemorySize = MemorySize,
PageSize = PageSize,
SegmentSize = SegmentSize,
Expand Down
Loading