Skip to content

Commit e102fcf

Browse files
authored
Merge pull request #639 from fuweid/cp-copy-key-before-seek
[1.3] bucket.Put: copy key before seek
2 parents defa564 + fabe2fb commit e102fcf

File tree

2 files changed

+134
-11
lines changed

2 files changed

+134
-11
lines changed

bucket.go

+19-11
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,17 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
162162
return nil, ErrBucketNameRequired
163163
}
164164

165+
// Insert into node.
166+
// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
167+
// it from being marked as leaking, and accordingly cannot be allocated on stack.
168+
newKey := cloneBytes(key)
169+
165170
// Move cursor to correct position.
166171
c := b.Cursor()
167-
k, _, flags := c.seek(key)
172+
k, _, flags := c.seek(newKey)
168173

169174
// Return an error if there is an existing key.
170-
if bytes.Equal(key, k) {
175+
if bytes.Equal(newKey, k) {
171176
if (flags & bucketLeafFlag) != 0 {
172177
return nil, ErrBucketExists
173178
}
@@ -182,16 +187,14 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
182187
}
183188
var value = bucket.write()
184189

185-
// Insert into node.
186-
key = cloneBytes(key)
187-
c.node().put(key, key, value, 0, bucketLeafFlag)
190+
c.node().put(newKey, newKey, value, 0, bucketLeafFlag)
188191

189192
// Since subbuckets are not allowed on inline buckets, we need to
190193
// dereference the inline page, if it exists. This will cause the bucket
191194
// to be treated as a regular, non-inline bucket for the rest of the tx.
192195
b.page = nil
193196

194-
return b.Bucket(key), nil
197+
return b.Bucket(newKey), nil
195198
}
196199

197200
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
@@ -288,18 +291,23 @@ func (b *Bucket) Put(key []byte, value []byte) error {
288291
return ErrValueTooLarge
289292
}
290293

294+
// Insert into node.
295+
// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
296+
// it from being marked as leaking, and accordingly cannot be allocated on stack.
297+
newKey := cloneBytes(key)
298+
291299
// Move cursor to correct position.
292300
c := b.Cursor()
293-
k, _, flags := c.seek(key)
301+
k, _, flags := c.seek(newKey)
294302

295303
// Return an error if there is an existing key with a bucket value.
296-
if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
304+
if bytes.Equal(newKey, k) && (flags&bucketLeafFlag) != 0 {
297305
return ErrIncompatibleValue
298306
}
299307

300-
// Insert into node.
301-
key = cloneBytes(key)
302-
c.node().put(key, key, value, 0, 0)
308+
// gofail: var beforeBucketPut struct{}
309+
310+
c.node().put(newKey, newKey, value, 0, 0)
303311

304312
return nil
305313
}

tests/failpoint/db_failpoint_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,118 @@ func TestFailpoint_mLockFail_When_remap(t *testing.T) {
9494

9595
require.NoError(t, err)
9696
}
97+
98+
// TestIssue72 reproduces issue 72.
99+
//
100+
// When bbolt is processing a `Put` invocation, the key might be concurrently
101+
// updated by the application which calls the `Put` API (although it shouldn't).
102+
// It might lead to a situation that bbolt use an old key to find a proper
103+
// position to insert the key/value pair, but actually inserts a new key.
104+
// Eventually it might break the rule that all keys should be sorted. In a
105+
// worse case, it might cause page elements to point to already freed pages.
106+
//
107+
// REF: https://github.com/etcd-io/bbolt/issues/72
108+
func TestIssue72(t *testing.T) {
109+
db := btesting.MustCreateDBWithOption(t, &bolt.Options{PageSize: 4096})
110+
111+
bucketName := []byte(t.Name())
112+
err := db.Update(func(tx *bolt.Tx) error {
113+
_, txerr := tx.CreateBucket(bucketName)
114+
return txerr
115+
})
116+
require.NoError(t, err)
117+
118+
// The layout is like:
119+
//
120+
// +--+--+--+
121+
// +------+1 |3 |10+---+
122+
// | +-++--+--+ |
123+
// | | |
124+
// | | |
125+
// +v-+--+ +v-+--+ +-v+--+--+
126+
// |1 |2 | |3 |4 | |10|11|12|
127+
// +--+--+ +--+--+ +--+--+--+
128+
//
129+
err = db.Update(func(tx *bolt.Tx) error {
130+
bk := tx.Bucket(bucketName)
131+
132+
for _, id := range []int{1, 2, 3, 4, 10, 11, 12} {
133+
if txerr := bk.Put(idToBytes(id), make([]byte, 1000)); txerr != nil {
134+
return txerr
135+
}
136+
}
137+
return nil
138+
})
139+
require.NoError(t, err)
140+
141+
require.NoError(t, gofail.Enable("beforeBucketPut", `sleep(5000)`))
142+
143+
// +--+--+--+
144+
// +------+1 |3 |1 +---+
145+
// | +-++--+--+ |
146+
// | | |
147+
// | | |
148+
// +v-+--+ +v-+--+ +-v+--+--+--+
149+
// |1 |2 | |3 |4 | |1 |10|11|12|
150+
// +--+--+ +--+--+ +--+--+--+--+
151+
//
152+
key := idToBytes(13)
153+
updatedKey := idToBytes(1)
154+
err = db.Update(func(tx *bolt.Tx) error {
155+
bk := tx.Bucket(bucketName)
156+
157+
go func() {
158+
time.Sleep(3 * time.Second)
159+
copy(key, updatedKey)
160+
}()
161+
return bk.Put(key, make([]byte, 100))
162+
})
163+
require.NoError(t, err)
164+
165+
require.NoError(t, gofail.Disable("beforeBucketPut"))
166+
167+
// bbolt inserts 100 into last branch page. Since there are two `1`
168+
// keys in branch, spill operation will update first `1` pointer and
169+
// then last one won't be updated and continues to point to freed page.
170+
//
171+
//
172+
// +--+--+--+
173+
// +---------------+1 |3 |1 +---------+
174+
// | +--++-+--+ |
175+
// | | |
176+
// | | |
177+
// | +--+--+ +v-+--+ +-----v-----+
178+
// | |1 |2 | |3 |4 | |freed page |
179+
// | +--+--+ +--+--+ +-----------+
180+
// |
181+
// +v-+--+--+--+---+
182+
// |1 |10|11|12|100|
183+
// +--+--+--+--+---+
184+
err = db.Update(func(tx *bolt.Tx) error {
185+
return tx.Bucket(bucketName).Put(idToBytes(100), make([]byte, 100))
186+
})
187+
require.NoError(t, err)
188+
189+
defer func() {
190+
if r := recover(); r != nil {
191+
t.Logf("panic info:\n %v", r)
192+
}
193+
}()
194+
195+
// Add more keys to ensure branch node to spill.
196+
err = db.Update(func(tx *bolt.Tx) error {
197+
bk := tx.Bucket(bucketName)
198+
199+
for _, id := range []int{101, 102, 103, 104, 105} {
200+
if txerr := bk.Put(idToBytes(id), make([]byte, 1000)); txerr != nil {
201+
return txerr
202+
}
203+
}
204+
return nil
205+
})
206+
require.NoError(t, err)
207+
}
208+
209+
func idToBytes(id int) []byte {
210+
return []byte(fmt.Sprintf("%010d", id))
211+
}

0 commit comments

Comments
 (0)