Skip to content

Commit b58f4ff

Browse files
committed
fix: alloc from cache may exceed the cap limit
Signed-off-by: l1b0k <[email protected]>
1 parent 8ac4f62 commit b58f4ff

File tree

2 files changed

+229
-32
lines changed

2 files changed

+229
-32
lines changed

pkg/eni/local.go

+69-32
Original file line numberDiff line numberDiff line change
@@ -404,14 +404,15 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
404404
expectV4 := 0
405405
expectV6 := 0
406406

407+
var ipv4, ipv6 *IP
407408
if l.enableIPv4 {
408409
if localIPRequest.NoCache {
409410
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
410411
return nil, []Trace{{Condition: Full}}
411412
}
412413
expectV4 = 1
413414
} else {
414-
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
415+
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
415416
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
416417
return nil, []Trace{{Condition: Full}}
417418
} else if ipv4 == nil {
@@ -427,7 +428,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
427428
}
428429
expectV6 = 1
429430
} else {
430-
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
431+
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
431432
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
432433
return nil, []Trace{{Condition: Full}}
433434
} else if ipv6 == nil {
@@ -441,6 +442,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
441442
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
442443
}
443444

445+
ok1 := l.enableIPv4 && ipv4 != nil || !l.enableIPv4
446+
ok2 := l.enableIPv6 && ipv6 != nil || !l.enableIPv6
447+
448+
if ok1 && ok2 {
449+
// direct return
450+
respCh := make(chan *AllocResp)
451+
// assign ip to pod , as we are ready
452+
// this must be protected by lock
453+
if ipv4 != nil {
454+
ipv4.Allocate(cni.PodID)
455+
}
456+
if ipv6 != nil {
457+
ipv6.Allocate(cni.PodID)
458+
}
459+
460+
go func() {
461+
l.cond.L.Lock()
462+
defer l.cond.L.Unlock()
463+
464+
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
465+
}()
466+
return respCh, nil
467+
}
468+
444469
for i := 0; i < expectV4; i++ {
445470
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
446471
}
@@ -591,9 +616,6 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
591616
default:
592617
}
593618

594-
resp := &AllocResp{}
595-
596-
var ip types.IPSet2
597619
var ipv4, ipv6 *IP
598620
if l.enableIPv4 {
599621
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
@@ -602,42 +624,16 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
602624
l.cond.Wait()
603625
continue
604626
}
605-
ip.IPv4 = ipv4.ip
606627
}
607628
if l.enableIPv6 {
608629
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
609630
if ipv6 == nil {
610631
l.cond.Wait()
611632
continue
612633
}
613-
ip.IPv6 = ipv6.ip
614634
}
615635

616-
resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
617-
ENI: *l.eni,
618-
IP: ip,
619-
})
620-
621-
log.Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
622-
623-
select {
624-
case <-ctx.Done():
625-
continue
626-
case respCh <- resp:
627-
// mark the ip as allocated
628-
if ipv4 != nil {
629-
ipv4.Allocate(cni.PodID)
630-
if cni.PodID != "" {
631-
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
632-
}
633-
}
634-
if ipv6 != nil {
635-
ipv6.Allocate(cni.PodID)
636-
if cni.PodID != "" {
637-
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
638-
}
639-
}
640-
}
636+
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
641637

642638
return
643639
}
@@ -1045,6 +1041,47 @@ func (l *Local) Status() Status {
10451041
return s
10461042
}
10471043

1044+
// commit send the allocated ip result to respCh
1045+
// if ctx canceled, the respCh will be closed
1046+
func (l *Local) commit(ctx context.Context, respCh chan *AllocResp, ipv4, ipv6 *IP, podID string) {
1047+
var ip types.IPSet2
1048+
if ipv4 != nil {
1049+
ip.IPv4 = ipv4.ip
1050+
ipv4.Allocate(podID)
1051+
if podID != "" {
1052+
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
1053+
}
1054+
}
1055+
if ipv6 != nil {
1056+
ip.IPv6 = ipv6.ip
1057+
ipv6.Allocate(podID)
1058+
if podID != "" {
1059+
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
1060+
}
1061+
}
1062+
resp := &AllocResp{}
1063+
resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
1064+
ENI: *l.eni,
1065+
IP: ip,
1066+
})
1067+
select {
1068+
case <-ctx.Done():
1069+
if ipv4 != nil {
1070+
ipv4.Release(podID)
1071+
}
1072+
if ipv6 != nil {
1073+
ipv6.Release(podID)
1074+
}
1075+
1076+
// parent cancel the context, so close the ch
1077+
close(respCh)
1078+
1079+
return
1080+
case respCh <- resp:
1081+
logr.FromContextOrDiscard(ctx).Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
1082+
}
1083+
}
1084+
10481085
// syncIPLocked will mark ip as invalid , if not found in remote
10491086
func syncIPLocked(lo Set, remote []netip.Addr) {
10501087
s := sets.New[netip.Addr](remote...)

pkg/eni/local_test.go

+160
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"time"
1010

1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
1213
"golang.org/x/time/rate"
1314
"k8s.io/apimachinery/pkg/util/cache"
1415
"k8s.io/apimachinery/pkg/util/sets"
16+
"k8s.io/apimachinery/pkg/util/wait"
1517

1618
"github.com/AliyunContainerService/terway/pkg/factory"
1719
factorymocks "github.com/AliyunContainerService/terway/pkg/factory/mocks"
@@ -571,3 +573,161 @@ func TestAllocFromFactory(t *testing.T) {
571573
assert.Equal(t, req1, local.allocatingV4[0])
572574
assert.Equal(t, req1, local.allocatingV6[0])
573575
}
576+
577+
func Test_factoryDisposeWorker_unAssignIP(t *testing.T) {
578+
f := factorymocks.NewFactory(t)
579+
// even we have two jobs ,we only get one ip
580+
f.On("UnAssignNIPv4", "eni-1", []netip.Addr{netip.MustParseAddr("192.0.2.1")}, mock.Anything).Return(nil).Once()
581+
f.On("UnAssignNIPv6", "eni-1", []netip.Addr{netip.MustParseAddr("fd00::1")}, mock.Anything).Return(nil).Once()
582+
583+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
584+
EnableIPv4: true,
585+
EnableIPv6: true,
586+
BatchSize: 10,
587+
}, "")
588+
local.status = statusInUse
589+
590+
local.ipv4.Add(&IP{
591+
ip: netip.MustParseAddr("192.0.2.1"),
592+
primary: false,
593+
podID: "",
594+
status: ipStatusDeleting,
595+
})
596+
597+
local.ipv4.Add(&IP{
598+
ip: netip.MustParseAddr("192.0.2.2"),
599+
primary: false,
600+
podID: "",
601+
status: ipStatusValid,
602+
})
603+
604+
local.ipv6.Add(&IP{
605+
ip: netip.MustParseAddr("fd00::1"),
606+
primary: false,
607+
podID: "",
608+
status: ipStatusDeleting,
609+
})
610+
611+
ctx, cancel := context.WithCancel(context.Background())
612+
defer cancel()
613+
go local.factoryDisposeWorker(ctx)
614+
615+
err := wait.ExponentialBackoff(wait.Backoff{
616+
Duration: 100 * time.Millisecond,
617+
Steps: 10,
618+
}, func() (done bool, err error) {
619+
local.cond.L.Lock()
620+
defer local.cond.L.Unlock()
621+
622+
if len(local.ipv6) == 0 && len(local.ipv4) == 1 {
623+
return true, nil
624+
}
625+
return false, nil
626+
})
627+
assert.NoError(t, err)
628+
}
629+
630+
func Test_factoryDisposeWorker_releaseIP(t *testing.T) {
631+
f := factorymocks.NewFactory(t)
632+
// even we have two jobs ,we only get one ip
633+
f.On("DeleteNetworkInterface", "eni-1").Return(nil).Once()
634+
635+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
636+
EnableIPv4: true,
637+
EnableIPv6: true,
638+
BatchSize: 10,
639+
}, "")
640+
local.status = statusDeleting
641+
642+
ctx, cancel := context.WithCancel(context.Background())
643+
defer cancel()
644+
645+
go local.factoryDisposeWorker(ctx)
646+
647+
err := wait.ExponentialBackoff(wait.Backoff{
648+
Duration: 100 * time.Millisecond,
649+
Steps: 10,
650+
}, func() (done bool, err error) {
651+
local.cond.L.Lock()
652+
defer local.cond.L.Unlock()
653+
if local.eni == nil {
654+
return true, nil
655+
}
656+
return false, nil
657+
})
658+
659+
assert.NoError(t, err)
660+
}
661+
662+
func Test_commit_responsed(t *testing.T) {
663+
f := factorymocks.NewFactory(t)
664+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
665+
EnableIPv4: true,
666+
EnableIPv6: true,
667+
BatchSize: 10,
668+
}, "")
669+
local.status = statusInUse
670+
671+
ctx, cancel := context.WithCancel(context.Background())
672+
defer cancel()
673+
674+
respCh := make(chan *AllocResp)
675+
ipv4 := &IP{
676+
ip: netip.MustParseAddr("127.0.0.1"),
677+
primary: false,
678+
podID: "",
679+
status: ipStatusValid,
680+
}
681+
ipv6 := &IP{
682+
ip: netip.MustParseAddr("fd00::1"),
683+
primary: false,
684+
podID: "",
685+
status: ipStatusValid,
686+
}
687+
wg := sync.WaitGroup{}
688+
wg.Add(1)
689+
go func() {
690+
defer wg.Done()
691+
692+
<-respCh
693+
}()
694+
695+
local.commit(ctx, respCh, ipv4, ipv6, "foo")
696+
697+
wg.Wait()
698+
699+
assert.Equal(t, "foo", ipv4.podID)
700+
assert.Equal(t, "foo", ipv6.podID)
701+
}
702+
703+
func Test_commit_canceled(t *testing.T) {
704+
f := factorymocks.NewFactory(t)
705+
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
706+
EnableIPv4: true,
707+
EnableIPv6: true,
708+
BatchSize: 10,
709+
}, "")
710+
local.status = statusInUse
711+
712+
ctx, cancel := context.WithCancel(context.Background())
713+
cancel()
714+
715+
respCh := make(chan *AllocResp)
716+
ipv4 := &IP{
717+
ip: netip.MustParseAddr("127.0.0.1"),
718+
primary: false,
719+
podID: "foo",
720+
status: ipStatusValid,
721+
}
722+
ipv6 := &IP{
723+
ip: netip.MustParseAddr("fd00::1"),
724+
primary: false,
725+
podID: "foo",
726+
status: ipStatusValid,
727+
}
728+
729+
local.commit(ctx, respCh, ipv4, ipv6, "foo")
730+
731+
assert.Equal(t, "", ipv4.podID)
732+
assert.Equal(t, "", ipv6.podID)
733+
}

0 commit comments

Comments
 (0)