Skip to content

Commit 57ca392

Browse files
committed
Merge pull request #4 from m4ng0squ4sh/master
Performance improve and small fixes
2 parents 59a8609 + 51edcb3 commit 57ca392

File tree

3 files changed

+105
-48
lines changed

3 files changed

+105
-48
lines changed

reaper/reaper.go

+96-44
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"github.com/yosssi/boltstore/shared"
99
)
1010

11+
//##############//
12+
//### Public ###//
13+
//##############//
14+
1115
// Run invokes a reap function as a goroutine.
1216
func Run(db *bolt.DB, options Options) (chan<- struct{}, <-chan struct{}) {
1317
options.setDefault()
@@ -22,66 +26,114 @@ func Quit(quitC chan<- struct{}, doneC <-chan struct{}) {
2226
<-doneC
2327
}
2428

29+
//###############//
30+
//### Private ###//
31+
//###############//
32+
2533
func reap(db *bolt.DB, options Options, quitC <-chan struct{}, doneC chan<- struct{}) {
26-
var prevKey []byte
27-
for {
28-
err := db.View(func(tx *bolt.Tx) error {
29-
bucket := tx.Bucket(options.BucketName)
30-
if bucket == nil {
31-
return nil
32-
}
34+
// Create a new ticker
35+
ticker := time.NewTicker(options.CheckInterval)
3336

34-
c := bucket.Cursor()
37+
defer func() {
38+
// Stop the ticker
39+
ticker.Stop()
40+
}()
3541

36-
var i int
42+
var prevKey []byte
3743

38-
for k, v := c.Seek(prevKey); ; k, v = c.Next() {
39-
// If we hit the end of our sessions then
40-
// exit and start over next time.
41-
if k == nil {
42-
prevKey = nil
44+
for {
45+
select {
46+
case <-quitC: // Check if a quit signal is sent.
47+
doneC <- struct{}{}
48+
return
49+
case <-ticker.C: // Check if the ticker fires a signal.
50+
// This slice is a buffer to save all expired session keys.
51+
expiredSessionKeys := make([][]byte, 0)
52+
53+
// Start a bolt read transaction.
54+
err := db.View(func(tx *bolt.Tx) error {
55+
bucket := tx.Bucket(options.BucketName)
56+
if bucket == nil {
4357
return nil
4458
}
4559

46-
i++
60+
c := bucket.Cursor()
4761

48-
session, err := shared.Session(v)
49-
if err != nil {
50-
return err
51-
}
62+
var i int
63+
var isExpired bool
64+
65+
for k, v := c.Seek(prevKey); ; k, v = c.Next() {
66+
// If we hit the end of our sessions then
67+
// exit and start over next time.
68+
if k == nil {
69+
prevKey = nil
70+
return nil
71+
}
5272

53-
if shared.Expired(session) {
54-
err := db.Update(func(txu *bolt.Tx) error {
55-
return txu.Bucket(options.BucketName).Delete(k)
56-
})
73+
i++
74+
75+
// The flag if the session is expired
76+
isExpired = false
77+
78+
session, err := shared.Session(v)
5779
if err != nil {
58-
return err
80+
// Just remove the session with the invalid session data.
81+
// Log the error first.
82+
log.Printf("boltstore: removing session from database with invalid value: %v", err)
83+
isExpired = true
84+
} else if shared.Expired(session) {
85+
isExpired = true
5986
}
60-
}
6187

62-
if options.BatchSize == i {
63-
// Store the current key to the previous key.
64-
// Copy the byte slice key, because this data is
65-
// not safe outside of this transaction.
66-
prevKey = make([]byte, len(k))
67-
copy(prevKey, k)
68-
return nil
88+
if isExpired {
89+
// Copy the byte slice key, because this data is
90+
// not safe outside of this transaction.
91+
temp := make([]byte, len(k))
92+
copy(temp, k)
93+
94+
// Add it to the expired sessios keys slice
95+
expiredSessionKeys = append(expiredSessionKeys, temp)
96+
}
97+
98+
if options.BatchSize == i {
99+
// Store the current key to the previous key.
100+
// Copy the byte slice key, because this data is
101+
// not safe outside of this transaction.
102+
prevKey = make([]byte, len(k))
103+
copy(prevKey, k)
104+
return nil
105+
}
69106
}
107+
})
108+
109+
if err != nil {
110+
log.Printf("boltstore: obtain expired sessions error: %v", err)
70111
}
71-
})
72112

73-
if err != nil {
74-
log.Println(err.Error())
75-
}
113+
if len(expiredSessionKeys) > 0 {
114+
// Remove the expired sessions from the database
115+
err = db.Update(func(txu *bolt.Tx) error {
116+
// Get the bucket
117+
b := txu.Bucket(options.BucketName)
118+
if b == nil {
119+
return nil
120+
}
76121

77-
// Check if a quit signal is sent.
78-
select {
79-
case <-quitC:
80-
doneC <- struct{}{}
81-
return
82-
default:
83-
}
122+
// Remove all expired sessions in the slice
123+
for _, key := range expiredSessionKeys {
124+
err = b.Delete(key)
125+
if err != nil {
126+
return err
127+
}
128+
}
129+
130+
return nil
131+
})
84132

85-
time.Sleep(options.CheckInterval)
133+
if err != nil {
134+
log.Printf("boltstore: remove expired sessions error: %v", err)
135+
}
136+
}
137+
}
86138
}
87139
}

reaper/reaper_test.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package reaper
22

33
import (
4+
"code.google.com/p/gogoprotobuf/proto"
45
"fmt"
56
"testing"
67
"time"
7-
"code.google.com/p/gogoprotobuf/proto"
88

99
"github.com/boltdb/bolt"
1010
"github.com/yosssi/boltstore/shared"
@@ -62,7 +62,12 @@ func Test_reap(t *testing.T) {
6262

6363
// When shared.Session returns an error
6464
err = db.Update(func(tx *bolt.Tx) error {
65-
return tx.Bucket(bucketName).Put([]byte("test"), []byte("value"))
65+
session := shared.NewSession([]byte{}, -1)
66+
data, err := proto.Marshal(session)
67+
if err != nil {
68+
return err
69+
}
70+
return tx.Bucket(bucketName).Put([]byte("test"), data)
6671
})
6772
if err != nil {
6873
t.Error(err.Error())

shared/consts.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ const (
1515

1616
// Defaults for reaper.Options
1717
const (
18-
DefaultBatchSize = 10
19-
DefaultCheckInterval = time.Second
18+
DefaultBatchSize = 100
19+
DefaultCheckInterval = time.Minute
2020
)

0 commit comments

Comments
 (0)