Skip to content

Commit 827a979

Browse files
committed
fix: alloc from cache may over the cap limit
Signed-off-by: l1b0k <[email protected]>
1 parent 8ac4f62 commit 827a979

File tree

2 files changed

+206
-32
lines changed

2 files changed

+206
-32
lines changed

pkg/eni/local.go

+69-32
Original file line numberDiff line numberDiff line change
@@ -404,14 +404,15 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
404404
expectV4 := 0
405405
expectV6 := 0
406406

407+
var ipv4, ipv6 *IP
407408
if l.enableIPv4 {
408409
if localIPRequest.NoCache {
409410
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
410411
return nil, []Trace{{Condition: Full}}
411412
}
412413
expectV4 = 1
413414
} else {
414-
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
415+
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
415416
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
416417
return nil, []Trace{{Condition: Full}}
417418
} else if ipv4 == nil {
@@ -427,7 +428,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
427428
}
428429
expectV6 = 1
429430
} else {
430-
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
431+
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
431432
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
432433
return nil, []Trace{{Condition: Full}}
433434
} else if ipv6 == nil {
@@ -441,6 +442,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
441442
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
442443
}
443444

445+
ok1 := l.enableIPv4 && ipv4 != nil || !l.enableIPv4
446+
ok2 := l.enableIPv6 && ipv6 != nil || !l.enableIPv6
447+
448+
if ok1 && ok2 {
449+
// direct return
450+
respCh := make(chan *AllocResp)
451+
// assign ip to pod , as we are ready
452+
// this must be protected by lock
453+
if ipv4 != nil {
454+
ipv4.Allocate(cni.PodID)
455+
}
456+
if ipv6 != nil {
457+
ipv6.Allocate(cni.PodID)
458+
}
459+
460+
go func() {
461+
l.cond.L.Lock()
462+
defer l.cond.L.Unlock()
463+
464+
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
465+
}()
466+
return respCh, nil
467+
}
468+
444469
for i := 0; i < expectV4; i++ {
445470
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
446471
}
@@ -591,9 +616,6 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
591616
default:
592617
}
593618

594-
resp := &AllocResp{}
595-
596-
var ip types.IPSet2
597619
var ipv4, ipv6 *IP
598620
if l.enableIPv4 {
599621
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
@@ -602,42 +624,16 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
602624
l.cond.Wait()
603625
continue
604626
}
605-
ip.IPv4 = ipv4.ip
606627
}
607628
if l.enableIPv6 {
608629
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
609630
if ipv6 == nil {
610631
l.cond.Wait()
611632
continue
612633
}
613-
ip.IPv6 = ipv6.ip
614634
}
615635

616-
resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
617-
ENI: *l.eni,
618-
IP: ip,
619-
})
620-
621-
log.Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
622-
623-
select {
624-
case <-ctx.Done():
625-
continue
626-
case respCh <- resp:
627-
// mark the ip as allocated
628-
if ipv4 != nil {
629-
ipv4.Allocate(cni.PodID)
630-
if cni.PodID != "" {
631-
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
632-
}
633-
}
634-
if ipv6 != nil {
635-
ipv6.Allocate(cni.PodID)
636-
if cni.PodID != "" {
637-
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
638-
}
639-
}
640-
}
636+
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
641637

642638
return
643639
}
@@ -1045,6 +1041,47 @@ func (l *Local) Status() Status {
10451041
return s
10461042
}
10471043

1044+
// commit send the allocated ip result to respCh
1045+
// if ctx canceled, the respCh will be closed
1046+
func (l *Local) commit(ctx context.Context, respCh chan *AllocResp, ipv4, ipv6 *IP, podID string) {
1047+
var ip types.IPSet2
1048+
if ipv4 != nil {
1049+
ip.IPv4 = ipv4.ip
1050+
ipv4.Allocate(podID)
1051+
if podID != "" {
1052+
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
1053+
}
1054+
}
1055+
if ipv6 != nil {
1056+
ip.IPv6 = ipv6.ip
1057+
ipv6.Allocate(podID)
1058+
if podID != "" {
1059+
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
1060+
}
1061+
}
1062+
resp := &AllocResp{}
1063+
resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
1064+
ENI: *l.eni,
1065+
IP: ip,
1066+
})
1067+
select {
1068+
case <-ctx.Done():
1069+
if ipv4 != nil {
1070+
ipv4.Release(podID)
1071+
}
1072+
if ipv6 != nil {
1073+
ipv6.Release(podID)
1074+
}
1075+
1076+
// parent cancel the context, so close the ch
1077+
close(respCh)
1078+
1079+
return
1080+
case respCh <- resp:
1081+
logr.FromContextOrDiscard(ctx).Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
1082+
}
1083+
}
1084+
10481085
// syncIPLocked will mark ip as invalid , if not found in remote
10491086
func syncIPLocked(lo Set, remote []netip.Addr) {
10501087
s := sets.New[netip.Addr](remote...)

pkg/eni/local_test.go

+137
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
1213
"golang.org/x/time/rate"
1314
"k8s.io/apimachinery/pkg/util/cache"
1415
"k8s.io/apimachinery/pkg/util/sets"
@@ -571,3 +572,139 @@ func TestAllocFromFactory(t *testing.T) {
571572
assert.Equal(t, req1, local.allocatingV4[0])
572573
assert.Equal(t, req1, local.allocatingV6[0])
573574
}
575+
576+
func Test_factoryDisposeWorker_unAssignIP(t *testing.T) {
577+
f := factorymocks.NewFactory(t)
578+
// even we have two jobs ,we only get one ip
579+
f.On("UnAssignNIPv4", "eni-1", []netip.Addr{netip.MustParseAddr("192.0.2.1")}, mock.Anything).Return(nil).Once()
580+
f.On("UnAssignNIPv6", "eni-1", []netip.Addr{netip.MustParseAddr("fd00::1")}, mock.Anything).Return(nil).Once()
581+
582+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
583+
EnableIPv4: true,
584+
EnableIPv6: true,
585+
BatchSize: 10,
586+
}, "")
587+
local.status = statusInUse
588+
589+
local.ipv4.Add(&IP{
590+
ip: netip.MustParseAddr("192.0.2.1"),
591+
primary: false,
592+
podID: "",
593+
status: ipStatusDeleting,
594+
})
595+
596+
local.ipv4.Add(&IP{
597+
ip: netip.MustParseAddr("192.0.2.2"),
598+
primary: false,
599+
podID: "",
600+
status: ipStatusValid,
601+
})
602+
603+
local.ipv6.Add(&IP{
604+
ip: netip.MustParseAddr("fd00::1"),
605+
primary: false,
606+
podID: "",
607+
status: ipStatusDeleting,
608+
})
609+
610+
ctx, cancel := context.WithCancel(context.Background())
611+
defer cancel()
612+
go local.factoryDisposeWorker(ctx)
613+
614+
time.Sleep(1 * time.Second)
615+
assert.Len(t, local.ipv4, 1)
616+
assert.Len(t, local.ipv6, 0)
617+
}
618+
619+
func Test_factoryDisposeWorker_releaseIP(t *testing.T) {
620+
f := factorymocks.NewFactory(t)
621+
// even we have two jobs ,we only get one ip
622+
f.On("DeleteNetworkInterface", "eni-1").Return(nil).Once()
623+
624+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
625+
EnableIPv4: true,
626+
EnableIPv6: true,
627+
BatchSize: 10,
628+
}, "")
629+
local.status = statusDeleting
630+
631+
ctx, cancel := context.WithCancel(context.Background())
632+
defer cancel()
633+
go local.factoryDisposeWorker(ctx)
634+
635+
time.Sleep(1 * time.Second)
636+
assert.Nil(t, local.eni)
637+
}
638+
639+
func Test_commit_responsed(t *testing.T) {
640+
f := factorymocks.NewFactory(t)
641+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
642+
EnableIPv4: true,
643+
EnableIPv6: true,
644+
BatchSize: 10,
645+
}, "")
646+
local.status = statusInUse
647+
648+
ctx, cancel := context.WithCancel(context.Background())
649+
defer cancel()
650+
651+
respCh := make(chan *AllocResp)
652+
ipv4 := &IP{
653+
ip: netip.MustParseAddr("127.0.0.1"),
654+
primary: false,
655+
podID: "",
656+
status: ipStatusValid,
657+
}
658+
ipv6 := &IP{
659+
ip: netip.MustParseAddr("fd00::1"),
660+
primary: false,
661+
podID: "",
662+
status: ipStatusValid,
663+
}
664+
wg := sync.WaitGroup{}
665+
wg.Add(1)
666+
go func() {
667+
defer wg.Done()
668+
669+
<-respCh
670+
}()
671+
672+
local.commit(ctx, respCh, ipv4, ipv6, "foo")
673+
674+
wg.Wait()
675+
676+
assert.Equal(t, "foo", ipv4.podID)
677+
assert.Equal(t, "foo", ipv6.podID)
678+
}
679+
680+
func Test_commit_canceled(t *testing.T) {
681+
f := factorymocks.NewFactory(t)
682+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
683+
EnableIPv4: true,
684+
EnableIPv6: true,
685+
BatchSize: 10,
686+
}, "")
687+
local.status = statusInUse
688+
689+
ctx, cancel := context.WithCancel(context.Background())
690+
cancel()
691+
692+
respCh := make(chan *AllocResp)
693+
ipv4 := &IP{
694+
ip: netip.MustParseAddr("127.0.0.1"),
695+
primary: false,
696+
podID: "foo",
697+
status: ipStatusValid,
698+
}
699+
ipv6 := &IP{
700+
ip: netip.MustParseAddr("fd00::1"),
701+
primary: false,
702+
podID: "foo",
703+
status: ipStatusValid,
704+
}
705+
706+
local.commit(ctx, respCh, ipv4, ipv6, "foo")
707+
708+
assert.Equal(t, "", ipv4.podID)
709+
assert.Equal(t, "", ipv6.podID)
710+
}

0 commit comments

Comments
 (0)