Skip to content

Commit b8cb654

Browse files
Use strict synchronization for revision getter to minimize flaky result caused by time racing.
Signed-off-by: Joshua Zhang <[email protected]> Addressed review comments Co-authored-by: Abhishek Kr Srivastav <[email protected]> Signed-off-by: Abhishek Kr Srivastav <[email protected]>
1 parent 2ed418c commit b8cb654

File tree

2 files changed

+26
-17
lines changed

2 files changed

+26
-17
lines changed

client/pkg/testutil/recorder.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,10 @@ func (r *recorderStream) Chan() <-chan Action {
115115

116116
func (r *recorderStream) Wait(n int) ([]Action, error) {
117117
acts := make([]Action, n)
118-
timeoutC := time.After(r.waitTimeout)
118+
var timeoutC <-chan time.Time
119+
if r.waitTimeout != 0 {
120+
timeoutC = time.After(r.waitTimeout)
121+
}
119122
for i := 0; i < n; i++ {
120123
select {
121124
case acts[i] = <-r.ch:

server/etcdserver/api/v3compactor/periodic_test.go

+22-16
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) {
3333

3434
fc := clockwork.NewFakeClock()
3535
// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
36-
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
36+
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
3737
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
3838
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
3939

@@ -43,8 +43,8 @@ func TestPeriodicHourly(t *testing.T) {
4343
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
4444

4545
// compaction doesn't happen til 2 hours elapse
46-
for i := 0; i < initialIntervals; i++ {
47-
rg.Wait(1)
46+
for i := 0; i < initialIntervals-1; i++ {
47+
waitOneAction(t, rg)
4848
fc.Advance(tb.getRetryInterval())
4949
}
5050

@@ -63,7 +63,7 @@ func TestPeriodicHourly(t *testing.T) {
6363
for i := 0; i < 3; i++ {
6464
// advance one hour, one revision for each interval
6565
for j := 0; j < intervalsPerPeriod; j++ {
66-
rg.Wait(1)
66+
waitOneAction(t, rg)
6767
fc.Advance(tb.getRetryInterval())
6868
}
6969

@@ -84,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) {
8484
retentionDuration := time.Duration(retentionMinutes) * time.Minute
8585

8686
fc := clockwork.NewFakeClock()
87-
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
87+
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
8888
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
8989
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
9090

@@ -94,8 +94,8 @@ func TestPeriodicMinutes(t *testing.T) {
9494
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
9595

9696
// compaction doesn't happen til 5 minutes elapse
97-
for i := 0; i < initialIntervals; i++ {
98-
rg.Wait(1)
97+
for i := 0; i < initialIntervals-1; i++ {
98+
waitOneAction(t, rg)
9999
fc.Advance(tb.getRetryInterval())
100100
}
101101

@@ -113,7 +113,7 @@ func TestPeriodicMinutes(t *testing.T) {
113113
for i := 0; i < 5; i++ {
114114
// advance 5-minute, one revision for each interval
115115
for j := 0; j < intervalsPerPeriod; j++ {
116-
rg.Wait(1)
116+
waitOneAction(t, rg)
117117
fc.Advance(tb.getRetryInterval())
118118
}
119119

@@ -132,7 +132,7 @@ func TestPeriodicMinutes(t *testing.T) {
132132
func TestPeriodicPause(t *testing.T) {
133133
fc := clockwork.NewFakeClock()
134134
retentionDuration := time.Hour
135-
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
135+
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
136136
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
137137
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
138138

@@ -143,7 +143,7 @@ func TestPeriodicPause(t *testing.T) {
143143

144144
// tb will collect 3 hours of revisions but not compact since paused
145145
for i := 0; i < n*3; i++ {
146-
rg.Wait(1)
146+
waitOneAction(t, rg)
147147
fc.Advance(tb.getRetryInterval())
148148
}
149149
// t.revs = [21 22 23 24 25 26 27 28 29 30]
@@ -156,7 +156,7 @@ func TestPeriodicPause(t *testing.T) {
156156

157157
// tb resumes to being blocked on the clock
158158
tb.Resume()
159-
rg.Wait(1)
159+
waitOneAction(t, rg)
160160

161161
// unblock clock, will kick off a compaction at T=3h6m by retry
162162
fc.Advance(tb.getRetryInterval())
@@ -179,7 +179,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
179179
retentionDuration := time.Duration(retentionMinutes) * time.Minute
180180

181181
fc := clockwork.NewFakeClock()
182-
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
182+
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
183183
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
184184
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
185185

@@ -189,10 +189,10 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
189189
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
190190

191191
// first compaction happens til 5 minutes elapsed
192-
for i := 0; i < initialIntervals; i++ {
192+
for i := 0; i < initialIntervals-1; i++ {
193193
// every time set the same revision with 100
194194
rg.SetRev(int64(100))
195-
rg.Wait(1)
195+
waitOneAction(t, rg)
196196
fc.Advance(tb.getRetryInterval())
197197
}
198198

@@ -212,7 +212,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
212212
for i := 0; i < 5; i++ {
213213
for j := 0; j < intervalsPerPeriod; j++ {
214214
rg.SetRev(int64(100))
215-
rg.Wait(1)
215+
waitOneAction(t, rg)
216216
fc.Advance(tb.getRetryInterval())
217217
}
218218

@@ -224,7 +224,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
224224

225225
// when revision changed, compaction is normally
226226
for i := 0; i < initialIntervals; i++ {
227-
rg.Wait(1)
227+
waitOneAction(t, rg)
228228
fc.Advance(tb.getRetryInterval())
229229
}
230230

@@ -238,3 +238,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
238238
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
239239
}
240240
}
241+
242+
func waitOneAction(t *testing.T, r testutil.Recorder) {
243+
if actions, _ := r.Wait(1); len(actions) != 1 {
244+
t.Errorf("expect 1 action, got %v instead", len(actions))
245+
}
246+
}

0 commit comments

Comments
 (0)