
Commit 56dd0d5

wongchar authored and ffromani committed

implement preferAlignByUncoreCache within TakeByTopologyNUMAPacked and
test cases (cherry picked from commit 726e602)

1 parent aaa3948 commit 56dd0d5

File tree

6 files changed

+181 −154 lines changed

pkg/kubelet/cm/cpumanager/cpu_assignment.go

Lines changed: 48 additions & 114 deletions
@@ -102,10 +102,8 @@ func min(x, y int) int {
 type numaOrSocketsFirstFuncs interface {
 	takeFullFirstLevel()
 	takeFullSecondLevel()
-	takeThirdLevel()
 	sortAvailableNUMANodes() []int
 	sortAvailableSockets() []int
-	sortAvailableUncoreCaches() []int
 	sortAvailableCores() []int
 }

@@ -127,18 +125,12 @@ func (n *numaFirst) takeFullSecondLevel() {
 	n.acc.takeFullSockets()
 }

-// In Split UncoreCache Topology, we take from the sets of UncoreCache as the third level
-func (n *numaFirst) takeThirdLevel() {
-	n.acc.takeUncoreCache()
-}
-
-// If NUMA nodes are higher in the memory hierarchy than sockets, then just
-// sort the NUMA nodes directly, and return them.
-func (n *numaFirst) sortAvailableUncoreCaches() []int {
+// Sort the UncoreCaches within the NUMA nodes.
+func (a *cpuAccumulator) sortAvailableUncoreCaches() []int {
 	var result []int
-	for _, socket := range n.acc.sortAvailableNUMANodes() {
-		uncore := n.acc.details.UncoreInNUMANodes(socket).UnsortedList()
-		n.acc.sort(uncore, n.acc.details.CPUsInUncoreCaches)
+	for _, numa := range a.sortAvailableNUMANodes() {
+		uncore := a.details.UncoreInNUMANodes(numa).UnsortedList()
+		a.sort(uncore, a.details.CPUsInUncoreCaches)
 		result = append(result, uncore...)
 	}
 	return result

@@ -189,10 +181,6 @@ func (s *socketsFirst) takeFullSecondLevel() {
 	s.acc.takeFullNUMANodes()
 }

-func (s *socketsFirst) takeThirdLevel() {
-	s.acc.takeUncoreCache()
-}
-
 // If sockets are higher in the memory hierarchy than NUMA nodes, then we need
 // to pull the set of NUMA nodes out of each sorted Socket, and accumulate the
 // partial order across them.

@@ -214,18 +202,6 @@ func (s *socketsFirst) sortAvailableSockets() []int {
 	return sockets
 }

-// If sockets higher in the memory hierarchy than NUMA nodes, then UncoreCache
-// sit directly below NUMA Nodes in the memory hierchy
-func (s *socketsFirst) sortAvailableUncoreCaches() []int {
-	var result []int
-	for _, uncore := range s.acc.sortAvailableNUMANodes() {
-		uncore := s.acc.details.UncoreInNUMANodes(uncore).UnsortedList()
-		s.acc.sort(uncore, s.acc.details.CPUsInUncoreCaches)
-		result = append(result, uncore...)
-	}
-	return result
-}
-
 // If sockets are higher in the memory hierarchy than NUMA nodes, then cores
 // sit directly below NUMA Nodes in the memory hierarchy.
 func (s *socketsFirst) sortAvailableCores() []int {

@@ -273,8 +249,8 @@ func (a *cpuAccumulator) isSocketFree(socketID int) bool {
 	return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
 }

-// Returns true if the supplied UnCoreCache is fully available in `a.details`.
-// "fully available" means that all the CPUs in it are free.
+// Returns true if the supplied UnCoreCache is fully available,
+// meaning all its CPUs are free in `a.details`.
 func (a *cpuAccumulator) isUncoreCacheFree(uncoreID int) bool {
 	return a.details.CPUsInUncoreCaches(uncoreID).Size() == a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID).Size()
 }

@@ -309,19 +285,14 @@ func (a *cpuAccumulator) freeSockets() []int {
 // Returns free UncoreCache IDs as a slice sorted by sortAvailableUnCoreCache().
 func (a *cpuAccumulator) freeUncoreCache() []int {
 	free := []int{}
-	for _, uncore := range a.numaOrSocketsFirst.sortAvailableUncoreCaches() {
+	for _, uncore := range a.sortAvailableUncoreCaches() {
 		if a.isUncoreCacheFree(uncore) {
 			free = append(free, uncore)
 		}
 	}
 	return free
 }

-// Returns all UncoreCache IDs as a slice sorted by sortAvailableUncoreCache().
-func (a *cpuAccumulator) allUncoreCache() []int {
-	return a.numaOrSocketsFirst.sortAvailableUncoreCaches()
-}
-
 // Returns free core IDs as a slice sorted by sortAvailableCores().
 func (a *cpuAccumulator) freeCores() []int {
 	free := []int{}

@@ -414,7 +385,7 @@ func (a *cpuAccumulator) takeFullSockets() {
 		a.take(cpusInSocket)
 	}
 }
-func (a *cpuAccumulator) takeFullUnCore() {
+func (a *cpuAccumulator) takeFullUncore() {
 	for _, uncore := range a.freeUncoreCache() {
 		cpusInUncore := a.topo.CPUDetails.CPUsInUncoreCaches(uncore)
 		if !a.needsAtLeast(cpusInUncore.Size()) {

@@ -424,30 +395,34 @@ func (a *cpuAccumulator) takeFullUnCore() {
 	}
 }

+func (a *cpuAccumulator) takePartialUncore(uncoreID int) {
+	numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()
+	var freeCPUsInUncoreCache cpuset.CPUSet
+	freeCoresInUncoreCache := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncoreID)
+	for _, coreID := range freeCoresInUncoreCache.List() {
+		freeCPUsInUncoreCache = freeCPUsInUncoreCache.Union(a.topo.CPUDetails.CPUsInCores(coreID))
+	}
+	klog.V(4).InfoS("freeCPUsInUncorecache : ", "freeCPUsInUncorecache", freeCPUsInUncoreCache.String(), "freeCPUsInUnCoreCache", freeCPUsInUncoreCache.String())
+	if a.numCPUsNeeded == freeCPUsInUncoreCache.Size() {
+		a.take(freeCPUsInUncoreCache)
+	}
+}
+
 // First try to take full UncoreCache, if available and need is at least the size of the UncoreCache group.
 // Second try to take the partial UncoreCache if available and the request size can fit w/in the UncoreCache.
 func (a *cpuAccumulator) takeUncoreCache() {
-	for _, uncore := range a.allUncoreCache() {
-		numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()
-
-		// take full UncoreCache if the CPUs needed is greater a UncoreCache size
-		if a.numCPUsNeeded >= a.topo.NumCPUs/a.topo.NumUncoreCache {
-			a.takeFullUnCore()
+	cpusPerUncoreCache := a.topo.NumCPUs / a.topo.NumUncoreCache
+	for _, uncore := range a.sortAvailableUncoreCaches() {
+		// take full UncoreCache if the CPUs needed is greater than free UncoreCache size
+		if a.needsAtLeast(cpusPerUncoreCache) {
+			a.takeFullUncore()
 		}

-		var freeCPUsInUncoreCache cpuset.CPUSet
-		// need to get needed cores in UncoreCache
-		freeCoresInUncoreCache := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncore)
-		klog.V(2).InfoS("free cores from a.details list: ", "freeCoresInUncorecache", freeCoresInUncoreCache)
-		for _, coreID := range freeCoresInUncoreCache.List() {
-			freeCPUsInUncoreCache = freeCPUsInUncoreCache.Union(a.topo.CPUDetails.CPUsInCores(coreID))
-		}
-		klog.V(2).InfoS("freeCPUsInUncorecache : ", "freeCPUsInUncorecache", freeCPUsInUncoreCache)
-		if a.numCPUsNeeded == freeCPUsInUncoreCache.Size() {
-			klog.V(4).InfoS("takePartialUncore: claiming cores from Uncorecache ID", "uncore", uncore)
-			a.take(freeCPUsInUncoreCache)
+		if a.isSatisfied() {
+			return
 		}

+		a.takePartialUncore(uncore)
 		if a.isSatisfied() {
 			return
 		}
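Taken together, takeUncoreCache and takePartialUncore first claim fully-free UncoreCaches while the remaining request is at least one cache's worth of CPUs, then fall back to a single partial cache, which is claimed only when its free cores exactly cover the remainder. Below is a minimal, self-contained sketch of that ordering; the toy topology, the names, and the lowest-free-CPUs-first choice in the partial step are assumptions for illustration, not the cpumanager API.

// uncore_sketch.go — illustrative only; mirrors the full-then-partial order above.
package main

import "fmt"

const cpusPerCache = 4 // assumed: 16 CPUs split across 4 UncoreCaches, no SMT

func main() {
	// Free CPUs per UncoreCache ID; CPU 0 is assumed already allocated.
	freeByCache := [][]int{{1, 2, 3}, {4, 5, 6, 7}, {8, 9, 10, 11}, {12, 13, 14, 15}}
	need := 10
	var taken []int

	// Step 1: while the remainder is at least a cache's worth, claim fully-free
	// caches (what takeFullUncore does via freeUncoreCache/needsAtLeast).
	for id := range freeByCache {
		if need >= cpusPerCache && len(freeByCache[id]) == cpusPerCache {
			taken = append(taken, freeByCache[id]...)
			freeByCache[id] = nil
			need -= cpusPerCache
		}
	}

	// Step 2: claim just enough free cores from one partial cache, only on an
	// exact fit (takePartialUncore's `numCPUsNeeded == freeCPUsInUncoreCache.Size()`
	// check; with no SMT, cores and CPUs coincide).
	for id := range freeByCache {
		if need > 0 && len(freeByCache[id]) >= need {
			taken = append(taken, freeByCache[id][:need]...)
			need = 0
		}
	}

	fmt.Println(taken) // [4 5 6 7 8 9 10 11 1 2] — the same CPU set the new test case expects
}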
@@ -543,7 +518,7 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopControl) {
 	helper(n, k, 0, []int{}, f)
 }

-func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
+func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
 	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
 	if acc.isSatisfied() {
 		return acc.result, nil
@@ -566,66 +541,23 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
 		return acc.result, nil
 	}

-	// 2. Acquire whole cores, if available and the container requires at least
-	//    a core's-worth of CPUs.
-	acc.takeFullCores()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-
-	// 3. Acquire single threads, preferring to fill partially-allocated cores
-	//    on the same sockets as the whole cores we have already taken in this
-	//    allocation.
-	acc.takeRemainingCPUs()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-
-	return cpuset.New(), fmt.Errorf("failed to allocate cpus")
-}
-
-// takeByTopologyUnCoreCachePacked uses the "packed" sorting strategy similar to takeByTopologyNUMAPacked.
-// It includes an additional level of sorting by uncorecache
-func takeByTopologyUncoreCachePacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
-	acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-	if acc.isFailed() {
-		return cpuset.New(), fmt.Errorf("not enough cpus available to satisfy request: requested=%d, available=%d", numCPUs, availableCPUs.Size())
-	}
-
-	// Algorithm: topology-aware best-fit
-	// 1. Acquire whole NUMA nodes and sockets, if available and the container
-	//    requires at least a NUMA node or socket's-worth of CPUs. If NUMA
-	//    Nodes map to 1 or more sockets, pull from NUMA nodes first.
-	//    Otherwise pull from sockets first.
-	acc.numaOrSocketsFirst.takeFullFirstLevel()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-	acc.numaOrSocketsFirst.takeFullSecondLevel()
-	if acc.isSatisfied() {
-		return acc.result, nil
-	}
-
-	// 2. Acquire partial uncorecache, if there are enough CPUs available to satisfy the container requirement
-	//    Acquire the full uncorecache, if available and the container requires at least all the CPUs in the uncorecache grouping
-	acc.numaOrSocketsFirst.takeThirdLevel()
-	if acc.isSatisfied() {
-		return acc.result, nil
+	// 2. If PreferAlignByUncoreCache is enabled, acquire whole UncoreCaches
+	//    if available and the container requires at least a UncoreCache's-worth
+	//    of CPUs. Otherwise, acquire CPUs from the least amount of UncoreCaches.
+	if preferAlignByUncoreCache {
+		acc.takeUncoreCache()
+		if acc.isSatisfied() {
+			return acc.result, nil
+		}
 	}

 	// 3. Acquire whole cores, if available and the container requires at least
 	//    a core's-worth of CPUs.
 	//    If `CPUSortingStrategySpread` is specified, skip taking the whole core.
-	if cpuSortingStrategy != CPUSortingStrategySpread {
-		acc.takeFullCores()
-		if acc.isSatisfied() {
-			return acc.result, nil
-		}
+	acc.takeFullCores()
+	if acc.isSatisfied() {
+		return acc.result, nil
 	}
-
 	// 4. Acquire single threads, preferring to fill partially-allocated cores
 	//    on the same sockets as the whole cores we have already taken in this
 	//    allocation.
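Net effect of this hunk: the separate takeByTopologyUncoreCachePacked path is folded into takeByTopologyNUMAPacked, which now runs a single fallback chain — whole NUMA nodes/sockets, then (only when preferAlignByUncoreCache is set) whole or partial UncoreCaches, then whole cores, then single threads — with each stage returning early once the request is satisfied. The sketch after the takeUncoreCache hunk above walks the uncore stage concretely.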
@@ -704,8 +636,10 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {
 	// If the number of CPUs requested cannot be handed out in chunks of
 	// 'cpuGroupSize', then we just call out the packing algorithm since we
 	// can't distribute CPUs in this chunk size.
+	// PreferAlignByUncoreCache feature not implemented here yet and set to false.
+	// Support for PreferAlignByUncoreCache to be done at beta release.
 	if (numCPUs % cpuGroupSize) != 0 {
-		return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
+		return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, false)
 	}

 	// Otherwise build an accumulator to start allocating CPUs from.

@@ -888,7 +822,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {
 	// size 'cpuGroupSize' from 'bestCombo'.
 	distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
 	for _, numa := range bestCombo {
-		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
+		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, false)
 		acc.take(cpus)
 	}

@@ -903,7 +837,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {
 		if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
 			continue
 		}
-		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
+		cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, false)
 		acc.take(cpus)
 		remainder -= cpuGroupSize
 	}

@@ -927,5 +861,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {

 	// If we never found a combination of NUMA nodes that we could properly
 	// distribute CPUs across, fall back to the packing algorithm.
-	return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
+	return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, false)
 }

pkg/kubelet/cm/cpumanager/cpu_assignment_test.go

Lines changed: 11 additions & 1 deletion
@@ -676,6 +676,16 @@ func TestTakeByTopologyUncoreCachePacked(t *testing.T) {
 		expErr        string
 		expResult     cpuset.CPUSet
 	}{
+		// Test cases for PreferAlignByUncoreCache
+		{
+			"take cpus from two full UncoreCaches and partial from a single UncoreCache",
+			topoUncoreSingleSocketNoSMT,
+			StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
+			mustParseCPUSet(t, "1-15"),
+			10,
+			"",
+			cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11),
+		},
 		{
 			"take one cpu from dual socket with HT - core from Socket 0",
 			topoDualSocketHT,
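Tracing this new case against the allocator above: on topoUncoreSingleSocketNoSMT (four 4-CPU UncoreCaches; see policy_test.go below) with only CPUs 1-15 available, cache 0 is partial (CPUs 1-3 free) while caches 1, 2, and 3 are fully free. For a request of 10 CPUs, takeFullUncore claims caches 1 (CPUs 4-7) and 2 (CPUs 8-11); the remaining need of 2 is then below a cache's worth, so cache 3 stays untouched, and takePartialUncore finds an exact fit of 2 free cores in cache 0 — presumably the lowest-numbered CPUs 1 and 2 — giving the expected cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11).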
@@ -748,7 +758,7 @@ func TestTakeByTopologyUncoreCachePacked(t *testing.T) {
 			strategy = CPUSortingStrategySpread
 		}

-		result, err := takeByTopologyUncoreCachePacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
+		result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
 		if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
 			t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
 		}

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 1 addition & 6 deletions
@@ -506,12 +506,7 @@ func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
 		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
 	}

-	if p.options.PreferAlignByUncoreCacheOption {
-
-		return takeByTopologyUncoreCachePacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy)
-
-	}
-	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy)
+	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy, p.options.PreferAlignByUncoreCacheOption)
 }

 func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
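With this change the static policy always calls the single packed allocator, and the uncore-cache preference travels as a trailing boolean rather than selecting a separate takeByTopologyUncoreCachePacked entry point. (Background, not part of this diff: PreferAlignByUncoreCacheOption is the StaticPolicyOptions field behind the KEP-4800 `prefer-align-cpus-by-uncorecache` CPU manager policy option; that option name is recalled from the KEP, not taken from this commit.)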

pkg/kubelet/cm/cpumanager/policy_test.go

Lines changed: 33 additions & 8 deletions
@@ -72,14 +72,14 @@
 			5: {CoreID: 5, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
 			6: {CoreID: 6, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
 			7: {CoreID: 7, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
-			8: {CoreID: 8, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 2},
-			9: {CoreID: 9, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 2},
-			10: {CoreID: 10, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 2},
-			11: {CoreID: 11, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 2},
-			12: {CoreID: 12, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 3},
-			13: {CoreID: 13, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 3},
-			14: {CoreID: 14, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 3},
-			15: {CoreID: 15, SocketID: 1, NUMANodeID: 1, UncoreCacheID: 3},
+			8: {CoreID: 8, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 2},
+			9: {CoreID: 9, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 2},
+			10: {CoreID: 10, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 2},
+			11: {CoreID: 11, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 2},
+			12: {CoreID: 12, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 3},
+			13: {CoreID: 13, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 3},
+			14: {CoreID: 14, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 3},
+			15: {CoreID: 15, SocketID: 1, NUMANodeID: 0, UncoreCacheID: 3},
 		},
 	}
@@ -133,6 +133,31 @@
 		},
 	}

+	topoUncoreSingleSocketNoSMT = &topology.CPUTopology{
+		NumCPUs:        16,
+		NumSockets:     1,
+		NumCores:       16,
+		NumUncoreCache: 4,
+		CPUDetails: map[int]topology.CPUInfo{
+			0: {CoreID: 0, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			1: {CoreID: 1, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			2: {CoreID: 2, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			3: {CoreID: 3, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 0},
+			4: {CoreID: 4, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			5: {CoreID: 5, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			6: {CoreID: 6, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			7: {CoreID: 7, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 1},
+			8: {CoreID: 8, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			9: {CoreID: 9, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			10: {CoreID: 10, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			11: {CoreID: 11, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 2},
+			12: {CoreID: 12, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+			13: {CoreID: 13, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+			14: {CoreID: 14, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+			15: {CoreID: 15, SocketID: 0, NUMANodeID: 0, UncoreCacheID: 3},
+		},
+	}
+
 	topoDualSocketNoHT = &topology.CPUTopology{
 		NumCPUs:    8,
 		NumSockets: 2,
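The new topoUncoreSingleSocketNoSMT fixture models a single-socket, single-NUMA machine with 16 cores and one CPU per core (no SMT), split evenly into four UncoreCaches of four CPUs each — the smallest layout that exercises the full-plus-partial UncoreCache test case added in cpu_assignment_test.go above.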
