Skip to content

Commit 97e0e6c

Browse files
committed
schedule: add task(gr3) block/wake function
The current mechenism, sentry uses channel to communicate with go runtime to block or wake one task. Add Schedule/Wake function at go runtime, which add two interface, runtime.Schedule() and runtime.WakeUpG(). Sentry task which need block just uses runtime.Schedule() to block, the one which need wake one task just uses runtime.WakeUpG(). Sentry task and go runtime scheduler are aware of each other directly , not by middle lays such as channel. based on goruntime: GuhuangLS/go@a01b27f Signed-off-by: liushi <[email protected]>
1 parent df7c82a commit 97e0e6c

File tree

4 files changed

+115
-18
lines changed

4 files changed

+115
-18
lines changed

pkg/sentry/kernel/futex/futex.go

+49-9
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package futex
1919

2020
import (
21+
"runtime"
22+
2123
"gvisor.dev/gvisor/pkg/abi/linux"
2224
"gvisor.dev/gvisor/pkg/context"
2325
"gvisor.dev/gvisor/pkg/hostarch"
@@ -233,6 +235,18 @@ type Waiter struct {
233235

234236
// tid is the thread ID for the waiter in case this is a PI mutex.
235237
tid uint32
238+
239+
// G is the go routine of the waiter.
240+
G uintptr
241+
242+
// Forever is the select of the waiter: forever or timeout.
243+
Forever bool
244+
245+
// Interrupt informs task/G waked by interrupt.
246+
Interrupt bool
247+
248+
// Woken for test.
249+
Woken bool
236250
}
237251

238252
// NewWaiter returns a new unqueued Waiter.
@@ -244,7 +258,7 @@ func NewWaiter() *Waiter {
244258

245259
// woken returns true if w has been woken since the last call to WaitPrepare.
246260
func (w *Waiter) woken() bool {
247-
return len(w.C) != 0
261+
return len(w.C) != 0 || w.Woken
248262
}
249263

250264
// bucket holds a list of waiters for a given address hash.
@@ -273,16 +287,23 @@ func (b *bucket) wakeLocked(key *Key, bitmask uint32, n int) int {
273287
// Remove from the bucket and wake the waiter.
274288
woke := w
275289
w = w.Next() // Next iteration.
276-
b.wakeWaiterLocked(woke)
290+
b.wakeWaiterLocked(woke, key.Kind)
291+
woke.Woken = true
277292
done++
278293
}
279294
return done
280295
}
281296

282-
func (b *bucket) wakeWaiterLocked(w *Waiter) {
297+
func (b *bucket) wakeWaiterLocked(w *Waiter, kind KeyKind) {
283298
// Remove from the bucket and wake the waiter.
284299
b.waiters.Remove(w)
285-
w.C <- struct{}{}
300+
301+
if kind == KindPrivate && w.Forever && w.key.Kind == KindPrivate {
302+
w.Interrupt = false
303+
runtime.WakeG(w.G)
304+
} else {
305+
w.C <- struct{}{}
306+
}
286307

287308
// NOTE: The above channel write establishes a write barrier according
288309
// to the memory model, so nothing may be ordered around it. Since
@@ -396,6 +417,13 @@ func (m *Manager) Fork() *Manager {
396417
}
397418
}
398419

420+
func (m *Manager) KeyMatch(w *Waiter) bool {
421+
if w.key.Kind == KindPrivate {
422+
return true
423+
}
424+
return false
425+
}
426+
399427
// lockBucket returns a locked bucket for the given key.
400428
func (m *Manager) lockBucket(k *Key) *bucket {
401429
var b *bucket
@@ -452,6 +480,9 @@ func (m *Manager) Wake(t Target, addr hostarch.Addr, private bool, bitmask uint3
452480
// This function is very hot; avoid defer.
453481
k, err := getKey(t, addr, private)
454482
if err != nil {
483+
if locked {
484+
b.mu.Unlock()
485+
}
455486
return 0, err
456487
}
457488

@@ -553,22 +584,30 @@ func (m *Manager) WakeOp(t Target, addr1, addr2 hostarch.Addr, private bool, nwa
553584
// enqueues w to be woken by a send to w.C. If WaitPrepare returns nil, the
554585
// Waiter must be subsequently removed by calling WaitComplete, whether or not
555586
// a wakeup is received on w.C.
556-
func (m *Manager) WaitPrepare(w *Waiter, t Target, addr hostarch.Addr, private bool, val uint32, bitmask uint32) error {
587+
func (m *Manager) WaitPrepare(w *Waiter, t Target, addr usermem.Addr, private bool, val uint32, bitmask uint32, forever bool) error {
557588
k, err := getKey(t, addr, private)
558589
if err != nil {
559590
return err
560591
}
561592
// Ownership of k is transferred to w below.
562593

563594
// Prepare the Waiter before taking the bucket lock.
564-
select {
565-
case <-w.C:
566-
default:
595+
if !private || !forever {
596+
select {
597+
case <-w.C:
598+
default:
599+
}
567600
}
568601
w.key = k
569602
w.bitmask = bitmask
570603

571604
b := m.lockBucket(&k)
605+
if private {
606+
w.Forever = forever
607+
} else {
608+
w.Forever = false
609+
}
610+
runtime.ClearGStatus()
572611
// This function is very hot; avoid defer.
573612

574613
// Perform our atomic check.
@@ -578,6 +617,7 @@ func (m *Manager) WaitPrepare(w *Waiter, t Target, addr hostarch.Addr, private b
578617
return err
579618
}
580619

620+
w.Woken = false
581621
// Add the waiter to the bucket.
582622
b.waiters.PushBack(w)
583623
w.bucket.Store(b)
@@ -793,6 +833,6 @@ func (m *Manager) unlockPILocked(t Target, addr hostarch.Addr, tid uint32, b *bu
793833
return syserror.EINVAL
794834
}
795835

796-
b.wakeWaiterLocked(next)
836+
b.wakeWaiterLocked(next, -1)
797837
return nil
798838
}

pkg/sentry/kernel/futex/futex_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ func futexKind(private bool) string {
7575

7676
func newPreparedTestWaiter(t *testing.T, m *Manager, ta Target, addr hostarch.Addr, private bool, val uint32, bitmask uint32) *Waiter {
7777
w := NewWaiter()
78-
if err := m.WaitPrepare(w, ta, addr, private, val, bitmask); err != nil {
78+
w.G = runtime.GetG()
79+
if err := m.WaitPrepare(w, ta, addr, private, val, bitmask, true); err != nil {
7980
t.Fatalf("WaitPrepare failed: %v", err)
8081
}
8182
return w
@@ -487,15 +488,16 @@ func (t *testMutex) Lock() {
487488

488489
// Wait for it to be "not locked".
489490
w := NewWaiter()
490-
err := t.m.WaitPrepare(w, t.d, t.a, true, testMutexLocked, ^uint32(0))
491+
w.G = runtime.GetG()
492+
err := t.m.WaitPrepare(w, t.d, t.a, true, testMutexLocked, ^uint32(0), true)
491493
if err == unix.EAGAIN {
492494
continue
493495
}
494496
if err != nil {
495497
// Should never happen.
496498
panic("WaitPrepare returned unexpected error: " + err.Error())
497499
}
498-
<-w.C
500+
runtime.BlockG()
499501
t.m.WaitComplete(w, t.d)
500502
}
501503
}

pkg/sentry/kernel/task_block.go

+13
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ func (t *Task) block(C <-chan struct{}, timerChan <-chan struct{}) error {
166166
}
167167
}
168168

169+
func (t *Task) TaskStartRegion() *trace.Region {
170+
return trace.StartRegion(t.traceContext, blockRegion)
171+
}
172+
173+
func (t *Task) TaskEndRegion(region *trace.Region) {
174+
region.End()
175+
}
176+
169177
// SleepStart implements context.ChannelSleeper.SleepStart.
170178
func (t *Task) SleepStart() <-chan struct{} {
171179
t.assertTaskGoroutine()
@@ -240,6 +248,11 @@ func (t *Task) interrupt() {
240248
func (t *Task) interruptSelf() {
241249
select {
242250
case t.interruptChan <- struct{}{}:
251+
w := t.FutexWaiter()
252+
if w.Forever && t.Futex().KeyMatch(w) {
253+
w.Interrupt = true
254+
runtime.WakeG(w.G)
255+
}
243256
default:
244257
}
245258
// platform.Context.Interrupt() is unnecessary since a task goroutine

pkg/sentry/syscalls/linux/sys_futex.go

+48-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package linux
1616

1717
import (
18+
"runtime"
1819
"time"
1920

2021
"gvisor.dev/gvisor/pkg/abi/linux"
@@ -53,13 +54,29 @@ func (f *futexWaitRestartBlock) Restart(t *kernel.Task) (uintptr, error) {
5354
// arguments.
5455
func futexWaitAbsolute(t *kernel.Task, clockRealtime bool, ts linux.Timespec, forever bool, addr hostarch.Addr, private bool, val, mask uint32) (uintptr, error) {
5556
w := t.FutexWaiter()
56-
err := t.Futex().WaitPrepare(w, t, addr, private, val, mask)
57+
if w.G == 0 {
58+
w.G = runtime.GetG()
59+
}
60+
err := t.Futex().WaitPrepare(w, t, addr, private, val, mask, forever)
5761
if err != nil {
62+
w.Forever = false
5863
return 0, err
5964
}
6065

6166
if forever {
62-
err = t.Block(w.C)
67+
if private && w.Forever {
68+
t.SleepStart()
69+
region := t.TaskStartRegion()
70+
runtime.BlockG()
71+
if w.Interrupt == true {
72+
w.Interrupt = false
73+
err = syserror.ErrInterrupted
74+
}
75+
t.TaskEndRegion(region)
76+
t.SleepFinish(true)
77+
} else {
78+
err = t.Block(w.C)
79+
}
6380
} else if clockRealtime {
6481
notifier, tchan := ktime.NewChannelNotifier()
6582
timer := ktime.NewTimer(t.Kernel().RealtimeClock(), notifier)
@@ -73,7 +90,10 @@ func futexWaitAbsolute(t *kernel.Task, clockRealtime bool, ts linux.Timespec, fo
7390
err = t.BlockWithDeadline(w.C, true, ktime.FromTimespec(ts))
7491
}
7592

76-
t.Futex().WaitComplete(w, t)
93+
if !forever || !private || err != nil {
94+
t.Futex().WaitComplete(w)
95+
}
96+
w.Forever = false
7797
return 0, syserror.ConvertIntr(err, syserror.ERESTARTSYS)
7898
}
7999

@@ -89,13 +109,33 @@ func futexWaitAbsolute(t *kernel.Task, clockRealtime bool, ts linux.Timespec, fo
89109
// syscall is restarted with the remaining timeout.
90110
func futexWaitDuration(t *kernel.Task, duration time.Duration, forever bool, addr hostarch.Addr, private bool, val, mask uint32) (uintptr, error) {
91111
w := t.FutexWaiter()
92-
err := t.Futex().WaitPrepare(w, t, addr, private, val, mask)
112+
if w.G == 0 {
113+
w.G = runtime.GetG()
114+
}
115+
err := t.Futex().WaitPrepare(w, t, addr, private, val, mask, forever)
93116
if err != nil {
117+
w.Forever = false
94118
return 0, err
95119
}
96120

97-
remaining, err := t.BlockWithTimeout(w.C, !forever, duration)
98-
t.Futex().WaitComplete(w, t)
121+
var remaining time.Duration
122+
if private && forever {
123+
t.SleepStart()
124+
region := t.TaskStartRegion()
125+
runtime.BlockG()
126+
if w.Interrupt == true {
127+
w.Interrupt = false
128+
err = syserror.ErrInterrupted
129+
}
130+
t.TaskEndRegion(region)
131+
t.SleepFinish(true)
132+
} else {
133+
remaining, err = t.BlockWithTimeout(w.C, !forever, duration)
134+
}
135+
if !forever || !private || err != nil {
136+
t.Futex().WaitComplete(w)
137+
}
138+
w.Forever = false
99139
if err == nil {
100140
return 0, nil
101141
}
@@ -126,6 +166,7 @@ func futexWaitDuration(t *kernel.Task, duration time.Duration, forever bool, add
126166

127167
func futexLockPI(t *kernel.Task, ts linux.Timespec, forever bool, addr hostarch.Addr, private bool) error {
128168
w := t.FutexWaiter()
169+
w.Forever = false
129170
locked, err := t.Futex().LockPI(w, t, addr, uint32(t.ThreadID()), private, false)
130171
if err != nil {
131172
return err
@@ -154,6 +195,7 @@ func futexLockPI(t *kernel.Task, ts linux.Timespec, forever bool, addr hostarch.
154195

155196
func tryLockPI(t *kernel.Task, addr hostarch.Addr, private bool) error {
156197
w := t.FutexWaiter()
198+
w.Forever = false
157199
locked, err := t.Futex().LockPI(w, t, addr, uint32(t.ThreadID()), private, true)
158200
if err != nil {
159201
return err

0 commit comments

Comments
 (0)