Skip to content

Allow to preserve legacy DBs layout, use leveldb-only layout by default #357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
32 changes: 32 additions & 0 deletions cmd/opera/launcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ var (
Name: "exitwhensynced.epoch",
Usage: "Exits after synchronisation reaches the required epoch",
}

DBMigrationModeFlag = cli.StringFlag{
Name: "db.migration.mode",
Usage: "MultiDB migration mode ('reformat' or 'rebuild')",
}
DBPresetFlag = cli.StringFlag{
Name: "db.preset",
Usage: "DBs layout preset ('pbl-1' or 'ldb-1' or 'legacy-ldb' or 'legacy-pbl')",
}
)

type GenesisTemplate struct {
Expand Down Expand Up @@ -336,6 +345,28 @@ func gossipStoreConfigWithFlags(ctx *cli.Context, src gossip.StoreConfig) (gossi
return cfg, nil
}

func setDBConfig(ctx *cli.Context, cfg integration.DBsConfig, cacheRatio cachescale.Func) integration.DBsConfig {
if ctx.GlobalIsSet(DBPresetFlag.Name) {
preset := ctx.GlobalString(DBPresetFlag.Name)
switch preset {
case "pbl-1":
cfg = integration.Pbl1DBsConfig(cacheRatio.U64, uint64(utils.MakeDatabaseHandles()))
case "ldb-1":
cfg = integration.Ldb1DBsConfig(cacheRatio.U64, uint64(utils.MakeDatabaseHandles()))
case "legacy-ldb":
cfg = integration.LdbLegacyDBsConfig(cacheRatio.U64, uint64(utils.MakeDatabaseHandles()))
case "legacy-pbl":
cfg = integration.PblLegacyDBsConfig(cacheRatio.U64, uint64(utils.MakeDatabaseHandles()))
default:
utils.Fatalf("--%s must be 'pbl-1', 'ldb-1', 'legacy-pbl' or 'legacy-ldb'", DBPresetFlag.Name)
}
}
if ctx.GlobalIsSet(DBMigrationModeFlag.Name) {
cfg.MigrationMode = ctx.GlobalString(DBMigrationModeFlag.Name)
}
return cfg
}

func nodeConfigWithFlags(ctx *cli.Context, cfg node.Config) node.Config {
utils.SetNodeConfig(ctx, &cfg)

Expand Down Expand Up @@ -411,6 +442,7 @@ func mayMakeAllConfigs(ctx *cli.Context) (*config, error) {
return nil, err
}
cfg.Node = nodeConfigWithFlags(ctx, cfg.Node)
cfg.DBs = setDBConfig(ctx, cfg.DBs, cacheRatio)

err = setValidator(ctx, &cfg.Emitter)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/opera/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func initFlags() {
validatorPasswordFlag,
SyncModeFlag,
GCModeFlag,
DBPresetFlag,
DBMigrationModeFlag,
}
legacyRpcFlags = []cli.Flag{
utils.NoUSBFlag,
Expand Down
6 changes: 3 additions & 3 deletions gossip/c_block_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ var (
snapshotStorageReadTimer = metrics.GetOrRegisterTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.GetOrRegisterTimer("chain/snapshot/commits", nil)

blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil)
blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil)
blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil)
blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil)
blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil)
blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil)
)

type ExtendedTxPosition struct {
Expand Down
5 changes: 4 additions & 1 deletion integration/assembly.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,17 @@ func makeEngine(chaindataDir string, g *genesis.Genesis, genesisProc bool, cfg C
// MakeEngine makes consensus engine from config.
func MakeEngine(chaindataDir string, g *genesis.Genesis, cfg Configs) (*abft.Lachesis, *vecmt.Index, *gossip.Store, *abft.Store, gossip.BlockProc, func() error) {
// Legacy DBs migrate
if cfg.DBs.MigrationMode != "reformat" && cfg.DBs.MigrationMode != "rebuild" && cfg.DBs.MigrationMode != "" {
utils.Fatalf("MigrationMode must be 'reformat' or 'rebuild'")
}
if !isEmpty(path.Join(chaindataDir, "gossip")) {
MakeDBDirs(chaindataDir)
genesisProducers, _ := SupportedDBs(chaindataDir, cfg.DBs.GenesisCache)
dbs, err := MakeDirectMultiProducer(genesisProducers, cfg.DBs.Routing)
if err != nil {
utils.Fatalf("Failed to make engine: %v", err)
}
err = migrateLegacyDBs(chaindataDir, dbs)
err = migrateLegacyDBs(chaindataDir, dbs, cfg.DBs.MigrationMode, cfg.DBs.Routing)
_ = dbs.Close()
if err != nil {
utils.Fatalf("Failed to migrate state: %v", err)
Expand Down
78 changes: 4 additions & 74 deletions integration/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (
"github.com/Fantom-foundation/lachesis-base/utils/fmtfilter"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/syndtr/goleveldb/leveldb/opt"

"github.com/Fantom-foundation/go-opera/gossip"
"github.com/Fantom-foundation/go-opera/utils/dbutil/asyncflushproducer"
)

type DBsConfig struct {
Routing RoutingConfig
RuntimeCache DBsCacheConfig
GenesisCache DBsCacheConfig
Routing RoutingConfig
RuntimeCache DBsCacheConfig
GenesisCache DBsCacheConfig
MigrationMode string
}

type DBCacheConfig struct {
Expand All @@ -39,76 +39,6 @@ type DBsCacheConfig struct {
Table map[string]DBCacheConfig
}

func DefaultDBsConfig(scale func(uint64) uint64, fdlimit uint64) DBsConfig {
return DBsConfig{
Routing: DefaultRoutingConfig(),
RuntimeCache: DefaultRuntimeDBsCacheConfig(scale, fdlimit),
GenesisCache: DefaultGenesisDBsCacheConfig(scale, fdlimit),
}
}

func DefaultRuntimeDBsCacheConfig(scale func(uint64) uint64, fdlimit uint64) DBsCacheConfig {
return DBsCacheConfig{
Table: map[string]DBCacheConfig{
"evm-data": {
Cache: scale(242 * opt.MiB),
Fdlimit: fdlimit*242/700 + 1,
},
"evm-logs": {
Cache: scale(110 * opt.MiB),
Fdlimit: fdlimit*110/700 + 1,
},
"main": {
Cache: scale(186 * opt.MiB),
Fdlimit: fdlimit*186/700 + 1,
},
"events": {
Cache: scale(87 * opt.MiB),
Fdlimit: fdlimit*87/700 + 1,
},
"epoch-%d": {
Cache: scale(75 * opt.MiB),
Fdlimit: fdlimit*75/700 + 1,
},
"": {
Cache: 64 * opt.MiB,
Fdlimit: fdlimit/100 + 1,
},
},
}
}

func DefaultGenesisDBsCacheConfig(scale func(uint64) uint64, fdlimit uint64) DBsCacheConfig {
return DBsCacheConfig{
Table: map[string]DBCacheConfig{
"main": {
Cache: scale(1024 * opt.MiB),
Fdlimit: fdlimit*1024/3072 + 1,
},
"evm-data": {
Cache: scale(1024 * opt.MiB),
Fdlimit: fdlimit*1024/3072 + 1,
},
"evm-logs": {
Cache: scale(1024 * opt.MiB),
Fdlimit: fdlimit*1024/3072 + 1,
},
"events": {
Cache: scale(1 * opt.MiB),
Fdlimit: fdlimit*1/3072 + 1,
},
"epoch-%d": {
Cache: scale(1 * opt.MiB),
Fdlimit: fdlimit*1/3072 + 1,
},
"": {
Cache: 16 * opt.MiB,
Fdlimit: fdlimit/100 + 1,
},
},
}
}

func SupportedDBs(chaindataDir string, cfg DBsCacheConfig) (map[multidb.TypeName]kvdb.IterableDBProducer, map[multidb.TypeName]kvdb.FullDBProducer) {
if chaindataDir == "inmemory" || chaindataDir == "" {
chaindataDir, _ = ioutil.TempDir("", "opera-tmp")
Expand Down
162 changes: 108 additions & 54 deletions integration/legacy_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,20 @@ func translateGossipPrefix(p byte) byte {
return p
}

func migrateLegacyDBs(chaindataDir string, dbs kvdb.FlushableDBProducer) error {
if !isEmpty(path.Join(chaindataDir, "gossip")) {
func equalRoutingConfig(a, b RoutingConfig) bool {
if len(a.Table) != len(b.Table) {
return false
}
for k, v := range a.Table {
if b.Table[k] != v {
return false
}
}
return true
}

func migrateLegacyDBs(chaindataDir string, dbs kvdb.FlushableDBProducer, mode string, layout RoutingConfig) error {
{ // didn't erase the brackets to avoid massive code changes
// migrate DB layout
cacheFn, err := dbCacheFdlimit(DBsCacheConfig{
Table: map[string]DBCacheConfig{
Expand All @@ -201,10 +213,13 @@ func migrateLegacyDBs(chaindataDir string, dbs kvdb.FlushableDBProducer) error {
return err
}
var oldDBs kvdb.IterableDBProducer
var oldDBsType string
if fileExists(path.Join(chaindataDir, "gossip", "LOG")) {
oldDBs = leveldb.NewProducer(chaindataDir, cacheFn)
oldDBsType = "ldb"
} else {
oldDBs = pebble.NewProducer(chaindataDir, cacheFn)
oldDBsType = "pbl"
}
openOldDB := func(name string) kvdb.Store {
db, err := oldDBs.OpenDB(name)
Expand All @@ -221,67 +236,106 @@ func migrateLegacyDBs(chaindataDir string, dbs kvdb.FlushableDBProducer) error {
return db
}

// move lachesis, lachesis-%d and gossip-%d DBs
for _, name := range oldDBs.Names() {
if strings.HasPrefix(name, "lachesis") || strings.HasPrefix(name, "gossip-") {
mustTransform(transformTask{
openSrc: func() kvdb.Store {
return skipkeys.Wrap(openOldDB(name), MetadataPrefix)
},
openDst: func() kvdb.Store {
return openNewDB(name)
},
name: name,
dir: chaindataDir,
})
switch mode {
case "rebuild":
// move lachesis, lachesis-%d and gossip-%d DBs
for _, name := range oldDBs.Names() {
if strings.HasPrefix(name, "lachesis") || strings.HasPrefix(name, "gossip-") {
mustTransform(transformTask{
openSrc: func() kvdb.Store {
return skipkeys.Wrap(openOldDB(name), MetadataPrefix)
},
openDst: func() kvdb.Store {
return openNewDB(name)
},
name: name,
dir: chaindataDir,
})
}
}
}

// move gossip DB

// move logs
mustTransform(transformTask{
openSrc: func() kvdb.Store {
return newClosableTable(openOldDB("gossip"), []byte("Lr"))
},
openDst: func() kvdb.Store {
return openNewDB("evm-logs/r")
},
name: "gossip/Lr",
dir: chaindataDir,
})
mustTransform(transformTask{
openSrc: func() kvdb.Store {
return newClosableTable(openOldDB("gossip"), []byte("Lt"))
},
openDst: func() kvdb.Store {
return openNewDB("evm-logs/t")
},
name: "gossip/Lt",
dir: chaindataDir,
})
// move gossip DB

// skip 0 prefix, as it contains flushID
for b := 1; b <= 0xff; b++ {
if b == int('L') {
// logs are already moved above
continue
}
// move logs
mustTransform(transformTask{
openSrc: func() kvdb.Store {
return newClosableTable(openOldDB("gossip"), []byte{byte(b)})
return newClosableTable(openOldDB("gossip"), []byte("Lr"))
},
openDst: func() kvdb.Store {
if b == int('M') || b == int('r') || b == int('x') || b == int('X') {
return openNewDB("evm/" + string([]byte{byte(b)}))
} else {
return openNewDB("gossip/" + string([]byte{translateGossipPrefix(byte(b))}))
}
return openNewDB("evm-logs/r")
},
name: "gossip/Lr",
dir: chaindataDir,
})
mustTransform(transformTask{
openSrc: func() kvdb.Store {
return newClosableTable(openOldDB("gossip"), []byte("Lt"))
},
openDst: func() kvdb.Store {
return openNewDB("evm-logs/t")
},
name: fmt.Sprintf("gossip/%c", rune(b)),
dir: chaindataDir,
dropSrc: b == 0xff,
name: "gossip/Lt",
dir: chaindataDir,
})

// skip 0 prefix, as it contains flushID
for b := 1; b <= 0xff; b++ {
if b == int('L') {
// logs are already moved above
continue
}
mustTransform(transformTask{
openSrc: func() kvdb.Store {
return newClosableTable(openOldDB("gossip"), []byte{byte(b)})
},
openDst: func() kvdb.Store {
if b == int('M') || b == int('r') || b == int('x') || b == int('X') {
return openNewDB("evm/" + string([]byte{byte(b)}))
} else {
return openNewDB("gossip/" + string([]byte{translateGossipPrefix(byte(b))}))
}
},
name: fmt.Sprintf("gossip/%c", rune(b)),
dir: chaindataDir,
dropSrc: b == 0xff,
})
}
case "reformat":
if oldDBsType == "ldb" {
if !equalRoutingConfig(layout, LdbLegacyRoutingConfig()) {
return errors.New("reformatting DBs: missing --db.preset=legacy-ldb flag")
}
err = os.Rename(path.Join(chaindataDir, "gossip"), path.Join(chaindataDir, "leveldb-fsh", "main"))
if err != nil {
return err
}
for _, name := range oldDBs.Names() {
if strings.HasPrefix(name, "lachesis") || strings.HasPrefix(name, "gossip-") {
err = os.Rename(path.Join(chaindataDir, name), path.Join(chaindataDir, "leveldb-fsh", name))
if err != nil {
return err
}
}
}
} else {
if !equalRoutingConfig(layout, PblLegacyRoutingConfig()) {
return errors.New("reformatting DBs: missing --db.preset=legacy-pbl flag")
}
err = os.Rename(path.Join(chaindataDir, "gossip"), path.Join(chaindataDir, "pebble-fsh", "main"))
if err != nil {
return err
}
for _, name := range oldDBs.Names() {
if strings.HasPrefix(name, "lachesis") || strings.HasPrefix(name, "gossip-") {
err = os.Rename(path.Join(chaindataDir, name), path.Join(chaindataDir, "pebble-fsh", name))
if err != nil {
return err
}
}
}
}
default:
return errors.New("missing --db.migration.mode flag")
}
}

Expand Down
Loading