
Commit a5f230c
sync: do not clear sync.Pools completely every GC
Specifically, we clear only half of the poolLocals. To do this we also
have to switch from a global array of Pools to a linked list, so that
we can retain Pools in use and drop Pools that are empty without
allocating. This means that, for a Pool that suddenly stops being used
and gets dropped:

- during the first GC: half of the poolLocals are cleared
- during the second GC: the second half of the poolLocals are cleared
- during the third GC: the Pool itself is dropped from allPools

Fixes golang#22950

Change-Id: I993666496ca529d301ab6faa94898a45615bb92a
1 parent 7cf31d8 commit a5f230c
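
Illustrative example (not part of the commit): at the API level, the change means an object stored in a Pool may now survive a single collection and is dropped only after two back-to-back GCs with no intervening Put. The sketch below assumes a toolchain built with this patch; on stock Go the first runtime.GC() already empties the pool.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	var p sync.Pool
	p.Put("x")
	runtime.GC()
	fmt.Println(p.Get()) // may still print "x": only half of the shards were cleared

	p.Put("x")
	runtime.GC()
	runtime.GC()
	fmt.Println(p.Get()) // <nil>: two consecutive GCs clear both halves
}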

File tree

2 files changed: +219 −8 lines changed

src/sync/pool.go (+103 −8)
@@ -205,7 +205,7 @@ func (p *Pool) pinSlow() *poolLocal {
 		return indexLocal(l, pid)
 	}
 	if p.local == nil {
-		allPools = append(allPools, p)
+		allPools.pushBack(p)
 	}
 	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
 	size := runtime.GOMAXPROCS(0)
@@ -215,32 +215,60 @@ func (p *Pool) pinSlow() *poolLocal {
 	return &local[pid]
 }
 
+var cleanupCount uint
+
 func poolCleanup() {
 	// This function is called with the world stopped, at the beginning of a garbage collection.
 	// It must not allocate and probably should not call any runtime functions.
-	// Defensively zero out everything, 2 reasons:
+	// When pools or poolLocals need to be emptied we defensively zero out everything for 2 reasons:
 	// 1. To prevent false retention of whole Pools.
 	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
 	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
-	for i, p := range allPools {
-		allPools[i] = nil
+	// Under normal circumstances we don't delete all pools; instead we drop half of the poolLocals
+	// every cycle, and whole pools if all poolLocals are empty when starting the cleanup (this means
+	// that a non-empty Pool will take 3 GC cycles to be completely deleted: the first will delete
+	// half of the poolLocals, the second the remaining half, and the third the now-empty Pool itself).
+
+	// deleteAll is a placeholder for dynamically controlling whether pools are aggressively
+	// or partially cleaned up. If true, all pools are emptied every GC; if false, only half of
+	// the poolLocals are dropped. For now it is a constant so that it can be optimized away
+	// at compile time; ideally the runtime should decide whether to set deleteAll to true
+	// based on memory pressure.
+	const deleteAll = false
+
+	for e := allPools.front(); e != nil; {
+		// Capture the next element up front: remove below unlinks e, after
+		// which e.nextElement would return nil and end the loop early.
+		next := e.nextElement()
+		p := e.value
+		empty := true
 		for i := 0; i < int(p.localSize); i++ {
 			l := indexLocal(p.local, i)
+			if l.private != nil || len(l.shared) > 0 {
+				empty = false
+			}
+			if i%2 == int(cleanupCount%2) && !deleteAll {
+				continue // this half of the poolLocals survives the current cycle
+			}
 			l.private = nil
 			for j := range l.shared {
 				l.shared[j] = nil
 			}
 			l.shared = nil
 		}
-		p.local = nil
-		p.localSize = 0
+		if empty || deleteAll {
+			p.local = nil
+			p.localSize = 0
+			allPools.remove(e)
+		}
+		e = next
 	}
-	allPools = []*Pool{}
+	cleanupCount++
 }
 
 var (
 	allPoolsMu Mutex
-	allPools   []*Pool
+	allPools   list
 )
 
 func init() {
@@ -256,3 +280,70 @@ func indexLocal(l unsafe.Pointer, i int) *poolLocal {
 func runtime_registerPoolCleanup(cleanup func())
 func runtime_procPin() int
 func runtime_procUnpin()
+
+// Stripped-down and specialized version of container/list (to avoid interface{}
+// conversions, since they can allocate and allocation is forbidden in poolCleanup).
+// Note that these functions are so small and simple that they all end up inlined.
+
+type element struct {
+	next, prev *element
+	list       *list
+	value      *Pool
+}
+
+// nextElement returns the element following e, or nil at the end of the list.
+func (e *element) nextElement() *element {
+	if p := e.next; e.list != nil && p != &e.list.root {
+		return p
+	}
+	return nil
+}
+
+// list is a doubly linked list of Pools with a sentinel root element.
+type list struct {
+	root element
+}
+
+// front returns the first element of l, or nil if l is empty.
+func (l *list) front() *element {
+	if l.root.next == &l.root {
+		return nil
+	}
+	return l.root.next
+}
+
+// lazyInit links the sentinel root to itself on first use, so that the
+// zero value of list is ready to use.
+func (l *list) lazyInit() {
+	if l.root.next == nil {
+		l.root.next = &l.root
+		l.root.prev = &l.root
+	}
+}
+
+// insert inserts e after at.
+func (l *list) insert(e, at *element) {
+	n := at.next
+	at.next = e
+	e.prev = at
+	e.next = n
+	n.prev = e
+	e.list = l
+}
+
+// remove unlinks e from l and clears e's pointers to avoid accidental retention.
+func (l *list) remove(e *element) {
+	if e.list == l {
+		e.prev.next = e.next
+		e.next.prev = e.prev
+		e.next = nil
+		e.prev = nil
+		e.list = nil
+	}
+}
+
+// pushBack appends v at the back of l.
+func (l *list) pushBack(v *Pool) {
+	l.lazyInit()
+	l.insert(&element{value: v}, l.root.prev)
+}

src/sync/pool_test.go (+116 −0)
@@ -8,8 +8,11 @@
 package sync_test
 
 import (
+	"bytes"
+	"math/rand"
 	"runtime"
 	"runtime/debug"
+	"strconv"
 	. "sync"
 	"sync/atomic"
 	"testing"
@@ -43,6 +46,7 @@ func TestPool(t *testing.T) {
 	p.Put("c")
 	debug.SetGCPercent(100) // to allow following GC to actually run
 	runtime.GC()
+	runtime.GC() // objects are now retained across one GC; only two consecutive GCs empty the pool
 	if g := p.Get(); g != nil {
 		t.Fatalf("got %#v; want nil after GC", g)
 	}
@@ -120,6 +124,64 @@ loop:
 	}
 }
 
+func TestPoolPartialRelease(t *testing.T) {
+	if runtime.GOMAXPROCS(-1) <= 1 {
+		t.Skip("pool partial release test is only stable when GOMAXPROCS > 1")
+	}
+
+	// Disable GC so we can control when it happens.
+	defer debug.SetGCPercent(debug.SetGCPercent(-1))
+
+	Ps := runtime.GOMAXPROCS(-1)
+	Gs := Ps * 10
+	Gobjs := 10000
+
+	var p Pool
+	var wg WaitGroup
+	start := int32(0)
+	for i := 0; i < Gs; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			atomic.AddInt32(&start, 1)
+			for atomic.LoadInt32(&start) < int32(Ps) {
+				// Spin until enough Gs are ready to go.
+			}
+			for j := 0; j < Gobjs; j++ {
+				p.Put(new(string))
+			}
+		}()
+	}
+	wg.Wait()
+
+	waitGC()
+
+	inpool := 0
+	for p.Get() != nil {
+		inpool++
+	}
+	objs := Gs * Gobjs
+	min, max := objs/2-objs/Ps, objs/2+objs/Ps
+	if inpool < min || inpool > max {
+		// GC should have dropped half of the per-P shards; because we don't know the
+		// exact distribution of the objects in the shards when we started, and we don't
+		// know which shards have been cleared, we consider the test successful as long
+		// as after GC the number of remaining objects is half ± objs/Ps.
+		t.Fatalf("objects in pool %d, expected between %d and %d", inpool, min, max)
+	}
+}
+
+// waitGC runs a garbage collection and blocks until it has completed,
+// using a finalizer as the completion signal.
+func waitGC() {
+	ch := make(chan struct{})
+	runtime.SetFinalizer(&[16]byte{}, func(_ interface{}) {
+		close(ch)
+	})
+	runtime.GC()
+	<-ch
+}
+
 func TestPoolStress(t *testing.T) {
 	const P = 10
 	N := int(1e6)
@@ -173,3 +233,57 @@ func BenchmarkPoolOverflow(b *testing.B) {
 		}
 	})
 }
+
+var bufSizes = []int{1 << 8, 1 << 12, 1 << 16, 1 << 20, 1 << 24}
+
+func BenchmarkPoolBuffer(b *testing.B) {
+	for _, sz := range bufSizes {
+		sz := sz
+		b.Run(strconv.Itoa(sz), func(b *testing.B) {
+			var p Pool
+			var i int64
+			b.RunParallel(func(pb *testing.PB) {
+				rnd := rand.New(rand.NewSource(atomic.AddInt64(&i, 1)))
+				var j int
+				for pb.Next() {
+					buf, _ := p.Get().(*bytes.Buffer)
+					if buf == nil {
+						buf = &bytes.Buffer{}
+					}
+					buf.Grow(rnd.Intn(sz * 2))
+
+					// Put from another goroutine so objects cross per-P shards.
+					go p.Put(buf)
+					j++
+					if j%256 == 0 {
+						runtime.Gosched()
+					}
+				}
+			})
+		})
+	}
+}
+
+func BenchmarkNoPoolBuffer(b *testing.B) {
+	for _, sz := range bufSizes {
+		sz := sz
+		b.Run(strconv.Itoa(sz), func(b *testing.B) {
+			var i int64
+			b.RunParallel(func(pb *testing.PB) {
+				rnd := rand.New(rand.NewSource(atomic.AddInt64(&i, 1)))
+				var j int
+				for pb.Next() {
+					buf := &bytes.Buffer{}
+					buf.Grow(rnd.Intn(sz * 2))
+
+					// Spawn a goroutine per object to mirror the pool variant's overhead.
+					go runtime.KeepAlive(buf)
+					j++
+					if j%256 == 0 {
+						runtime.Gosched()
+					}
+				}
+			})
+		})
+	}
+}
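
To exercise the new code paths with a toolchain built from this change, the added test and benchmarks can be run with the standard go tool, for example:

go test -run 'TestPool$|TestPoolPartialRelease' sync
go test -run '^$' -bench 'PoolBuffer' sync

The -bench pattern matches both BenchmarkPoolBuffer and BenchmarkNoPoolBuffer, so the pooled and unpooled buffer costs can be compared directly.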
