
Commit 476958b

sync: do not clear sync.Pools completely every GC
Specifically, we now clear only half of the poolLocals on each GC. To do this we also have to switch from a global slice of Pools to a linked list, so that we can retain Pools that are still in use and drop Pools that are empty without allocating. This means that, for a Pool that suddenly stops being used:

- during the first GC, half of its poolLocals are cleared;
- during the second GC, the remaining half of its poolLocals are cleared;
- during the third GC, the now-empty Pool itself is dropped from allPools.

This simplified approach was chosen because it avoids having to worry about resizing the shared arrays during poolCleanup, and it adds no synchronization (or atomic operations) on the Put/Get paths.

Fixes golang#22950
1 parent 7cf31d8 commit 476958b
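
As a rough illustration of the lifecycle described in the commit message (not part of this change), a value put into a Pool can now survive the first GC after it was stored and should only disappear after two consecutive GCs. The sketch below assumes a runtime built with this patch; exactly what survives the first GC depends on which poolLocal the value landed in:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	var p sync.Pool

	p.Put("hello")
	runtime.GC()
	fmt.Println(p.Get()) // with this patch the value may survive a single GC

	p.Put("hello")
	runtime.GC()
	runtime.GC()
	fmt.Println(p.Get()) // after two consecutive GCs it is expected to be gone: <nil>
}

This mirrors the extra runtime.GC() added to TestPool in the diff below.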

File tree

2 files changed: +205 −8 lines changed


src/sync/pool.go: +93 −8
@@ -166,6 +166,7 @@ func (p *Pool) getSlow() (x interface{}) {
 		last := len(l.shared) - 1
 		if last >= 0 {
 			x = l.shared[last]
+			l.shared[last] = nil
 			l.shared = l.shared[:last]
 			l.Unlock()
 			break
@@ -205,7 +206,7 @@ func (p *Pool) pinSlow() *poolLocal {
 		return indexLocal(l, pid)
 	}
 	if p.local == nil {
-		allPools = append(allPools, p)
+		allPools.pushBack(p)
 	}
 	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
 	size := runtime.GOMAXPROCS(0)
@@ -215,32 +216,56 @@ func (p *Pool) pinSlow() *poolLocal {
 	return &local[pid]
 }
 
+var cleanupCount uint
+
 func poolCleanup() {
 	// This function is called with the world stopped, at the beginning of a garbage collection.
 	// It must not allocate and probably should not call any runtime functions.
-	// Defensively zero out everything, 2 reasons:
+	// When pools or poolLocals need to be emptied we defensively zero out everything for 2 reasons:
 	// 1. To prevent false retention of whole Pools.
 	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
 	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
-	for i, p := range allPools {
-		allPools[i] = nil
+	// Under normal circumstances we don't delete all pools, instead we drop half of the poolLocals
+	// every cycle, and whole pools if all poolLocals are empty when starting the cleanup (this means
+	// that a non-empty Pool will take 3 GC cycles to be completely deleted: the first will delete
+	// half of the poolLocals, the second the remaining half and the third the now empty Pool itself).
+
+	// deleteAll is a placeholder for dynamically controlling whether pools are aggressively
+	// or partially cleaned up. If true, all pools are emptied every GC; if false only half of
+	// the poolLocals are dropped. For now it is a constant so that it can be optimized away
+	// at compile-time; ideally the runtime should decide whether to set deleteAll to true
+	// based on memory pressure (see #29696).
+	const deleteAll = false
+
+	for e := allPools.front(); e != nil; e = e.nextElement() {
+		p := e.value
+		empty := true
 		for i := 0; i < int(p.localSize); i++ {
 			l := indexLocal(p.local, i)
+			if l.private != nil || len(l.shared) > 0 {
+				empty = false
+			}
+			if i%2 == int(cleanupCount%2) && !deleteAll {
+				continue
+			}
 			l.private = nil
 			for j := range l.shared {
				l.shared[j] = nil
 			}
 			l.shared = nil
 		}
-		p.local = nil
-		p.localSize = 0
+		if empty || deleteAll {
+			p.local = nil
+			p.localSize = 0
+			allPools.remove(e)
+		}
 	}
-	allPools = []*Pool{}
+	cleanupCount++
 }
 
 var (
 	allPoolsMu Mutex
-	allPools   []*Pool
+	allPools   list
 )
 
 func init() {
@@ -256,3 +281,63 @@ func indexLocal(l unsafe.Pointer, i int) *poolLocal {
 func runtime_registerPoolCleanup(cleanup func())
 func runtime_procPin() int
 func runtime_procUnpin()
+
+// Stripped-down and specialized version of container/list (to avoid using interface{}
+// casts, since they can allocate and allocation is forbidden in poolCleanup). Note that
+// these functions are so small and simple that they all end up completely inlined.
+// pushBack may potentially be made atomic so that allPoolsMu can be removed.
+
+type element struct {
+	next, prev *element
+	list       *list
+	value      *Pool
+}
+
+func (e *element) nextElement() *element {
+	if p := e.next; e.list != nil && p != &e.list.root {
+		return p
+	}
+	return nil
+}
+
+type list struct {
+	root element
+}
+
+func (l *list) front() *element {
+	if l.root.next == &l.root {
+		return nil
+	}
+	return l.root.next
+}
+
+func (l *list) lazyInit() {
+	if l.root.next == nil {
+		l.root.next = &l.root
+		l.root.prev = &l.root
+	}
+}
+
+func (l *list) insert(e, at *element) {
+	n := at.next
+	at.next = e
+	e.prev = at
+	e.next = n
+	n.prev = e
+	e.list = l
+}
+
+func (l *list) remove(e *element) {
+	if e.list == l {
+		e.prev.next = e.next
+		e.next.prev = e.prev
+		e.next = nil
+		e.prev = nil
+		e.list = nil
+	}
+}
+
+func (l *list) pushBack(v *Pool) {
+	l.lazyInit()
+	l.insert(&element{value: v}, l.root.prev)
+}
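
As a side note (not part of the diff), the effect of the i%2 == int(cleanupCount%2) test in poolCleanup above can be visualized with a tiny standalone simulation; the poolLocal count below is made up for illustration:

package main

import "fmt"

func main() {
	const numLocals = 4 // pretend GOMAXPROCS == 4, so there are 4 poolLocals
	for cleanupCount := uint(0); cleanupCount < 3; cleanupCount++ {
		var kept, cleared []int
		for i := 0; i < numLocals; i++ {
			if i%2 == int(cleanupCount%2) {
				kept = append(kept, i) // skipped by the cleanup loop: survives this GC
			} else {
				cleared = append(cleared, i) // zeroed out by the cleanup loop
			}
		}
		fmt.Printf("GC #%d: kept poolLocals %v, cleared poolLocals %v\n", cleanupCount+1, kept, cleared)
	}
}

Consecutive cleanups alternate which half of the poolLocals they spare, so any single cached object is cleared after at most two GCs; once every poolLocal of a Pool is empty, the Pool itself is unlinked from allPools.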

src/sync/pool_test.go: +112
@@ -8,8 +8,11 @@
 package sync_test
 
 import (
+	"bytes"
+	"math/rand"
 	"runtime"
 	"runtime/debug"
+	"strconv"
 	. "sync"
 	"sync/atomic"
 	"testing"
@@ -43,6 +46,7 @@ func TestPool(t *testing.T) {
 	p.Put("c")
 	debug.SetGCPercent(100) // to allow following GC to actually run
 	runtime.GC()
+	runtime.GC() // we now keep some objects until two consecutive GCs
 	if g := p.Get(); g != nil {
 		t.Fatalf("got %#v; want nil after GC", g)
 	}
@@ -120,6 +124,62 @@ loop:
 	}
 }
 
+func TestPoolPartialRelease(t *testing.T) {
+	if runtime.GOMAXPROCS(-1) <= 1 {
+		t.Skip("pool partial release test is only stable when GOMAXPROCS > 1")
+	}
+
+	// disable GC so we can control when it happens.
+	defer debug.SetGCPercent(debug.SetGCPercent(-1))
+
+	Ps := runtime.GOMAXPROCS(-1)
+	Gs := Ps * 10
+	Gobjs := 10000
+
+	var p Pool
+	var wg WaitGroup
+	start := int32(0)
+	for i := 0; i < Gs; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			atomic.AddInt32(&start, 1)
+			for atomic.LoadInt32(&start) < int32(Ps) {
+				// spin until enough Gs are ready to go
+			}
+			for j := 0; j < Gobjs; j++ {
+				p.Put(new(string))
+			}
+		}()
+	}
+	wg.Wait()
+
+	waitGC()
+
+	inpool := 0
+	for p.Get() != nil {
+		inpool++
+	}
+	objs := Gs * Gobjs
+	min, max := objs/2 - objs/Ps, objs/2 + objs/Ps
+	if inpool < min || inpool > max {
+		// GC should have dropped half of the per-P shards; because we don't know the
+		// exact distribution of the objects in the shards when we started, and we don't
+		// know which shards have been cleared, we consider the test successful as long
+		// as after GC the number of remaining objects is half ± objs/Ps.
+		t.Fatalf("objects in pool %d, expected between %d and %d", inpool, min, max)
+	}
+}
+
+func waitGC() {
+	ch := make(chan struct{})
+	runtime.SetFinalizer(&[16]byte{}, func(_ interface{}) {
+		close(ch)
+	})
+	runtime.GC()
+	<-ch
+}
+
 func TestPoolStress(t *testing.T) {
 	const P = 10
 	N := int(1e6)
@@ -173,3 +233,55 @@ func BenchmarkPoolOverflow(b *testing.B) {
 		}
 	})
 }
+
+var bufSizes = []int{1 << 8, 1 << 12, 1 << 16, 1 << 20, 1 << 24}
+
+func BenchmarkPoolBuffer(b *testing.B) {
+	for _, sz := range bufSizes {
+		sz := sz
+		b.Run(strconv.Itoa(sz), func(b *testing.B) {
+			var p Pool
+			var i int64
+			b.RunParallel(func(pb *testing.PB) {
+				rnd := rand.New(rand.NewSource(atomic.AddInt64(&i, 1)))
+				var j int
+				for pb.Next() {
+					buf, _ := p.Get().(*bytes.Buffer)
+					if buf == nil {
+						buf = &bytes.Buffer{}
+					}
+					buf.Grow(rnd.Intn(sz * 2))
+
+					go p.Put(buf)
+					j++
+					if j%256 == 0 {
+						runtime.Gosched()
+					}
+				}
+			})
+		})
+	}
+}
+
+func BenchmarkNoPoolBuffer(b *testing.B) {
+	for _, sz := range bufSizes {
+		sz := sz
+		b.Run(strconv.Itoa(sz), func(b *testing.B) {
+			var i int64
+			b.RunParallel(func(pb *testing.PB) {
+				rnd := rand.New(rand.NewSource(atomic.AddInt64(&i, 1)))
+				var j int
+				for pb.Next() {
+					buf := &bytes.Buffer{}
+					buf.Grow(rnd.Intn(sz * 2))
+
+					go runtime.KeepAlive(buf)
+					j++
+					if j%256 == 0 {
+						runtime.Gosched()
+					}
+				}
+			})
+		})
+	}
+}
