
Commit 726e602

wongcharajcaldelas authored and committed

implement preferAlignByUncorecache within TakeByTopologyNUMAPacked and
test cases

1 parent a44e5f5 commit 726e602

File tree

6 files changed: +185 -180 lines changed

pkg/kubelet/cm/cpumanager/cpu_assignment.go

Lines changed: 50 additions & 108 deletions
@@ -95,10 +95,8 @@ func standardDeviation(xs []int) float64 {
 type numaOrSocketsFirstFuncs interface {
 	takeFullFirstLevel()
 	takeFullSecondLevel()
-	takeThirdLevel()
 	sortAvailableNUMANodes() []int
 	sortAvailableSockets() []int
-	sortAvailableUncoreCaches() []int
 	sortAvailableCores() []int
 }

@@ -120,18 +118,12 @@ func (n *numaFirst) takeFullSecondLevel() {
 	n.acc.takeFullSockets()
 }

-// In Split UncoreCache Topology, we take from the sets of UncoreCache as the third level
-func (n *numaFirst) takeThirdLevel() {
-	n.acc.takeUncoreCache()
-}
-
-// If NUMA nodes are higher in the memory hierarchy than sockets, then just
-// sort the NUMA nodes directly, and return them.
-func (n *numaFirst) sortAvailableUncoreCaches() []int {
+// Sort the UncoreCaches within the NUMA nodes.
+func (a *cpuAccumulator) sortAvailableUncoreCaches() []int {
 	var result []int
-	for _, socket := range n.acc.sortAvailableNUMANodes() {
-		uncore := n.acc.details.UncoreInNUMANodes(socket).UnsortedList()
-		n.acc.sort(uncore, n.acc.details.CPUsInUncoreCaches)
+	for _, numa := range a.sortAvailableNUMANodes() {
+		uncore := a.details.UncoreInNUMANodes(numa).UnsortedList()
+		a.sort(uncore, a.details.CPUsInUncoreCaches)
 		result = append(result, uncore...)
 	}
 	return result
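Hoisting sortAvailableUncoreCaches onto cpuAccumulator works because UncoreCaches always sit below NUMA nodes in the hierarchy, so a single traversal order serves both the numaFirst and socketsFirst variants. A minimal standalone sketch of that traversal, using toy IDs and free-CPU counts instead of the kubelet types, and assuming the accumulator's sort orders caches by ascending free-CPU count as the packed strategy prefers:

package main

import (
	"fmt"
	"sort"
)

// sortUncoreByNUMA walks NUMA nodes in their already-sorted order and, within
// each node, sorts that node's uncore-cache IDs by ascending free-CPU count
// (partially used caches first), accumulating one global partial order.
func sortUncoreByNUMA(numaOrder []int, cachesPerNUMA map[int][]int, freeCPUs map[int]int) []int {
	var result []int
	for _, numa := range numaOrder {
		caches := append([]int(nil), cachesPerNUMA[numa]...)
		sort.Slice(caches, func(i, j int) bool {
			return freeCPUs[caches[i]] < freeCPUs[caches[j]]
		})
		result = append(result, caches...)
	}
	return result
}

func main() {
	numaOrder := []int{0, 1}
	cachesPerNUMA := map[int][]int{0: {0, 1}, 1: {2, 3}}
	freeCPUs := map[int]int{0: 2, 1: 4, 2: 4, 3: 1}
	fmt.Println(sortUncoreByNUMA(numaOrder, cachesPerNUMA, freeCPUs)) // [0 1 3 2]
}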
@@ -182,10 +174,6 @@ func (s *socketsFirst) takeFullSecondLevel() {
 	s.acc.takeFullNUMANodes()
 }

-func (s *socketsFirst) takeThirdLevel() {
-	s.acc.takeUncoreCache()
-}
-
 // If sockets are higher in the memory hierarchy than NUMA nodes, then we need
 // to pull the set of NUMA nodes out of each sorted Socket, and accumulate the
 // partial order across them.
@@ -207,18 +195,6 @@ func (s *socketsFirst) sortAvailableSockets() []int {
 	return sockets
 }

-// If sockets higher in the memory hierarchy than NUMA nodes, then UncoreCache
-// sit directly below NUMA Nodes in the memory hierchy
-func (s *socketsFirst) sortAvailableUncoreCaches() []int {
-	var result []int
-	for _, uncore := range s.acc.sortAvailableNUMANodes() {
-		uncore := s.acc.details.UncoreInNUMANodes(uncore).UnsortedList()
-		s.acc.sort(uncore, s.acc.details.CPUsInUncoreCaches)
-		result = append(result, uncore...)
-	}
-	return result
-}
-
 // If sockets are higher in the memory hierarchy than NUMA nodes, then cores
 // sit directly below NUMA Nodes in the memory hierarchy.
 func (s *socketsFirst) sortAvailableCores() []int {
@@ -353,8 +329,8 @@ func (a *cpuAccumulator) isSocketFree(socketID int) bool {
 	return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
 }

-// Returns true if the supplied UnCoreCache is fully available in `a.details`.
-// "fully available" means that all the CPUs in it are free.
+// Returns true if the supplied UnCoreCache is fully available,
+// meaning all its CPUs are free in `a.details`.
 func (a *cpuAccumulator) isUncoreCacheFree(uncoreID int) bool {
 	return a.details.CPUsInUncoreCaches(uncoreID).Size() == a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID).Size()
 }
@@ -390,19 +366,14 @@ func (a *cpuAccumulator) freeSockets() []int {
 // Returns free UncoreCache IDs as a slice sorted by sortAvailableUnCoreCache().
 func (a *cpuAccumulator) freeUncoreCache() []int {
 	free := []int{}
-	for _, uncore := range a.numaOrSocketsFirst.sortAvailableUncoreCaches() {
+	for _, uncore := range a.sortAvailableUncoreCaches() {
 		if a.isUncoreCacheFree(uncore) {
 			free = append(free, uncore)
 		}
 	}
 	return free
 }

-// Returns all UncoreCache IDs as a slice sorted by sortAvailableUncoreCache().
-func (a *cpuAccumulator) allUncoreCache() []int {
-	return a.numaOrSocketsFirst.sortAvailableUncoreCaches()
-}
-
 // Returns free core IDs as a slice sorted by sortAvailableCores().
 func (a *cpuAccumulator) freeCores() []int {
 	free := []int{}
@@ -575,7 +546,7 @@ func (a *cpuAccumulator) takeFullSockets() {
 		a.take(cpusInSocket)
 	}
 }
-func (a *cpuAccumulator) takeFullUnCore() {
+func (a *cpuAccumulator) takeFullUncore() {
 	for _, uncore := range a.freeUncoreCache() {
 		cpusInUncore := a.topo.CPUDetails.CPUsInUncoreCaches(uncore)
 		if !a.needsAtLeast(cpusInUncore.Size()) {
@@ -585,30 +556,34 @@ func (a *cpuAccumulator) takeFullUncore() {
 		}
 	}
 }

+func (a *cpuAccumulator) takePartialUncore(uncoreID int) {
+	numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()
+	var freeCPUsInUncoreCache cpuset.CPUSet
+	freeCoresInUncoreCache := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncoreID)
+	for _, coreID := range freeCoresInUncoreCache.List() {
+		freeCPUsInUncoreCache = freeCPUsInUncoreCache.Union(a.topo.CPUDetails.CPUsInCores(coreID))
+	}
+	klog.V(4).InfoS("takePartialUncore: free CPUs in UncoreCache", "freeCPUsInUncoreCache", freeCPUsInUncoreCache.String())
+	if a.numCPUsNeeded == freeCPUsInUncoreCache.Size() {
+		a.take(freeCPUsInUncoreCache)
+	}
+}
+
 // First try to take full UncoreCache, if available and need is at least the size of the UncoreCache group.
 // Second try to take the partial UncoreCache if available and the request size can fit w/in the UncoreCache.
 func (a *cpuAccumulator) takeUncoreCache() {
-	for _, uncore := range a.allUncoreCache() {
-		numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()
-
-		// take full UncoreCache if the CPUs needed is greater a UncoreCache size
-		if a.numCPUsNeeded >= a.topo.NumCPUs/a.topo.NumUncoreCache {
-			a.takeFullUnCore()
+	cpusPerUncoreCache := a.topo.NumCPUs / a.topo.NumUncoreCache
+	for _, uncore := range a.sortAvailableUncoreCaches() {
+		// take a full UncoreCache if the number of CPUs needed is at least the UncoreCache size
+		if a.needsAtLeast(cpusPerUncoreCache) {
+			a.takeFullUncore()
 		}

-		var freeCPUsInUncoreCache cpuset.CPUSet
-		// need to get needed cores in UncoreCache
-		freeCoresInUncoreCache := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncore)
-		klog.V(2).InfoS("free cores from a.details list: ", "freeCoresInUncorecache", freeCoresInUncoreCache)
-		for _, coreID := range freeCoresInUncoreCache.List() {
-			freeCPUsInUncoreCache = freeCPUsInUncoreCache.Union(a.topo.CPUDetails.CPUsInCores(coreID))
-		}
-		klog.V(2).InfoS("freeCPUsInUncorecache : ", "freeCPUsInUncorecache", freeCPUsInUncoreCache)
-		if a.numCPUsNeeded == freeCPUsInUncoreCache.Size() {
-			klog.V(4).InfoS("takePartialUncore: claiming cores from Uncorecache ID", "uncore", uncore)
-			a.take(freeCPUsInUncoreCache)
+		if a.isSatisfied() {
+			return
 		}

+		a.takePartialUncore(uncore)
 		if a.isSatisfied() {
 			return
 		}
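The net effect of takeFullUncore plus takePartialUncore is: claim whole caches while the remaining request still covers one, then claim the remainder only where it fits entirely within a single cache's free cores. A simplified standalone model of that order, assuming one CPU per core (no SMT) and a flat slice of free-CPU counts in place of the real cpuset bookkeeping; the real code interleaves both claims per sorted cache rather than running two separate passes:

package main

import "fmt"

// takeByUncore models the claim order: pass 1 mirrors takeFullUncore (claim
// whole caches while the remaining request still covers one); pass 2 mirrors
// takePartialUncore (claim the remainder only if one cache's free CPUs can
// hold it entirely). Returns the claimed cache IDs in claim order.
func takeByUncore(needed int, freeCPUs []int, cacheSize int) []int {
	var taken []int
	for id, free := range freeCPUs {
		if needed >= cacheSize && free == cacheSize {
			taken = append(taken, id)
			freeCPUs[id] = 0
			needed -= cacheSize
		}
	}
	for id, free := range freeCPUs {
		if needed > 0 && free >= needed {
			taken = append(taken, id)
			needed = 0
			break
		}
	}
	return taken
}

func main() {
	// Four caches of 4 CPUs each; cache 0 already has one CPU allocated.
	fmt.Println(takeByUncore(10, []int{3, 4, 4, 4}, 4)) // [1 2 0]
}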
@@ -733,6 +708,14 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopControl) {
 // or the remaining number of CPUs to take after having taken full sockets and NUMA nodes is less
 // than a whole NUMA node, the function tries to take whole physical cores (cores).
 //
+// If `PreferAlignByUncoreCache` is enabled, the function will try to optimally assign UncoreCaches.
+// If `numCPUs` is larger than or equal to the total number of CPUs in an UncoreCache, and there are
+// free (i.e. all CPUs within the UncoreCache are free) UncoreCaches, the function takes as many entire
+// cores from free UncoreCaches as possible. Once `numCPUs` is smaller than the total number of
+// CPUs in a free UncoreCache, the function scans each UncoreCache index in numerical order to assign
+// cores that will fit within the UncoreCache. If `numCPUs` cannot fit within any UncoreCache, the
+// function tries to take whole physical cores.
+//
 // If `numCPUs` is bigger than the total number of CPUs in a core, and there are
 // free (i.e. all CPUs in them are free) cores, the function takes as many entire free cores as possible.
 // The cores are taken from one socket at a time, and the sockets are considered by
@@ -754,7 +737,7 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopControl) {
 // the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
 // order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
 // the least amount of free CPUs to the one with the highest amount of free CPUs.
-func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
+func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
 	acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
 	if acc.isSatisfied() {
 		return acc.result, nil
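To make the documented order concrete, a worked example on a hypothetical topology (numbers assumed for illustration): a single socket with 16 CPUs split across four 4-CPU UncoreCaches. For a 10-CPU request, 10 >= 4, so the allocator first claims fully free UncoreCaches whole, two caches for 8 CPUs. The remaining 2 CPUs are smaller than a cache, so the allocator scans the caches in sorted order and takes the remainder from the first cache whose free cores can hold it whole. Only if no cache could fit the remainder would the function fall through to taking whole physical cores.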
@@ -777,59 +760,16 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
 		return acc.result, nil
 	}

-	// 2. Acquire whole cores, if available and the container requires at least
-	//    a core's-worth of CPUs.
-	//    If `CPUSortingStrategySpread` is specified, skip taking the whole core.
-	if cpuSortingStrategy != CPUSortingStrategySpread {
-		acc.takeFullCores()
+	// 2. If PreferAlignByUncoreCache is enabled, acquire whole UncoreCaches,
+	//    if available and the container requires at least an UncoreCache's-worth
+	//    of CPUs. Otherwise, acquire CPUs from the fewest UncoreCaches.
+	if preferAlignByUncoreCache {
+		acc.takeUncoreCache()
 		if acc.isSatisfied() {
 			return acc.result, nil
 		}
 	}

-	// 3. Acquire single threads, preferring to fill partially-allocated cores
-	//    on the same sockets as the whole cores we have already taken in this
-	//    allocation.
-	acc.takeRemainingCPUs()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-
-	return cpuset.New(), fmt.Errorf("failed to allocate cpus")
-}
-
-// takeByTopologyUnCoreCachePacked uses the "packed" sorting strategy similar to takeByTopologyNUMAPacked.
-// It includes an additional level of sorting by uncorecache
-func takeByTopologyUncoreCachePacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
-	acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-	if acc.isFailed() {
-		return cpuset.New(), fmt.Errorf("not enough cpus available to satisfy request: requested=%d, available=%d", numCPUs, availableCPUs.Size())
-	}
-
-	// Algorithm: topology-aware best-fit
-	// 1. Acquire whole NUMA nodes and sockets, if available and the container
-	//    requires at least a NUMA node or socket's-worth of CPUs. If NUMA
-	//    Nodes map to 1 or more sockets, pull from NUMA nodes first.
-	//    Otherwise pull from sockets first.
-	acc.numaOrSocketsFirst.takeFullFirstLevel()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-	acc.numaOrSocketsFirst.takeFullSecondLevel()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-
-	// 2. Acquire partial uncorecache, if there are enough CPUs available to satisfy the container requirement
-	//    Acquire the full uncorecache, if available and the container requires at least all the CPUs in the uncorecache grouping
-	acc.numaOrSocketsFirst.takeThirdLevel()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-
 	// 3. Acquire whole cores, if available and the container requires at least
 	//    a core's-worth of CPUs.
 	//    If `CPUSortingStrategySpread` is specified, skip taking the whole core.
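Taken together, the packed allocator now runs as a single pipeline: (1) whole NUMA nodes and sockets, (2) whole or exact-fit partial UncoreCaches, gated on preferAlignByUncoreCache, (3) whole cores, skipped under CPUSortingStrategySpread, and (4) remaining single threads. The separate takeByTopologyUncoreCachePacked copy deleted above duplicated steps 1, 3, and 4; folding the UncoreCache step behind a boolean parameter removes that duplication.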
@@ -918,8 +858,10 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
 	// If the number of CPUs requested cannot be handed out in chunks of
 	// 'cpuGroupSize', then we just call out the packing algorithm since we
 	// can't distribute CPUs in this chunk size.
+	// The PreferAlignByUncoreCache feature is not implemented here yet, so it is set to false.
+	// Support for PreferAlignByUncoreCache is planned for the beta release.
 	if (numCPUs % cpuGroupSize) != 0 {
-		return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
+		return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
 	}

 	// Otherwise build an accumulator to start allocating CPUs from.
@@ -1102,7 +1044,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
 	// size 'cpuGroupSize' from 'bestCombo'.
 	distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
 	for _, numa := range bestCombo {
-		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy)
+		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy, false)
 		acc.take(cpus)
 	}

@@ -1117,7 +1059,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
 		if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
 			continue
 		}
-		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy)
+		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy, false)
 		acc.take(cpus)
 		remainder -= cpuGroupSize
 	}
@@ -1141,5 +1083,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {

 	// If we never found a combination of NUMA nodes that we could properly
 	// distribute CPUs across, fall back to the packing algorithm.
-	return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
+	return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
 }

pkg/kubelet/cm/cpumanager/cpu_assignment_test.go

Lines changed: 13 additions & 33 deletions
@@ -668,36 +668,16 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
 			"",
 			mustParseCPUSet(t, "0-29,40-69,30,31,70,71"),
 		},
-	}...)
-
-	for _, tc := range testCases {
-		t.Run(tc.description, func(t *testing.T) {
-			strategy := CPUSortingStrategyPacked
-			if tc.opts.DistributeCPUsAcrossCores {
-				strategy = CPUSortingStrategySpread
-			}
-
-			result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
-			if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
-				t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
-			}
-			if !result.Equals(tc.expResult) {
-				t.Errorf("expected result [%s] to equal [%s]", result, tc.expResult)
-			}
-		})
-	}
-}
-
-func TestTakeByTopologyUncoreCachePacked(t *testing.T) {
-	testCases := []struct {
-		description   string
-		topo          *topology.CPUTopology
-		opts          StaticPolicyOptions
-		availableCPUs cpuset.CPUSet
-		numCPUs       int
-		expErr        string
-		expResult     cpuset.CPUSet
-	}{
+		// Test cases for PreferAlignByUncoreCache
+		{
+			"take cpus from two full UncoreCaches and partial from a single UncoreCache",
+			topoUncoreSingleSocketNoSMT,
+			StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
+			mustParseCPUSet(t, "1-15"),
+			10,
+			"",
+			cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11),
+		},
 		{
 			"take one cpu from dual socket with HT - core from Socket 0",
 			topoDualSocketHT,
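Reading the new first case: assuming topoUncoreSingleSocketNoSMT models 16 CPUs in four 4-CPU UncoreCaches (CPUs 0-3, 4-7, 8-11, 12-15), making only CPUs 1-15 available leaves cache 0 partially free and caches 1-3 fully free. The 10-CPU request claims caches 1 and 2 whole (CPUs 4-11), and the 2-CPU remainder fits within cache 0's free cores, adding CPUs 1 and 2. That is exactly cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11).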
@@ -761,7 +741,7 @@ func TestTakeByTopologyUncoreCachePacked(t *testing.T) {
 			"",
 			mustParseCPUSet(t, "4-7,12-15,1,9"),
 		},
-	}
+	}...)

 	for _, tc := range testCases {
 		t.Run(tc.description, func(t *testing.T) {
@@ -770,7 +750,7 @@ func TestTakeByTopologyUncoreCachePacked(t *testing.T) {
 				strategy = CPUSortingStrategySpread
 			}

-			result, err := takeByTopologyUncoreCachePacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
+			result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
 			if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
 				t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
 			}
@@ -871,7 +851,7 @@ func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
 		if tc.opts.DistributeCPUsAcrossCores {
 			strategy = CPUSortingStrategySpread
 		}
-		result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
+		result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
 		if tc.expErr != "" && err.Error() != tc.expErr {
 			t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
 		}

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 1 addition & 6 deletions
@@ -509,12 +509,7 @@ func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
 		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize, cpuSortingStrategy)
 	}

-	if p.options.PreferAlignByUncoreCacheOption {
-
-		return takeByTopologyUncoreCachePacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy)
-
-	}
-	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy)
+	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy, p.options.PreferAlignByUncoreCacheOption)
 }

 func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
