Skip to content

Commit 92c99aa

Browse files
[IMPROVED] Publish permissions cache pruning (#6674)
The test `TestLeafNodePubAllowedPruning` would sometimes report (after all go routines calling c.pubAllowed() being done) that the cache size was greater than the max allowed (128). @MauriceVanVeen did fix the test with a correct explanation of why this test flapped, but I think we can improve a tiny bit the code so that the cache is much likely to be pruned enough. In this PR, I make `prunePubPermsCache()` attempts up to 5 times to prune enough the cache, locking/releasing at each attempt to improve the odds that another routine takes a shot at it. In my experiments, I am seeing only up to 2 attempts to be below the max size (and less than 10ms execution time). That being said, with artificial sleep before the release of the lock, it could take way more attempts. So for very slow machines, although I have reverted the changes to the test done in PR#6636, I did add a little protection to call one more time `c.pubAllowed()` if we first detect that we are over the limit. If we are still after that the test will fail and would need another look. Signed-off-by: Ivan Kozlovic <[email protected]>
2 parents 44cc2ac + 79c6552 commit 92c99aa

File tree

2 files changed

+40
-31
lines changed

2 files changed

+40
-31
lines changed

server/client.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3820,24 +3820,34 @@ func (c *client) pruneDenyCache() {
38203820
// prunePubPermsCache will prune the cache via randomly
38213821
// deleting items. Doing so pruneSize items at a time.
38223822
func (c *client) prunePubPermsCache() {
3823-
// There is a case where we can invoke this from multiple go routines,
3824-
// (in deliverMsg() if sub.client is a LEAF), so we make sure to prune
3825-
// from only one go routine at a time.
3826-
if !atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) {
3827-
return
3828-
}
3829-
const maxPruneAtOnce = 1000
3830-
r := 0
3831-
c.perms.pcache.Range(func(k, _ any) bool {
3832-
c.perms.pcache.Delete(k)
3833-
if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) ||
3834-
(r > maxPruneAtOnce) {
3835-
return false
3823+
// With parallel additions to the cache, it is possible that this function
3824+
// would not be able to reduce the cache to its max size in one go. We
3825+
// will try a few times but will release/reacquire the "lock" at each
3826+
// attempt to give a chance to another go routine to take over and not
3827+
// have this go routine do too many attempts.
3828+
for i := 0; i < 5; i++ {
3829+
// There is a case where we can invoke this from multiple go routines,
3830+
// (in deliverMsg() if sub.client is a LEAF), so we make sure to prune
3831+
// from only one go routine at a time.
3832+
if !atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) {
3833+
return
38363834
}
3837-
return true
3838-
})
3839-
atomic.AddInt32(&c.perms.pcsz, -int32(r))
3840-
atomic.StoreInt32(&c.perms.prun, 0)
3835+
const maxPruneAtOnce = 1000
3836+
r := 0
3837+
c.perms.pcache.Range(func(k, _ any) bool {
3838+
c.perms.pcache.Delete(k)
3839+
if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) ||
3840+
(r > maxPruneAtOnce) {
3841+
return false
3842+
}
3843+
return true
3844+
})
3845+
n := atomic.AddInt32(&c.perms.pcsz, -int32(r))
3846+
atomic.StoreInt32(&c.perms.prun, 0)
3847+
if n <= int32(maxPermCacheSize) {
3848+
return
3849+
}
3850+
}
38413851
}
38423852

38433853
// pubAllowed checks on publish permissioning.

server/leafnode_test.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,20 +1466,19 @@ func TestLeafNodePubAllowedPruning(t *testing.T) {
14661466
}
14671467

14681468
wg.Wait()
1469-
checkFor(t, 2*time.Second, time.Millisecond, func() (rerr error) {
1470-
defer func() {
1471-
if rerr != nil {
1472-
c.pubAllowed(nats.NewInbox())
1473-
}
1474-
}()
1475-
if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
1476-
return fmt.Errorf("Expected size to be less than %v, got %v", maxPermCacheSize, n)
1477-
}
1478-
if n := atomic.LoadInt32(&c.perms.prun); n != 0 {
1479-
t.Fatalf("c.perms.prun should be 0, was %v", n)
1480-
}
1481-
return nil
1482-
})
1469+
// The cache prune function does try for a bit to make sure the cache
1470+
// is below the maxPermCacheSize value, but depending on the machine
1471+
// this runs on, it may be that it is still a bit over. If so, run
1472+
// pubAllowed one more time and we must get below.
1473+
if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
1474+
c.pubAllowed(nats.NewInbox())
1475+
}
1476+
if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
1477+
t.Fatalf("Expected size to be less than %v, got %v", maxPermCacheSize, n)
1478+
}
1479+
if n := atomic.LoadInt32(&c.perms.prun); n != 0 {
1480+
t.Fatalf("c.perms.prun should be 0, was %v", n)
1481+
}
14831482
}
14841483

14851484
func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) {

0 commit comments

Comments
 (0)