Skip to content

Commit 9eee423

Browse files
wilfred-scraigcondit
authored andcommitted
[YUNIKORN-3036] fix race prevention regression (#1014)
Re-instate the change from YUNIKORN-1993 that was reversed as part of YUNIKORN-2658. Add more explicit comment to not move the code. Minor cleanup in function naming and fix the function length of removeAllocation (lint fix) Closes: #1014 Signed-off-by: Craig Condit <[email protected]>
1 parent 621c184 commit 9eee423

File tree

3 files changed

+48
-81
lines changed

3 files changed

+48
-81
lines changed

pkg/scheduler/partition.go

+30-24
Original file line numberDiff line numberDiff line change
@@ -1416,7 +1416,9 @@ func (pc *PartitionContext) calculateNodesResourceUsage() map[string][]int {
14161416
return mapResult
14171417
}
14181418

1419-
func (pc *PartitionContext) generateReleased(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
1419+
// processAllocationRelease processes the releases from the RM and removes the allocation(s) as requested.
1420+
// Updates the application which can trigger an application state change.
1421+
func (pc *PartitionContext) processAllocationRelease(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
14201422
released := make([]*objects.Allocation, 0)
14211423
// when allocationKey is not specified, remove all allocations from the app
14221424
allocationKey := release.GetAllocationKey()
@@ -1448,7 +1450,7 @@ func (pc *PartitionContext) generateReleased(release *si.AllocationRelease, app
14481450

14491451
// removeAllocation removes the referenced allocation(s) from the applications and nodes
14501452
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
1451-
func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) { //nolint:funlen
1453+
func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) {
14521454
if release == nil {
14531455
return nil, nil
14541456
}
@@ -1468,25 +1470,20 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
14681470
return nil, nil
14691471
}
14701472

1471-
// temp store for allocations manipulated
1472-
released := pc.generateReleased(release, app)
1473-
var confirmed *objects.Allocation
1473+
// **** DO NOT MOVE **** this must be called before any allocations are released.
1474+
// Processing a removal while in the Completing state could race with the state change. The race occurs between
1475+
// removing the allocation and updating the queue after node processing. If the state change removes the queue link
1476+
// before we get to updating the queue after the node we leave the resources as allocated on the queue. The queue
1477+
// will always exist at this point. Retrieving the queue now sidesteps this.
1478+
queue := app.GetQueue()
14741479

1475-
// all releases are collected: placeholder count needs updating for all placeholder releases
1476-
// regardless of what happens later
1477-
phReleases := 0
1478-
for _, r := range released {
1479-
if r.IsPlaceholder() {
1480-
phReleases++
1481-
}
1482-
}
1483-
if phReleases > 0 {
1484-
pc.decPhAllocationCount(phReleases)
1485-
}
1480+
released := pc.processAllocationRelease(release, app)
1481+
pc.updatePhAllocationCount(released)
14861482

1487-
// for each allocation to release, update the node and queue.
14881483
total := resources.NewResource()
14891484
totalPreempting := resources.NewResource()
1485+
var confirmed *objects.Allocation
1486+
// for each allocation to release, update the node and queue.
14901487
for _, alloc := range released {
14911488
node := pc.GetNode(alloc.GetNodeID())
14921489
if node == nil {
@@ -1537,13 +1534,6 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
15371534
}
15381535
}
15391536

1540-
// Processing a removal while in the Completing state could race with the state change.
1541-
// The race occurs between removing the allocation and updating the queue after node processing.
1542-
// If the state change removes the queue link before we get to updating the queue after the node we
1543-
// leave the resources as allocated on the queue. The queue cannot be removed yet at this point as
1544-
// there are still allocations left. So retrieve the queue early to sidestep the race.
1545-
queue := app.GetQueue()
1546-
15471537
if resources.StrictlyGreaterThanZero(total) {
15481538
if err := queue.DecAllocatedResource(total); err != nil {
15491539
log.Log(log.SchedPartition).Warn("failed to release resources from queue",
@@ -1581,6 +1571,22 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
15811571
return released, confirmed
15821572
}
15831573

1574+
// updatePhAllocationCount checks the released allocations and updates the partition context counter of allocated
1575+
// placeholders.
1576+
func (pc *PartitionContext) updatePhAllocationCount(released []*objects.Allocation) {
1577+
// all releases are collected: placeholder count needs updating for all placeholder releases
1578+
// regardless of what happens later
1579+
phReleases := 0
1580+
for _, a := range released {
1581+
if a.IsPlaceholder() {
1582+
phReleases++
1583+
}
1584+
}
1585+
if phReleases > 0 {
1586+
pc.decPhAllocationCount(phReleases)
1587+
}
1588+
}
1589+
15841590
func (pc *PartitionContext) removeForeignAllocation(allocID string) {
15851591
pc.Lock()
15861592
defer pc.Unlock()

pkg/scheduler/partition_test.go

+17-54
Original file line numberDiff line numberDiff line change
@@ -1541,9 +1541,7 @@ func TestGetQueue(t *testing.T) {
15411541
func TestTryAllocate(t *testing.T) {
15421542
setupUGM()
15431543
partition := createQueuesNodes(t)
1544-
if partition == nil {
1545-
t.Fatal("partition create failed")
1546-
}
1544+
assert.Assert(t, partition != nil, "partition create failed")
15471545
if result := partition.tryAllocate(); result != nil {
15481546
t.Fatalf("empty cluster allocate returned allocation: %s", result)
15491547
}
@@ -1620,12 +1618,7 @@ func TestTryAllocate(t *testing.T) {
16201618
func TestRequiredNodeReservation(t *testing.T) {
16211619
setupUGM()
16221620
partition := createQueuesNodes(t)
1623-
if partition == nil {
1624-
t.Fatal("partition create failed")
1625-
}
1626-
if result := partition.tryAllocate(); result != nil {
1627-
t.Fatalf("empty cluster allocate returned allocation: %s", result)
1628-
}
1621+
assert.Assert(t, partition != nil, "partition create failed")
16291622
node := partition.nodes.GetNode(nodeID1)
16301623
if node == nil {
16311624
t.Fatal("node-1 should have been created")
@@ -1706,9 +1699,7 @@ func TestRequiredNodeReservation(t *testing.T) {
17061699
// allocate ask request with required node having non daemon set reservations
17071700
func TestRequiredNodeCancelOtherReservations(t *testing.T) {
17081701
partition := createQueuesNodes(t)
1709-
if partition == nil {
1710-
t.Fatal("partition create failed")
1711-
}
1702+
assert.Assert(t, partition != nil, "partition create failed")
17121703
if result := partition.tryAllocate(); result != nil {
17131704
t.Fatalf("empty cluster allocate returned allocation: %s", result)
17141705
}
@@ -1786,9 +1777,7 @@ func TestRequiredNodeCancelOtherReservations(t *testing.T) {
17861777
// allocate ask request with required node having daemon set reservations
17871778
func TestRequiredNodeCancelDSReservations(t *testing.T) {
17881779
partition := createQueuesNodes(t)
1789-
if partition == nil {
1790-
t.Fatal("partition create failed")
1791-
}
1780+
assert.Assert(t, partition != nil, "partition create failed")
17921781
if result := partition.tryAllocate(); result != nil {
17931782
t.Fatalf("empty cluster allocate returned allocation: %s", result)
17941783
}
@@ -1871,9 +1860,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) {
18711860
func TestRequiredNodeNotExist(t *testing.T) {
18721861
setupUGM()
18731862
partition := createQueuesNodes(t)
1874-
if partition == nil {
1875-
t.Fatal("partition create failed")
1876-
}
1863+
assert.Assert(t, partition != nil, "partition create failed")
18771864
if result := partition.tryAllocate(); result != nil {
18781865
t.Fatalf("empty cluster allocate returned allocation: %s", result)
18791866
}
@@ -1908,9 +1895,7 @@ func TestRequiredNodeNotExist(t *testing.T) {
19081895
// basic ds scheduling on specific node in first allocate run itself (without any need for reservation)
19091896
func TestRequiredNodeAllocation(t *testing.T) {
19101897
partition := createQueuesNodes(t)
1911-
if partition == nil {
1912-
t.Fatal("partition create failed")
1913-
}
1898+
assert.Assert(t, partition != nil, "partition create failed")
19141899
if result := partition.tryAllocate(); result != nil {
19151900
t.Fatalf("empty cluster allocate returned allocation: %s", result.Request.String())
19161901
}
@@ -2076,9 +2061,7 @@ func TestPreemptionForRequiredNodeReservedAlloc(t *testing.T) {
20762061

20772062
func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
20782063
partition := createQueuesNodes(t)
2079-
if partition == nil {
2080-
t.Fatal("partition create failed")
2081-
}
2064+
assert.Assert(t, partition != nil, "partition create failed")
20822065

20832066
app, testHandler := newApplicationWithHandler(appID1, "default", "root.parent.sub-leaf")
20842067
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"})
@@ -2158,9 +2141,7 @@ func getExpectedQueuesLimitsForPreemptionWithRequiredNode() map[string]map[strin
21582141
// setup the partition with existing allocations so we can test preemption
21592142
func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *objects.Application, *objects.Allocation, *objects.Allocation) {
21602143
partition := createPreemptionQueuesNodes(t)
2161-
if partition == nil {
2162-
t.Fatal("partition create failed")
2163-
}
2144+
assert.Assert(t, partition != nil, "partition create failed")
21642145
if result := partition.tryAllocate(); result != nil {
21652146
t.Fatalf("empty cluster allocate returned allocation: %s", result)
21662147
}
@@ -2220,9 +2201,7 @@ func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *ob
22202201
// setup the partition in a state that we need for multiple tests
22212202
func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.Application) {
22222203
partition := createQueuesNodes(t)
2223-
if partition == nil {
2224-
t.Fatal("partition create failed")
2225-
}
2204+
assert.Assert(t, partition != nil, "partition create failed")
22262205
if result := partition.tryAllocate(); result != nil {
22272206
t.Fatalf("empty cluster allocate returned allocation: %s", result)
22282207
}
@@ -2300,9 +2279,7 @@ func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.A
23002279
func TestTryAllocateLarge(t *testing.T) {
23012280
setupUGM()
23022281
partition := createQueuesNodes(t)
2303-
if partition == nil {
2304-
t.Fatal("partition create failed")
2305-
}
2282+
assert.Assert(t, partition != nil, "partition create failed")
23062283
if result := partition.tryAllocate(); result != nil {
23072284
t.Fatalf("empty cluster allocate returned allocation: %s", result)
23082285
}
@@ -2333,9 +2310,7 @@ func TestTryAllocateLarge(t *testing.T) {
23332310
func TestAllocReserveNewNode(t *testing.T) {
23342311
setupUGM()
23352312
partition := createQueuesNodes(t)
2336-
if partition == nil {
2337-
t.Fatal("partition create failed")
2338-
}
2313+
assert.Assert(t, partition != nil, "partition create failed")
23392314
if result := partition.tryAllocate(); result != nil {
23402315
t.Fatalf("empty cluster allocate returned result: %s", result)
23412316
}
@@ -2404,9 +2379,7 @@ func TestAllocReserveNewNode(t *testing.T) {
24042379
func TestTryAllocateReserve(t *testing.T) {
24052380
setupUGM()
24062381
partition := createQueuesNodes(t)
2407-
if partition == nil {
2408-
t.Fatal("partition create failed")
2409-
}
2382+
assert.Assert(t, partition != nil, "partition create failed")
24102383
if result := partition.tryReservedAllocate(); result != nil {
24112384
t.Fatalf("empty cluster reserved allocate returned allocation: %s", result)
24122385
}
@@ -2478,9 +2451,7 @@ func TestTryAllocateReserve(t *testing.T) {
24782451
func TestTryAllocateWithReserved(t *testing.T) {
24792452
setupUGM()
24802453
partition := createQueuesNodes(t)
2481-
if partition == nil {
2482-
t.Fatal("partition create failed")
2483-
}
2454+
assert.Assert(t, partition != nil, "partition create failed")
24842455
if alloc := partition.tryReservedAllocate(); alloc != nil {
24852456
t.Fatalf("empty cluster reserved allocate returned allocation: %v", alloc)
24862457
}
@@ -2502,9 +2473,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
25022473

25032474
// reserve one node: scheduling should happen on the other
25042475
node2 := partition.GetNode(nodeID2)
2505-
if node2 == nil {
2506-
t.Fatal("expected node-2 to be returned got nil")
2507-
}
2476+
assert.Assert(t, node2 != nil, "expected node-2 to be returned got nil")
25082477
partition.reserve(app, node2, ask)
25092478
if app.NodeReservedForAsk(allocKey) != nodeID2 {
25102479
t.Fatal("reservation failure for alloc-1 and node-2")
@@ -2533,9 +2502,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
25332502
func TestScheduleRemoveReservedAsk(t *testing.T) {
25342503
setupUGM()
25352504
partition := createQueuesNodes(t)
2536-
if partition == nil {
2537-
t.Fatal("partition create failed")
2538-
}
2505+
assert.Assert(t, partition != nil, "partition create failed")
25392506
if result := partition.tryAllocate(); result != nil {
25402507
t.Fatalf("empty cluster allocate returned allocation: %s", result)
25412508
}
@@ -2623,9 +2590,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
26232590
// update the config with nodes registered and make sure that the root max and guaranteed are not changed
26242591
func TestUpdateRootQueue(t *testing.T) {
26252592
partition := createQueuesNodes(t)
2626-
if partition == nil {
2627-
t.Fatal("partition create failed")
2628-
}
2593+
assert.Assert(t, partition != nil, "partition create failed")
26292594
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "20"})
26302595
assert.NilError(t, err, "resource creation failed")
26312596
assert.Assert(t, resources.Equals(res, partition.totalPartitionResource), "partition resource not set as expected")
@@ -3927,9 +3892,7 @@ func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t *testing.T) {
39273892
func TestTryAllocateMaxRunning(t *testing.T) {
39283893
const resType = "vcore"
39293894
partition := createQueuesNodes(t)
3930-
if partition == nil {
3931-
t.Fatal("partition create failed")
3932-
}
3895+
assert.Assert(t, partition != nil, "partition create failed")
39333896
if result := partition.tryAllocate(); result != nil {
39343897
t.Fatalf("empty cluster allocate returned allocation: %s", result)
39353898
}

pkg/scheduler/tests/recovery_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -675,9 +675,7 @@ func TestAppRecovery(t *testing.T) {
675675
mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
676676

677677
app := serviceContext.Scheduler.GetClusterContext().GetApplication(appID1, "[rm:123]default")
678-
if app == nil {
679-
t.Fatal("application not found after recovery")
680-
}
678+
assert.Assert(t, app != nil, "application not found after recovery")
681679
assert.Equal(t, app.ApplicationID, appID1)
682680
assert.Equal(t, app.GetQueuePath(), "root.a")
683681
}

0 commit comments

Comments
 (0)