Skip to content

Commit cf51fea

Browse files
authored
Merge pull request #435 from uprendis/feature/autocompact
Automatically compact DB
2 parents e585097 + c2cc618 commit cf51fea

File tree

7 files changed

+297
-27
lines changed

7 files changed

+297
-27
lines changed

cmd/opera/launcher/db-transform.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
"github.com/Fantom-foundation/go-opera/integration"
1919
"github.com/Fantom-foundation/go-opera/utils"
20-
"github.com/Fantom-foundation/go-opera/utils/dbutil/compactdb"
20+
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
2121
)
2222

2323
func dbTransform(ctx *cli.Context) error {
@@ -264,9 +264,9 @@ func transformComponent(datadir string, dbTypes, tmpDbTypes map[multidb.TypeName
264264
return err
265265
}
266266
toMove[dbLocatorOf(e.New)] = true
267-
newDB = batched.Wrap(newDB)
268-
defer newDB.Close()
269267
newHumanName := path.Join("tmp", string(e.New.Type), e.New.Name)
268+
newDB = batched.Wrap(autocompact.Wrap2M(newDB, opt.GiB, 16*opt.GiB, true, newHumanName))
269+
defer newDB.Close()
270270
log.Info("Copying DB table", "req", e.Req, "old_db", oldHumanName, "old_table", e.Old.Table,
271271
"new_db", newHumanName, "new_table", e.New.Table)
272272
oldTable := utils.NewTableOrSelf(oldDB, []byte(e.Old.Table))
@@ -292,11 +292,6 @@ func transformComponent(datadir string, dbTypes, tmpDbTypes map[multidb.TypeName
292292
keys = keys[:0]
293293
values = values[:0]
294294
}
295-
err = compactdb.Compact(newTable, newHumanName, 16*opt.GiB)
296-
if err != nil {
297-
log.Error("Database compaction failed", "err", err)
298-
return err
299-
}
300295
return nil
301296
}()
302297
if err != nil {

cmd/opera/launcher/export.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"gopkg.in/urfave/cli.v1"
2222

2323
"github.com/Fantom-foundation/go-opera/gossip"
24+
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
2425
)
2526

2627
var (
@@ -135,7 +136,7 @@ func exportEvmKeys(ctx *cli.Context) error {
135136
if err != nil {
136137
return err
137138
}
138-
keysDB := batched.Wrap(keysDB_)
139+
keysDB := batched.Wrap(autocompact.Wrap2M(keysDB_, opt.GiB, 16*opt.GiB, true, "evm-keys"))
139140
defer keysDB.Close()
140141

141142
it := gdb.EvmStore().EvmDb.NewIterator(nil, nil)

gossip/apply_genesis.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55

66
"github.com/Fantom-foundation/lachesis-base/hash"
77
"github.com/Fantom-foundation/lachesis-base/kvdb/batched"
8+
"github.com/syndtr/goleveldb/leveldb/opt"
89

910
"github.com/Fantom-foundation/go-opera/inter/iblockproc"
1011
"github.com/Fantom-foundation/go-opera/inter/ibr"
1112
"github.com/Fantom-foundation/go-opera/inter/ier"
1213
"github.com/Fantom-foundation/go-opera/opera/genesis"
14+
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
1315
)
1416

1517
// ApplyGenesis writes initial state.
@@ -76,7 +78,7 @@ func (s *Store) ApplyGenesis(g genesis.Genesis) (genesisHash hash.Hash, err erro
7678
func (s *Store) WrapTablesAsBatched() (unwrap func()) {
7779
origTables := s.table
7880

79-
batchedBlocks := batched.Wrap(s.table.Blocks)
81+
batchedBlocks := batched.Wrap(autocompact.Wrap2M(s.table.Blocks, opt.GiB, 16*opt.GiB, false, "blocks"))
8082
s.table.Blocks = batchedBlocks
8183

8284
batchedBlockHashes := batched.Wrap(s.table.BlockHashes)

gossip/evmstore/apply_genesis.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,28 @@
11
package evmstore
22

33
import (
4-
"github.com/Fantom-foundation/lachesis-base/kvdb"
54
"github.com/Fantom-foundation/lachesis-base/kvdb/batched"
5+
"github.com/syndtr/goleveldb/leveldb/opt"
66

77
"github.com/Fantom-foundation/go-opera/opera/genesis"
8+
"github.com/Fantom-foundation/go-opera/utils/adapters/ethdb2kvdb"
9+
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
810
)
911

1012
// ApplyGenesis writes initial state.
1113
func (s *Store) ApplyGenesis(g genesis.Genesis) (err error) {
12-
batch := s.EvmDb.NewBatch()
13-
defer batch.Reset()
14+
db := batched.Wrap(autocompact.Wrap2M(ethdb2kvdb.Wrap(s.EvmDb), opt.GiB, 16*opt.GiB, true, "evm"))
1415
g.RawEvmItems.ForEach(func(key, value []byte) bool {
16+
err = db.Put(key, value)
1517
if err != nil {
1618
return false
1719
}
18-
err = batch.Put(key, value)
19-
if err != nil {
20-
return false
21-
}
22-
if batch.ValueSize() > kvdb.IdealBatchSize {
23-
err = batch.Write()
24-
if err != nil {
25-
return false
26-
}
27-
batch.Reset()
28-
}
2920
return true
3021
})
3122
if err != nil {
3223
return err
3324
}
34-
return batch.Write()
25+
return db.Write()
3526
}
3627

3728
func (s *Store) WrapTablesAsBatched() (unwrap func()) {
@@ -45,7 +36,7 @@ func (s *Store) WrapTablesAsBatched() (unwrap func()) {
4536

4637
unwrapLogs := s.EvmLogs.WrapTablesAsBatched()
4738

48-
batchedReceipts := batched.Wrap(s.table.Receipts)
39+
batchedReceipts := batched.Wrap(autocompact.Wrap2M(s.table.Receipts, opt.GiB, 16*opt.GiB, false, "receipts"))
4940
s.table.Receipts = batchedReceipts
5041
return func() {
5142
_ = batchedTxs.Flush()

integration/legacy_migrate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ethereum/go-ethereum/log"
1919
"github.com/syndtr/goleveldb/leveldb/opt"
2020

21+
"github.com/Fantom-foundation/go-opera/utils/dbutil/autocompact"
2122
"github.com/Fantom-foundation/go-opera/utils/dbutil/compactdb"
2223
)
2324

@@ -46,7 +47,7 @@ type transformTask struct {
4647

4748
func transform(m transformTask) error {
4849
openDst := func() *batched.Store {
49-
return batched.Wrap(m.openDst())
50+
return batched.Wrap(autocompact.Wrap2M(m.openDst(), opt.GiB, 16*opt.GiB, true, ""))
5051
}
5152
openSrc := func() *batched.Store {
5253
return batched.Wrap(m.openSrc())

utils/dbutil/autocompact/store.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package autocompact
2+
3+
import (
4+
"sync"
5+
6+
"github.com/Fantom-foundation/lachesis-base/kvdb"
7+
"github.com/ethereum/go-ethereum/log"
8+
"github.com/status-im/keycard-go/hexutils"
9+
)
10+
11+
// Store implements automatic compacting of recently inserted/erased data according to provided strategy
12+
type Store struct {
13+
kvdb.Store
14+
limit uint64
15+
cont ContainerI
16+
newCont func() ContainerI
17+
compMu sync.Mutex
18+
name string
19+
}
20+
21+
type Batch struct {
22+
kvdb.Batch
23+
store *Store
24+
cont ContainerI
25+
}
26+
27+
func Wrap(s kvdb.Store, limit uint64, strategy func() ContainerI, name string) *Store {
28+
return &Store{
29+
Store: s,
30+
limit: limit,
31+
newCont: strategy,
32+
cont: strategy(),
33+
name: name,
34+
}
35+
}
36+
37+
func Wrap2(s kvdb.Store, limit1 uint64, limit2 uint64, strategy func() ContainerI, name string) *Store {
38+
return Wrap(Wrap(s, limit1, strategy, name), limit2, strategy, name)
39+
}
40+
41+
func Wrap2M(s kvdb.Store, limit1 uint64, limit2 uint64, forward bool, name string) *Store {
42+
strategy := NewBackwardsCont
43+
if forward {
44+
strategy = NewForwardCont
45+
}
46+
return Wrap2(s, limit1, limit2, strategy, name)
47+
}
48+
49+
func estSize(keyLen int, valLen int) uint64 {
50+
// Storage overheads, related to adding/deleting a record,
51+
//wouldn't be only proportional to length of key and value.
52+
//E.g. if one adds 10 records with length of 2, it will be more expensive than 1 record with length 20
53+
// Now, 64 wasn't really calculated but is rather a guesstimation
54+
return uint64(keyLen + valLen + 64)
55+
}
56+
57+
func (s *Store) onWrite(key []byte, size uint64) {
58+
s.compMu.Lock()
59+
defer s.compMu.Unlock()
60+
if key != nil {
61+
s.cont.Add(key, size)
62+
}
63+
s.mayCompact(false)
64+
}
65+
66+
func (s *Store) onBatchWrite(batchCont ContainerI) {
67+
s.compMu.Lock()
68+
defer s.compMu.Unlock()
69+
s.cont.Merge(batchCont)
70+
s.mayCompact(false)
71+
}
72+
73+
func (s *Store) compact() {
74+
s.compMu.Lock()
75+
defer s.compMu.Unlock()
76+
s.mayCompact(true)
77+
}
78+
79+
func (s *Store) mayCompact(force bool) {
80+
// error handling
81+
err := s.cont.Error()
82+
if err != nil {
83+
s.cont.Reset()
84+
s.newCont = NewDevnullCont
85+
s.cont = s.newCont()
86+
log.Warn("Autocompaction failed, which may lead to performance issues", "name", s.name, "err", err)
87+
}
88+
89+
if force || s.cont.Size() > s.limit {
90+
for _, r := range s.cont.Ranges() {
91+
log.Debug("Autocompact", "name", s.name, "from", hexutils.BytesToHex(r.minKey), "to", hexutils.BytesToHex(r.maxKey))
92+
_ = s.Store.Compact(r.minKey, r.maxKey)
93+
}
94+
s.cont.Reset()
95+
}
96+
}
97+
98+
func (s *Store) Put(key []byte, value []byte) error {
99+
defer s.onWrite(key, estSize(len(key), len(value)))
100+
return s.Store.Put(key, value)
101+
}
102+
103+
func (s *Store) Delete(key []byte) error {
104+
defer s.onWrite(key, estSize(len(key), 0))
105+
return s.Store.Delete(key)
106+
}
107+
108+
func (s *Store) Close() error {
109+
s.compact()
110+
return s.Store.Close()
111+
}
112+
113+
func (s *Store) NewBatch() kvdb.Batch {
114+
batch := s.Store.NewBatch()
115+
if batch == nil {
116+
return nil
117+
}
118+
return &Batch{
119+
Batch: batch,
120+
store: s,
121+
cont: s.newCont(),
122+
}
123+
}
124+
125+
func (s *Batch) Put(key []byte, value []byte) error {
126+
s.cont.Add(key, estSize(len(key), len(value)))
127+
return s.Batch.Put(key, value)
128+
}
129+
130+
func (s *Batch) Delete(key []byte) error {
131+
s.cont.Add(key, estSize(len(key), 0))
132+
return s.Batch.Delete(key)
133+
}
134+
135+
func (s *Batch) Reset() {
136+
s.cont.Reset()
137+
s.Batch.Reset()
138+
}
139+
140+
func (s *Batch) Write() error {
141+
defer s.store.onBatchWrite(s.cont)
142+
return s.Batch.Write()
143+
}

0 commit comments

Comments
 (0)