Skip to content

Commit 1947216

Browse files
authored
Merge pull request #768 from l1b0k/fix/alloc
fix: alloc problem
2 parents 2030ff5 + b58f4ff commit 1947216

File tree

3 files changed

+277
-33
lines changed

3 files changed

+277
-33
lines changed

pkg/eni/local.go

+115-33
Original file line numberDiff line numberDiff line change
@@ -400,19 +400,19 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
400400
}
401401

402402
log := logr.FromContextOrDiscard(ctx)
403-
log.Info(fmt.Sprintf("local request %v", localIPRequest))
404403

405404
expectV4 := 0
406405
expectV6 := 0
407406

407+
var ipv4, ipv6 *IP
408408
if l.enableIPv4 {
409409
if localIPRequest.NoCache {
410410
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
411411
return nil, []Trace{{Condition: Full}}
412412
}
413413
expectV4 = 1
414414
} else {
415-
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
415+
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
416416
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
417417
return nil, []Trace{{Condition: Full}}
418418
} else if ipv4 == nil {
@@ -428,7 +428,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
428428
}
429429
expectV6 = 1
430430
} else {
431-
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
431+
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
432432
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
433433
return nil, []Trace{{Condition: Full}}
434434
} else if ipv6 == nil {
@@ -442,6 +442,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
442442
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
443443
}
444444

445+
ok1 := l.enableIPv4 && ipv4 != nil || !l.enableIPv4
446+
ok2 := l.enableIPv6 && ipv6 != nil || !l.enableIPv6
447+
448+
if ok1 && ok2 {
449+
// direct return
450+
respCh := make(chan *AllocResp)
451+
// assign ip to pod , as we are ready
452+
// this must be protected by lock
453+
if ipv4 != nil {
454+
ipv4.Allocate(cni.PodID)
455+
}
456+
if ipv6 != nil {
457+
ipv6.Allocate(cni.PodID)
458+
}
459+
460+
go func() {
461+
l.cond.L.Lock()
462+
defer l.cond.L.Unlock()
463+
464+
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
465+
}()
466+
return respCh, nil
467+
}
468+
445469
for i := 0; i < expectV4; i++ {
446470
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
447471
}
@@ -455,6 +479,11 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
455479

456480
go l.allocWorker(ctx, cni, localIPRequest, respCh)
457481

482+
if l.eni == nil {
483+
log.Info("local request", "eni", "", "req", localIPRequest)
484+
} else {
485+
log.Info("local request", "eni", l.eni.ID, "req", localIPRequest)
486+
}
458487
return respCh, nil
459488
}
460489

@@ -528,6 +557,8 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
528557
l.cond.L.Lock()
529558
defer l.cond.L.Unlock()
530559

560+
defer l.cond.Broadcast()
561+
531562
defer func() {
532563
if request == nil {
533564
return
@@ -550,6 +581,32 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
550581
}()
551582

552583
log := logr.FromContextOrDiscard(ctx)
584+
if request != nil && request.NoCache {
585+
// as we want to do preheat, so this ip will not be consumed
586+
// so just hang there , let this ctx done
587+
588+
for {
589+
select {
590+
case <-request.workerCtx.Done():
591+
// work ctx finished (factory cancel it)
592+
593+
select {
594+
case <-ctx.Done():
595+
close(respCh)
596+
case respCh <- &AllocResp{}:
597+
}
598+
599+
return
600+
case <-ctx.Done():
601+
// parent cancel the context, so close the ch
602+
close(respCh)
603+
return
604+
default:
605+
}
606+
l.cond.Wait()
607+
}
608+
}
609+
553610
for {
554611
select {
555612
case <-ctx.Done():
@@ -559,52 +616,24 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
559616
default:
560617
}
561618

562-
resp := &AllocResp{}
563-
564-
var ip types.IPSet2
565619
var ipv4, ipv6 *IP
566620
if l.enableIPv4 {
567621
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
568622
if ipv4 == nil {
623+
log.Info("waiting ipv4")
569624
l.cond.Wait()
570625
continue
571626
}
572-
ip.IPv4 = ipv4.ip
573627
}
574628
if l.enableIPv6 {
575629
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
576630
if ipv6 == nil {
577631
l.cond.Wait()
578632
continue
579633
}
580-
ip.IPv6 = ipv6.ip
581634
}
582635

583-
resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
584-
ENI: *l.eni,
585-
IP: ip,
586-
})
587-
588-
log.Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
589-
590-
select {
591-
case <-ctx.Done():
592-
continue
593-
case respCh <- resp:
594-
// mark the ip as allocated
595-
if ipv4 != nil {
596-
ipv4.Allocate(cni.PodID)
597-
if cni.PodID != "" {
598-
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
599-
}
600-
}
601-
if ipv6 != nil {
602-
ipv6.Allocate(cni.PodID)
603-
if cni.PodID != "" {
604-
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
605-
}
606-
}
607-
}
636+
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
608637

609638
return
610639
}
@@ -1012,6 +1041,47 @@ func (l *Local) Status() Status {
10121041
return s
10131042
}
10141043

1044+
// commit send the allocated ip result to respCh
1045+
// if ctx canceled, the respCh will be closed
1046+
func (l *Local) commit(ctx context.Context, respCh chan *AllocResp, ipv4, ipv6 *IP, podID string) {
1047+
var ip types.IPSet2
1048+
if ipv4 != nil {
1049+
ip.IPv4 = ipv4.ip
1050+
ipv4.Allocate(podID)
1051+
if podID != "" {
1052+
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
1053+
}
1054+
}
1055+
if ipv6 != nil {
1056+
ip.IPv6 = ipv6.ip
1057+
ipv6.Allocate(podID)
1058+
if podID != "" {
1059+
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
1060+
}
1061+
}
1062+
resp := &AllocResp{}
1063+
resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
1064+
ENI: *l.eni,
1065+
IP: ip,
1066+
})
1067+
select {
1068+
case <-ctx.Done():
1069+
if ipv4 != nil {
1070+
ipv4.Release(podID)
1071+
}
1072+
if ipv6 != nil {
1073+
ipv6.Release(podID)
1074+
}
1075+
1076+
// parent cancel the context, so close the ch
1077+
close(respCh)
1078+
1079+
return
1080+
case respCh <- resp:
1081+
logr.FromContextOrDiscard(ctx).Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
1082+
}
1083+
}
1084+
10151085
// syncIPLocked will mark ip as invalid , if not found in remote
10161086
func syncIPLocked(lo Set, remote []netip.Addr) {
10171087
s := sets.New[netip.Addr](remote...)
@@ -1128,12 +1198,24 @@ func (l *Local) popNIPv4Jobs(count int) {
11281198
firstPart, secondPart := Split(l.allocatingV4, count)
11291199
l.dangingV4 = append(l.dangingV4, firstPart...)
11301200
l.allocatingV4 = secondPart
1201+
1202+
lo.ForEach(l.dangingV4, func(item *LocalIPRequest, index int) {
1203+
if item.NoCache {
1204+
item.cancel()
1205+
}
1206+
})
11311207
}
11321208

11331209
func (l *Local) popNIPv6Jobs(count int) {
11341210
firstPart, secondPart := Split(l.allocatingV6, count)
11351211
l.dangingV6 = append(l.dangingV6, firstPart...)
11361212
l.allocatingV6 = secondPart
1213+
1214+
lo.ForEach(l.dangingV6, func(item *LocalIPRequest, index int) {
1215+
if item.NoCache {
1216+
item.cancel()
1217+
}
1218+
})
11371219
}
11381220

11391221
func Split[T any](arr []T, index int) ([]T, []T) {

0 commit comments

Comments
 (0)