Skip to content

Commit 9e745ba

Browse files
committed
[WIP] NSM datapath monitoring
1 parent db3066c commit 9e745ba

File tree

6 files changed

+234
-2
lines changed

6 files changed

+234
-2
lines changed

cmd/proxy/internal/client/utils.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ package client
1818

1919
import (
2020
"context"
21+
"time"
2122

2223
"github.com/networkservicemesh/api/pkg/api/networkservice"
2324
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
2425
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
2526
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd"
2627
"github.com/nordix/meridio/pkg/nsm"
28+
kernelheal "github.com/nordix/meridio/pkg/nsm/heal"
2729
"google.golang.org/protobuf/types/known/timestamppb"
2830
)
2931

@@ -46,10 +48,16 @@ func newClient(ctx context.Context, name string, nsmAPIClient *nsm.APIClient, ad
4648
sendfd.NewClient(),
4749
)
4850

51+
healOptions := []heal.Option{
52+
heal.WithLivenessCheckInterval(2 * time.Second),
53+
heal.WithLivenessCheckTimeout(1 * time.Second),
54+
heal.WithLivenessCheck(kernelheal.KernelLivenessCheck),
55+
}
56+
4957
return client.NewClient(ctx,
5058
client.WithClientURL(&nsmAPIClient.Config.ConnectTo),
5159
client.WithName(name),
52-
client.WithHealClient(heal.NewClient(ctx)),
60+
client.WithHealClient(heal.NewClient(ctx, healOptions...)),
5361
client.WithAdditionalFunctionality(additionalFunctionality...),
5462
client.WithDialTimeout(nsmAPIClient.Config.DialTimeout),
5563
client.WithDialOptions(nsmAPIClient.GRPCDialOption...),

cmd/tapa/main.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"os"
2626
"os/signal"
2727
"syscall"
28+
"time"
2829

2930
"github.com/go-logr/logr"
3031
"github.com/kelseyhightower/envconfig"
@@ -49,6 +50,7 @@ import (
4950
linuxKernel "github.com/nordix/meridio/pkg/kernel"
5051
"github.com/nordix/meridio/pkg/log"
5152
"github.com/nordix/meridio/pkg/nsm"
53+
kernelheal "github.com/nordix/meridio/pkg/nsm/heal"
5254
"github.com/nordix/meridio/pkg/nsm/interfacename"
5355
"github.com/sirupsen/logrus"
5456
"google.golang.org/grpc"
@@ -155,10 +157,16 @@ func main() {
155157
sendfd.NewClient(),
156158
}
157159

160+
healOptions := []heal.Option{
161+
heal.WithLivenessCheckInterval(2 * time.Second),
162+
heal.WithLivenessCheckTimeout(1 * time.Second),
163+
heal.WithLivenessCheck(kernelheal.KernelLivenessCheck),
164+
}
165+
158166
networkServiceClient := client.NewClient(ctx,
159167
client.WithClientURL(&nsmAPIClient.Config.ConnectTo),
160168
client.WithName(config.Name),
161-
client.WithHealClient(heal.NewClient(ctx)),
169+
client.WithHealClient(heal.NewClient(ctx, healOptions...)),
162170
client.WithAdditionalFunctionality(additionalFunctionality...),
163171
client.WithDialTimeout(nsmAPIClient.Config.DialTimeout),
164172
client.WithDialOptions(nsmAPIClient.GRPCDialOption...),

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/faisal-memon/sviddisk v0.0.0-20211007205134-77ccea0b9271
88
github.com/go-logr/logr v1.4.1
99
github.com/go-logr/zapr v1.3.0
10+
github.com/go-ping/ping v1.0.0
1011
github.com/golang/mock v1.6.0
1112
github.com/google/nftables v0.1.0
1213
github.com/google/uuid v1.3.1

go.sum

+3
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv
106106
github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
107107
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
108108
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
109+
github.com/go-ping/ping v1.0.0 h1:34GZiqLDqqIHEeL5NZIz7jSnMluK7/p0qDB436yO6H0=
110+
github.com/go-ping/ping v1.0.0/go.mod h1:35JbSyV/BYqHwwRA6Zr1uVDm1637YlNOU61wI797NPI=
109111
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
110112
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
111113
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
@@ -412,6 +414,7 @@ golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLL
412414
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
413415
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
414416
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
417+
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
415418
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
416419
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
417420
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=

pkg/nsm/heal/liveness_check.go

+206
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at:
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
// Package heal contains an implementation of LivenessChecker.
18+
package heal
19+
20+
import (
21+
"context"
22+
"errors"
23+
"net"
24+
"strings"
25+
"time"
26+
27+
"github.com/go-ping/ping"
28+
"github.com/networkservicemesh/api/pkg/api/networkservice"
29+
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
30+
"github.com/networkservicemesh/sdk/pkg/tools/log"
31+
)
32+
33+
const (
34+
defaultTimeout = time.Second
35+
packetCount = 4
36+
37+
DatapathSourceIPsKey = "DATAPATH_SOURCE_IPS"
38+
DatapathDestinationIPsKey = "DATAPATH_DESTINATION_IPS"
39+
DatapathIPsSeparator = " "
40+
)
41+
42+
type options struct {
43+
pingerFactory PingerFactory
44+
}
45+
46+
// Option is an option pattern for LivelinessChecker
47+
type Option func(o *options)
48+
49+
// WithPingerFactory - sets any custom pinger factory
50+
func WithPingerFactory(pf PingerFactory) Option {
51+
return func(o *options) {
52+
o.pingerFactory = pf
53+
}
54+
}
55+
56+
// KernelLivenessCheck is an implementation of heal.LivenessCheck
57+
func KernelLivenessCheck(deadlineCtx context.Context, conn *networkservice.Connection) bool {
58+
return KernelLivenessCheckWithOptions(deadlineCtx, conn)
59+
}
60+
61+
// KernelLivenessCheckWithOptions is an implementation with options of heal.LivenessCheck. It sends ICMP
62+
// ping and checks reply. Returns false if didn't get reply.
63+
func KernelLivenessCheckWithOptions(deadlineCtx context.Context, conn *networkservice.Connection, opts ...Option) bool {
64+
// Apply options
65+
o := &options{
66+
pingerFactory: &defaultPingerFactory{},
67+
}
68+
for _, opt := range opts {
69+
opt(o)
70+
}
71+
var pingerFactory = o.pingerFactory
72+
73+
if mechanism := conn.GetMechanism().GetType(); mechanism != kernel.MECHANISM {
74+
log.FromContext(deadlineCtx).Warnf("ping is not supported for mechanism %v", mechanism)
75+
return true
76+
}
77+
78+
sourceIPs, destinationIPs := getSourceDestinationIPs(conn.GetContext().GetExtraContext())
79+
combinationCount := len(sourceIPs) * len(destinationIPs)
80+
if combinationCount == 0 {
81+
log.FromContext(deadlineCtx).Debug("No IP address")
82+
return true
83+
}
84+
85+
deadline, ok := deadlineCtx.Deadline()
86+
if !ok {
87+
deadline = time.Now().Add(defaultTimeout)
88+
}
89+
timeout := time.Until(deadline)
90+
91+
responseCh := make(chan error, combinationCount)
92+
defer close(responseCh)
93+
for _, sourceIP := range sourceIPs {
94+
for _, destinationIP := range destinationIPs {
95+
if (destinationIP.To4() != nil) != (sourceIP.To4() != nil) {
96+
responseCh <- nil
97+
continue
98+
}
99+
100+
go func(srcIP, dstIP string) {
101+
logger := log.FromContext(deadlineCtx).WithField("srcIP", srcIP).WithField("dstIP", dstIP)
102+
pinger := pingerFactory.CreatePinger(srcIP, dstIP, timeout, packetCount)
103+
104+
err := pinger.Run()
105+
if err != nil {
106+
logger.Errorf("Ping failed: %s", err.Error())
107+
responseCh <- err
108+
return
109+
}
110+
111+
if pinger.GetReceivedPackets() == 0 {
112+
err = errors.New("No packets received")
113+
logger.Errorf(err.Error())
114+
responseCh <- err
115+
return
116+
}
117+
responseCh <- nil
118+
}(sourceIP.String(), destinationIP.String())
119+
}
120+
}
121+
122+
// Waiting for all ping results. If at least one fails - return false
123+
return waitForResponses(responseCh)
124+
}
125+
126+
func getSourceDestinationIPs(extraContext map[string]string) ([]net.IP, []net.IP) {
127+
sourceIPs := []net.IP{}
128+
destinationIPs := []net.IP{}
129+
sourceIPsStr := extraContext[DatapathSourceIPsKey]
130+
destinationIPsStr := extraContext[DatapathDestinationIPsKey]
131+
132+
for _, sourceIPStr := range strings.Split(sourceIPsStr, DatapathIPsSeparator) {
133+
sourceIP, _, err := net.ParseCIDR(sourceIPStr)
134+
if err != nil {
135+
continue
136+
}
137+
138+
sourceIPs = append(sourceIPs, sourceIP)
139+
}
140+
141+
for _, destinationIPStr := range strings.Split(destinationIPsStr, DatapathIPsSeparator) {
142+
destinationIP, _, err := net.ParseCIDR(destinationIPStr)
143+
if err != nil {
144+
continue
145+
}
146+
147+
destinationIPs = append(destinationIPs, destinationIP)
148+
}
149+
150+
return sourceIPs, destinationIPs
151+
}
152+
153+
func waitForResponses(responseCh <-chan error) bool {
154+
respCount := cap(responseCh)
155+
success := true
156+
for {
157+
resp, ok := <-responseCh
158+
if !ok {
159+
return false
160+
}
161+
if resp != nil {
162+
success = false
163+
}
164+
respCount--
165+
if respCount == 0 {
166+
return success
167+
}
168+
}
169+
}
170+
171+
// PingerFactory - factory interface for creating pingers
172+
type PingerFactory interface {
173+
CreatePinger(srcIP, dstIP string, timeout time.Duration, count int) Pinger
174+
}
175+
176+
// Pinger - pinger interface
177+
type Pinger interface {
178+
Run() error
179+
GetReceivedPackets() int
180+
}
181+
182+
type defaultPingerFactory struct{}
183+
184+
func (p *defaultPingerFactory) CreatePinger(srcIP, dstIP string, timeout time.Duration, count int) Pinger {
185+
pi := ping.New(dstIP)
186+
pi.Source = srcIP
187+
pi.Timeout = timeout
188+
pi.Count = count
189+
if count != 0 {
190+
pi.Interval = timeout / time.Duration(count)
191+
}
192+
193+
return &defaultPinger{pinger: pi}
194+
}
195+
196+
type defaultPinger struct {
197+
pinger *ping.Pinger
198+
}
199+
200+
func (p *defaultPinger) Run() error {
201+
return p.pinger.Run()
202+
}
203+
204+
func (p *defaultPinger) GetReceivedPackets() int {
205+
return p.pinger.Statistics().PacketsRecv
206+
}

pkg/proxy/proxy.go

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
nspAPI "github.com/nordix/meridio/api/nsp/v1"
3232
"github.com/nordix/meridio/pkg/log"
3333
"github.com/nordix/meridio/pkg/networking"
34+
kernelheal "github.com/nordix/meridio/pkg/nsm/heal"
3435
"github.com/nordix/meridio/pkg/retry"
3536
"github.com/nordix/meridio/pkg/utils"
3637
"github.com/vishvananda/netlink"
@@ -271,6 +272,11 @@ func (p *Proxy) SetIPContext(ctx context.Context, conn *networkservice.Connectio
271272
oldDstIpAddrs := ipContext.DstIpAddrs
272273
ipContext.SrcIpAddrs = dstIpAddrs
273274
ipContext.DstIpAddrs = srcIPAddrs
275+
if conn.GetContext().ExtraContext == nil {
276+
conn.GetContext().ExtraContext = map[string]string{}
277+
}
278+
conn.GetContext().ExtraContext[kernelheal.DatapathSourceIPsKey] = strings.Join(p.Bridge.GetLocalPrefixes(), kernelheal.DatapathIPsSeparator)
279+
conn.GetContext().ExtraContext[kernelheal.DatapathDestinationIPsKey] = strings.Join(ipContext.DstIpAddrs, kernelheal.DatapathIPsSeparator)
274280
// Note: It might be confusing to see all the "release IP" msgs if the
275281
// LB NSE is gone, but NSM Find Client haven't reported it yet in order
276282
// to close the related connection.

0 commit comments

Comments
 (0)