Skip to content

Commit 6a087e3

Browse files
committed
lb; update neighbor cache on TAPA connection close
Monitor TAPA->Proxy NSM connections and update neighbor cache upon connection close events by removing neighbor entries associated with TAPA side IP addresses. Currently, the connection monitor uses a wildcard filter, matching all connections. New stateless-lb container env variables: - NAMESPACE - TARGET_DISCONNECT_MONITORING (enabling the feature by default)
1 parent 351f618 commit 6a087e3

File tree

10 files changed

+431
-40
lines changed

10 files changed

+431
-40
lines changed

cmd/stateless-lb/config.go

+21-19
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,27 @@ import (
2727

2828
// Config for the proxy
2929
type Config struct {
30-
Name string `default:"load-balancer" desc:"Name of the pod"`
31-
ServiceName string `default:"load-balancer" desc:"Name of providing service" split_words:"true"`
32-
ConnectTo url.URL `default:"unix:///var/lib/networkservicemesh/nsm.io.sock" desc:"url to connect to NSM" split_words:"true"`
33-
DialTimeout time.Duration `default:"5s" desc:"timeout to dial NSMgr" split_words:"true"`
34-
RequestTimeout time.Duration `default:"15s" desc:"timeout to request NSE" split_words:"true"`
35-
MaxTokenLifetime time.Duration `default:"24h" desc:"maximum lifetime of tokens" split_words:"true"`
36-
NSPService string `default:"nsp-service:7778" desc:"IP (or domain) and port of the NSP Service" split_words:"true"`
37-
ConduitName string `default:"load-balancer" desc:"Name of the conduit" split_words:"true"`
38-
TrenchName string `default:"default" desc:"Trench the pod is running on" split_words:"true"`
39-
LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"`
40-
Nfqueue string `default:"0:3" desc:"netfilter queue(s) to be used by nfqlb" split_words:"true"`
41-
NfqueueFanout bool `default:"false" desc:"enable fanout nfqueue option" split_words:"true"`
42-
IdentifierOffsetStart int `default:"5000" desc:"Each Stream will get a unique identifier range starting from that value" split_words:"true"`
43-
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout" envconfig:"grpc_keepalive_time"`
44-
GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"`
45-
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
46-
MetricsEnabled bool `default:"false" desc:"Enable the metrics collection" split_words:"true"`
47-
MetricsPort int `default:"2223" desc:"Specify the port used to expose the metrics" split_words:"true"`
48-
Socket url.URL `default:"unix:///var/lib/meridio/lb.sock" desc:"Server socket to host Stream Availability Service" split_words:"true"`
30+
Name string `default:"load-balancer" desc:"Name of the pod"`
31+
ServiceName string `default:"load-balancer" desc:"Name of providing service" split_words:"true"`
32+
ConnectTo url.URL `default:"unix:///var/lib/networkservicemesh/nsm.io.sock" desc:"url to connect to NSM" split_words:"true"`
33+
DialTimeout time.Duration `default:"5s" desc:"timeout to dial NSMgr" split_words:"true"`
34+
RequestTimeout time.Duration `default:"15s" desc:"timeout to request NSE" split_words:"true"`
35+
MaxTokenLifetime time.Duration `default:"24h" desc:"maximum lifetime of tokens" split_words:"true"`
36+
NSPService string `default:"nsp-service:7778" desc:"IP (or domain) and port of the NSP Service" split_words:"true"`
37+
ConduitName string `default:"load-balancer" desc:"Name of the conduit" split_words:"true"`
38+
TrenchName string `default:"default" desc:"Trench the pod is running on" split_words:"true"`
39+
LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"`
40+
Nfqueue string `default:"0:3" desc:"netfilter queue(s) to be used by nfqlb" split_words:"true"`
41+
NfqueueFanout bool `default:"false" desc:"enable fanout nfqueue option" split_words:"true"`
42+
IdentifierOffsetStart int `default:"5000" desc:"Each Stream will get a unique identifier range starting from that value" split_words:"true"`
43+
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout" envconfig:"grpc_keepalive_time"`
44+
GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"`
45+
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
46+
MetricsEnabled bool `default:"false" desc:"Enable the metrics collection" split_words:"true"`
47+
MetricsPort int `default:"2223" desc:"Specify the port used to expose the metrics" split_words:"true"`
48+
Socket url.URL `default:"unix:///var/lib/meridio/lb.sock" desc:"Server socket to host Stream Availability Service" split_words:"true"`
49+
Namespace string `default:"default" desc:"Namespace the pod is running on" split_words:"true"`
50+
TargetDisconnectMonitoring bool `default:"true" desc:"Enable listening to Target disconnect events to clean-up linux neighbor cache" split_words:"true"`
4951
}
5052

5153
// IsValid checks if the configuration is valid
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright (c) 2024 OpenInfra Foundation Europe
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package neighborcache
18+
19+
import (
20+
"context"
21+
"net"
22+
23+
"github.com/networkservicemesh/api/pkg/api/networkservice"
24+
"github.com/nordix/meridio/pkg/log"
25+
"github.com/vishvananda/netlink"
26+
)
27+
28+
// RemoveInvalid attempts to remove potentially invalid neighbor entries for
29+
// which NSM has reported that the connection was closed, implying that the
30+
// interface has either disappeared or is about to disappear along with its
31+
// IP and MAC addresses. Thus, even if the same IP address reappears shortly
32+
// due to NSM heal successfully fixing or, more accurately, re-establishing the
33+
// connection, communication disturbances caused by an old invalid neighbor
34+
// cache entry can be avoided, which would have otherwise occurred due to the
35+
// behavior of the neighbor state machine (DELAY state and unicast probes).
36+
// Note: The LB monitors TAPA -> Proxy connections, where the SrcIpAddrs refer
37+
// to TAPA-side IPs, including the ones used as Target IPs by the LB.
38+
func RemoveInvalid(ctx context.Context, connectionEvent *networkservice.ConnectionEvent) {
39+
if connectionEvent.Type != networkservice.ConnectionEventType_DELETE {
40+
return
41+
}
42+
logger := log.FromContextOrGlobal(ctx).WithValues("func", "RemoveInvalid")
43+
// Fetch neighbor cache from kernel
44+
neighborList, err := netlink.NeighList(0, 0)
45+
if err != nil {
46+
logger.Info("Could not fetch neighbor list", "err", err)
47+
return
48+
}
49+
// Convert neighbor list to a map
50+
neighborMap := make(map[string][]netlink.Neigh)
51+
for _, neigh := range neighborList {
52+
ipStr := neigh.IP.String()
53+
neighborMap[ipStr] = append(neighborMap[ipStr], neigh)
54+
}
55+
56+
// Remove any of the NSM SrcIpAddrs from the neighbor cache if they are present
57+
eventPrinted := false
58+
for _, connection := range connectionEvent.Connections {
59+
if connection.GetPath() == nil || len(connection.GetPath().GetPathSegments()) < 1 {
60+
continue
61+
}
62+
if connection.GetContext() == nil || connection.GetContext().GetIpContext() == nil {
63+
continue
64+
}
65+
ipContext := connection.GetContext().GetIpContext()
66+
for _, ipStr := range ipContext.SrcIpAddrs {
67+
if ip, _, err := net.ParseCIDR(ipStr); err == nil {
68+
// Check if neighbor map has an entry for this IP
69+
neighs, ok := neighborMap[ip.String()]
70+
if !ok {
71+
continue
72+
}
73+
if !eventPrinted {
74+
eventPrinted = true
75+
logger.Info("Connection event", "event", connectionEvent)
76+
}
77+
for _, neigh := range neighs {
78+
logger.Info("Delete from neighbor cache", "neigh", neigh, "MAC", neigh.HardwareAddr.String())
79+
err := netlink.NeighDel(&netlink.Neigh{
80+
LinkIndex: neigh.LinkIndex,
81+
IP: ip,
82+
})
83+
if err != nil {
84+
logger.Info("Failed to delete from neighbor cache", "neigh", neigh, "MAC", neigh.HardwareAddr.String(), "err", err)
85+
}
86+
}
87+
}
88+
}
89+
}
90+
}

cmd/stateless-lb/main.go

+55-17
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424
"fmt"
2525
"io"
2626
"net"
27+
"net/url"
2728
"os"
2829
"os/signal"
30+
"strings"
2931
"sync"
3032
"syscall"
3133
"time"
@@ -35,6 +37,7 @@ import (
3537
"github.com/networkservicemesh/api/pkg/api/networkservice"
3638
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
3739
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/noop"
40+
"github.com/networkservicemesh/api/pkg/api/registry"
3841
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms"
3942
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel"
4043
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd"
@@ -43,6 +46,7 @@ import (
4346
nsmlog "github.com/networkservicemesh/sdk/pkg/tools/log"
4447
lbAPI "github.com/nordix/meridio/api/loadbalancer/v1"
4548
nspAPI "github.com/nordix/meridio/api/nsp/v1"
49+
"github.com/nordix/meridio/cmd/stateless-lb/internal/neighborcache"
4650
"github.com/nordix/meridio/pkg/debug"
4751
"github.com/nordix/meridio/pkg/endpoint"
4852
"github.com/nordix/meridio/pkg/health"
@@ -139,13 +143,13 @@ func main() {
139143

140144
netUtils := &linuxKernel.KernelUtils{}
141145

142-
// create and start health server
146+
// Create and start health server
143147
ctx = health.CreateChecker(ctx)
144148
if err := health.RegisterReadinessSubservices(ctx, health.LBReadinessServices...); err != nil {
145149
logger.Error(err, "RegisterReadinessSubservices")
146150
}
147-
// note: NSM endpoint service is hosted from early on by its server, thus it can be probed
148-
// irrespective of its registration status at NSM
151+
// Note: The NSM endpoint service is hosted from the start by its server,
152+
// so it can be probed regardless of its registration status at NSM.
149153
if err := health.RegisterLivenessSubservices(ctx, health.LBLivenessServices...); err != nil {
150154
logger.Error(err, "RegisterLivenessSubservices")
151155
}
@@ -175,12 +179,12 @@ func main() {
175179
}
176180
defer conn.Close()
177181

178-
// monitor status of NSP connection and adjust probe status accordingly
182+
// Monitor status of NSP connection and adjust probe status accordingly
179183
if err := connection.Monitor(ctx, health.NSPCliSvc, conn); err != nil {
180184
logger.Error(err, "NSP connection state monitor")
181185
}
182186

183-
stream.SetInterfaceNamePrefix(config.ServiceName) // deduce the NSM interfacename prefix for the netfilter defrag rules
187+
stream.SetInterfaceNamePrefix(config.ServiceName) // Determine the NSM interface name prefix for the netfilter defragmentation rules
184188
targetRegistryClient := nspAPI.NewTargetRegistryClient(conn)
185189
configurationManagerClient := nspAPI.NewConfigurationManagerClient(conn)
186190
conduit := &nspAPI.Conduit{
@@ -225,7 +229,7 @@ func main() {
225229
return
226230
}
227231

228-
// start server to host Stream Forwarding Availability service
232+
// Start server to host Stream Forwarding Availability service
229233
lis, err := createStreamAvailabilityListener(config)
230234
if err != nil {
231235
logger.Error(err, "createStreamAvailabilityListener")
@@ -239,7 +243,7 @@ func main() {
239243
}),
240244
)
241245
defer func() {
242-
// attempt graceful shutdown to allow sending out pending msgs
246+
// Attempt graceful shutdown to allow sending out pending msgs
243247
stopped := make(chan struct{})
244248
go func() {
245249
s.GracefulStop()
@@ -248,7 +252,7 @@ func main() {
248252
waitTimer := time.NewTimer(time.Second)
249253
select {
250254
case <-waitTimer.C:
251-
s.Stop() // graceful shutdown not finished in time, force stop immediately
255+
s.Stop() // Graceful shutdown not finished in time, force stop immediately
252256
case <-stopped:
253257
waitTimer.Stop()
254258
select {
@@ -257,7 +261,8 @@ func main() {
257261
}
258262
}
259263
}()
260-
// announces forwarding availability of streams (i.e. if the LB can forward traffic towards application targets)
264+
// Announces the forwarding availability of streams
265+
// (i.e., whether the LB can forward traffic towards application targets)
261266
streamFwdAvailabilityService := stream.NewForwardingAvailabilityService(
262267
context.Background(),
263268
&lbAPI.Target{
@@ -280,8 +285,8 @@ func main() {
280285
configurationManagerClient,
281286
conduit,
282287
netUtils,
283-
lbFactory, // to spawn nfqlb instance for each Stream created
284-
nfa, // netfilter kernel configuration to steer VIP traffic to nfqlb process
288+
lbFactory, // To spawn nfqlb instance for each Stream created
289+
nfa, // Netfilter kernel configuration to steer VIP traffic to nfqlb process
285290
config.IdentifierOffsetStart,
286291
targetHitsMetrics,
287292
neighborMonitor,
@@ -290,7 +295,7 @@ func main() {
290295

291296
interfaceMonitorEndpoint := interfacemonitor.NewServer(interfaceMonitor, sns, netUtils)
292297

293-
// Note: naming the interface is left to NSM (refer to getNameFromConnection())
298+
// Note: Naming the interface is left to NSM (refer to getNameFromConnection())
294299
// However NSM does not seem to ensure uniqueness either. Might need to revisit...
295300
responderEndpoint := []networkservice.NetworkServiceServer{
296301
mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{
@@ -310,10 +315,10 @@ func main() {
310315
MaxTokenLifetime: config.MaxTokenLifetime,
311316
GRPCMaxBackoff: config.GRPCMaxBackoff,
312317
}
313-
nsmAPIClient := nsm.NewAPIClient(context.Background(), apiClientConfig) // background context to allow endpoint unregistration on tear down
318+
nsmAPIClient := nsm.NewAPIClient(context.Background(), apiClientConfig) // Background context to allow endpoint unregistration on tear down
314319
defer nsmAPIClient.Delete()
315320

316-
// connect NSMgr and start NSM connection monitoring (to log events of interest)
321+
// Connect local NSMgr
317322
cc, err := grpc.DialContext(ctx,
318323
grpcutils.URLToTarget(&nsmAPIClient.Config.ConnectTo),
319324
nsmAPIClient.GRPCDialOption...,
@@ -324,8 +329,13 @@ func main() {
324329
return
325330
}
326331
defer cc.Close()
332+
// Start monitoring NSM connections the LB is part of
327333
monitorClient := networkservice.NewMonitorConnectionClient(cc)
328334
go nsmmonitor.ConnectionMonitor(ctx, config.Name, monitorClient)
335+
// Start cluster-wide monitoring of NSM connection Delete events between TAPA and Proxy
336+
if config.TargetDisconnectMonitoring {
337+
go startClusterConnectionMonitor(ctx, config, cc, nsmAPIClient.GRPCDialOption)
338+
}
329339

330340
endpointConfig := &endpoint.Config{
331341
Name: config.Name,
@@ -334,7 +344,7 @@ func main() {
334344
MaxTokenLifetime: config.MaxTokenLifetime,
335345
}
336346
ep, err := endpoint.NewEndpoint(
337-
context.Background(), // use background context to allow endpoint unregistration on tear down
347+
context.Background(), // Use background context to allow endpoint unregistration on tear down
338348
endpointConfig,
339349
nsmAPIClient.NetworkServiceRegistryClient,
340350
nsmAPIClient.NetworkServiceEndpointRegistryClient,
@@ -346,7 +356,7 @@ func main() {
346356
defer func() {
347357
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second*3))
348358
defer cancel()
349-
ep.Delete(ctx) // let endpoint unregister with NSM to inform proxies in time
359+
ep.Delete(ctx) // Let endpoint unregister with NSM to inform proxies in time
350360
logger.Info("LB endpoint deleted")
351361
}()
352362

@@ -367,7 +377,7 @@ func main() {
367377
cancel()
368378
}()
369379
sns.Start()
370-
// monitor availibilty of frontends (advertise NSE to proxies only if there's feasible FE)
380+
// Monitor availibilty of frontends (advertise NSE to proxies only if there's feasible FE)
371381
fns := NewFrontendNetworkService(ctx, targetRegistryClient, ep, NewServiceControlDispatcher(sns))
372382
go func() {
373383
logger.Info("Start frontend monitoring service")
@@ -938,3 +948,31 @@ func (sns *SimpleNetworkService) updateVips(vips []*nspAPI.Vip) error {
938948
}
939949
return nil
940950
}
951+
952+
// startClusterConnectionMonitor starts cluster-wide monitoring of NSM connection
953+
// Delete events between TAPA and Proxy, removing invalid Target entries from
954+
// the Linux neighbor cache to prevent connection disturbances.
955+
func startClusterConnectionMonitor(
956+
ctx context.Context,
957+
config Config,
958+
cc grpc.ClientConnInterface,
959+
dialOptions []grpc.DialOption,
960+
) {
961+
nsmmonitor.ClusterConnectionMonitor(
962+
ctx,
963+
// Create a registry client to learn NSMgr URLs
964+
registry.NewNetworkServiceEndpointRegistryClient(cc),
965+
// Use provided gRPC dial options to connect NSMgrs
966+
dialOptions,
967+
&networkservice.MonitorScopeSelector{},
968+
// Replace "local" URLs with the local NSMgr's URL to avoid connecting the local forwarder
969+
func(connectTo *url.URL) *url.URL {
970+
if strings.HasPrefix(connectTo.String(), "inode://") {
971+
return &config.ConnectTo
972+
}
973+
return connectTo
974+
},
975+
// Callback function to remove invalid entries from the neighbor cache
976+
neighborcache.RemoveInvalid,
977+
)
978+
}

config/templates/charts/meridio/deployment/stateless-lb-frontend.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ spec:
115115
value: # to be filled by operator
116116
- name: NSM_METRICS_ENABLED
117117
value: "true"
118+
- name: NSM_NAMESPACE
119+
value: # to be filled by operator
118120
volumeMounts:
119121
- name: spire-agent-socket
120122
mountPath: /run/spire/sockets

deployments/helm/templates/load-balancer.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ spec:
5757
valueFrom:
5858
fieldRef:
5959
fieldPath: metadata.name
60+
- name: NSM_NAMESPACE
61+
valueFrom:
62+
fieldRef:
63+
fieldPath: metadata.namespace
6064
- name: NSM_SERVICE_NAME
6165
value: {{ template "meridio.loadBalancer.networkServiceName" . }}
6266
- name: NSM_CONDUIT_NAME

docs/components/stateless-lb.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ NSM_GRPC_MAX_BACKOFF | time.Duration | Upper bound on gRPC connection backoff de
3838
NSM_METRICS_ENABLED | bool | Enable the metrics collection| false
3939
NSM_METRICS_PORT | int | Specify the port used to expose the metrics | 2223
4040
NSM_SOCKET | url.URL | Server socket to host Stream Availability Service | unix:///var/lib/meridio/lb.sock
41+
NSM_NAMESPACE | string | Namespace the pod is running on | default
42+
NSM_TARGET_DISCONNECT_MONITORING | bool | Enable listenting to Target disconnect events to clean-up linux neighbor cache | true
4143

4244
## Command Line
4345

@@ -91,6 +93,6 @@ Sysctl: net.ipv4.conf.all.rp_filter=0 | Allow packets to have a source IPv4 addr
9193
Sysctl: net.ipv4.conf.default.rp_filter=0 | Allow packets to have a source IPv6 address which does not correspond to any routing destination address.
9294
Sysctl: net.ipv4.fwmark_reflect=1 | Allow LB generated outbound ICMP Frag Needed reply to use VIP as source address.
9395
Sysctl: net.ipv6.fwmark_reflect=1 | Allow LB generated outbound ICMPv6 Packet Too Big reply to use VIP as source address.
94-
NET_ADMIN | The load balancer configures IP rules and IP routes to steer packets (processed by [nfqueue-loadbalancer program](https://github.com/Nordix/nfqueue-loadbalancer)) to targets. The user space load balancer program relies on [libnetfilter_queue](https://netfilter.org/projects/libnetfilter_queue).
96+
NET_ADMIN | The load balancer configures IP rules and IP routes to steer packets (processed by [nfqueue-loadbalancer program](https://github.com/Nordix/nfqueue-loadbalancer)) to targets. The user space load balancer program relies on [libnetfilter_queue](https://netfilter.org/projects/libnetfilter_queue). The load balancer can remove entries from its Linux neighbor cache that correspond to targets for which the MAC address is no longer valid.
9597
IPC_LOCK | The user space load balancer program uses shared memory.
9698
IPC_OWNER | The user space load balancer program uses shared memory.

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/nordix/meridio
33
go 1.22
44

55
require (
6+
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29
67
github.com/edwarnicke/grpcfd v1.1.4
78
github.com/faisal-memon/sviddisk v0.0.0-20211007205134-77ccea0b9271
89
github.com/go-logr/logr v1.4.1
@@ -55,7 +56,6 @@ require (
5556
github.com/cespare/xxhash/v2 v2.2.0 // indirect
5657
github.com/davecgh/go-spew v1.1.1 // indirect
5758
github.com/edwarnicke/exechelper v1.0.2 // indirect
58-
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 // indirect
5959
github.com/edwarnicke/serialize v1.0.7 // indirect
6060
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
6161
github.com/evanphx/json-patch/v5 v5.8.0 // indirect

0 commit comments

Comments
 (0)