Skip to content

Commit 1ec9695

Browse files
authored
snapshots: refactor SnapshotRepository (#2056)
Refactor data access layer (DAL) logic and schema definition out of SnapshotRepository. * schema is defined in snapshot_bundle.hpp and snapshot_type.hpp, indexes are defined in SnapshotBundleFactory * find_segment is exposed with a snapshot type parameter * view_xx_segments is replaced by iteration on bundles in a forward or reverse order (view_bundles_reverse) * find_block_number is replaced by a new query object - TransactionBlockNumByTxnHashRepoQuery that runs TransactionBlockNumByTxnHashQuery on a view of bundles and returns the first match * replace for_each_xx with nested for loops * refactor queries to take SnapshotAndIndex DataModel::read_transactions_from_snapshot: remove an extra reserve
1 parent 65b9703 commit 1ec9695

27 files changed

+736
-484
lines changed

cmd/capi/execute.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <silkworm/capi/silkworm.h>
3434
#include <silkworm/db/access_layer.hpp>
3535
#include <silkworm/db/mdbx/mdbx.hpp>
36+
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
3637
#include <silkworm/db/snapshots/repository.hpp>
3738
#include <silkworm/infra/common/directories.hpp>
3839
#include <silkworm/infra/common/log.hpp>
@@ -150,8 +151,8 @@ std::vector<SilkwormChainSnapshot> collect_all_snapshots(SnapshotRepository& sna
150151
std::vector<SilkwormBodiesSnapshot> bodies_snapshot_sequence;
151152
std::vector<SilkwormTransactionsSnapshot> transactions_snapshot_sequence;
152153

153-
snapshot_repository.view_bundles(
154-
[&](const SnapshotBundle& bundle) {
154+
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles()) {
155+
{
155156
{
156157
SilkwormHeadersSnapshot raw_headers_snapshot{
157158
.segment{
@@ -202,8 +203,8 @@ std::vector<SilkwormChainSnapshot> collect_all_snapshots(SnapshotRepository& sna
202203
};
203204
transactions_snapshot_sequence.push_back(raw_transactions_snapshot);
204205
}
205-
return true;
206-
});
206+
}
207+
}
207208

208209
ensure(headers_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid header snapshot count");
209210
ensure(bodies_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid body snapshot count");
@@ -436,7 +437,7 @@ int main(int argc, char* argv[]) {
436437
int status_code = -1;
437438
if (settings.execute_blocks_settings) {
438439
// Execute specified block range using Silkworm API library
439-
SnapshotRepository repository{snapshot_settings};
440+
SnapshotRepository repository{snapshot_settings, std::make_unique<db::SnapshotBundleFactoryImpl>()};
440441
repository.reopen_folder();
441442
status_code = execute_blocks(handle, *settings.execute_blocks_settings, repository, data_dir);
442443
} else if (settings.build_indexes_settings) {

cmd/dev/check_changes.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <silkworm/core/types/evmc_bytes32.hpp>
2626
#include <silkworm/db/access_layer.hpp>
2727
#include <silkworm/db/buffer.hpp>
28+
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
2829
#include <silkworm/db/snapshots/repository.hpp>
2930
#include <silkworm/infra/common/directories.hpp>
3031
#include <silkworm/infra/common/log.hpp>
@@ -109,7 +110,8 @@ int main(int argc, char* argv[]) {
109110
throw std::runtime_error("Unable to retrieve chain config");
110111
}
111112

112-
snapshots::SnapshotRepository repository;
113+
auto snapshot_bundle_factory = std::make_unique<db::SnapshotBundleFactoryImpl>();
114+
snapshots::SnapshotRepository repository{snapshots::SnapshotSettings{}, std::move(snapshot_bundle_factory)};
113115
repository.reopen_folder();
114116
db::DataModel::set_snapshot_repository(&repository);
115117
db::DataModel access_layer{txn};

cmd/dev/snapshots.cpp

Lines changed: 64 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <silkworm/core/types/address.hpp>
3232
#include <silkworm/core/types/block_body_for_storage.hpp>
3333
#include <silkworm/core/types/evmc_bytes32.hpp>
34+
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
3435
#include <silkworm/db/snapshot_sync.hpp>
3536
#include <silkworm/db/snapshots/bittorrent/client.hpp>
3637
#include <silkworm/db/snapshots/body_index.hpp>
@@ -227,44 +228,48 @@ void decode_segment(const SnapSettings& settings, int repetitions) {
227228
SILK_INFO << "Decode snapshot elapsed: " << duration_as<std::chrono::milliseconds>(elapsed) << " msec";
228229
}
229230

231+
static std::unique_ptr<SnapshotBundleFactory> bundle_factory() {
232+
return std::make_unique<silkworm::db::SnapshotBundleFactoryImpl>();
233+
}
234+
230235
void count_bodies(const SnapSettings& settings, int repetitions) {
231-
SnapshotRepository snapshot_repo{settings};
236+
SnapshotRepository snapshot_repo{settings, bundle_factory()};
232237
snapshot_repo.reopen_folder();
233238
std::chrono::time_point start{std::chrono::steady_clock::now()};
234239
int num_bodies{0};
235240
uint64_t num_txns{0};
236241
for (int i{0}; i < repetitions; ++i) {
237-
const bool success = snapshot_repo.for_each_body([&](BlockNum number, const BlockBodyForStorage& b) -> bool {
238-
// If *system transactions* should not be counted, skip first and last tx in block body
239-
const auto base_txn_id{settings.skip_system_txs ? b.base_txn_id + 1 : b.base_txn_id};
240-
const auto txn_count{settings.skip_system_txs && b.txn_count >= 2 ? b.txn_count - 2 : b.txn_count};
241-
SILK_DEBUG << "Body number: " << number << " base_txn_id: " << base_txn_id << " txn_count: " << txn_count
242-
<< " #ommers: " << b.ommers.size();
243-
num_bodies++;
244-
num_txns += txn_count;
245-
return true;
246-
});
247-
ensure(success, "count_bodies: for_each_body failed");
242+
for (const SnapshotBundle& bundle : snapshot_repo.view_bundles()) {
243+
for (const BlockBodyForStorage& b : BodySnapshotReader{bundle.body_snapshot}) {
244+
// If *system transactions* should not be counted, skip first and last tx in block body
245+
const auto base_txn_id{settings.skip_system_txs ? b.base_txn_id + 1 : b.base_txn_id};
246+
const auto txn_count{settings.skip_system_txs && b.txn_count >= 2 ? b.txn_count - 2 : b.txn_count};
247+
SILK_TRACE << "Body number: " << num_bodies << " base_txn_id: " << base_txn_id << " txn_count: " << txn_count
248+
<< " #ommers: " << b.ommers.size();
249+
num_bodies++;
250+
num_txns += txn_count;
251+
}
252+
}
248253
}
249254
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
250255
const auto duration = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
251256
SILK_INFO << "How many bodies: " << num_bodies << " txs: " << num_txns << " duration: " << duration << " msec";
252257
}
253258

254259
void count_headers(const SnapSettings& settings, int repetitions) {
255-
SnapshotRepository snapshot_repo{settings};
260+
SnapshotRepository snapshot_repo{settings, bundle_factory()};
256261
snapshot_repo.reopen_folder();
257262
std::chrono::time_point start{std::chrono::steady_clock::now()};
258263
int count{0};
259264
for (int i{0}; i < repetitions; ++i) {
260-
const bool success = snapshot_repo.for_each_header([&count](const BlockHeader& h) -> bool {
261-
++count;
262-
if (h.number % 50'000 == 0) {
263-
SILK_INFO << "Header number: " << h.number << " hash: " << to_hex(h.hash());
265+
for (const SnapshotBundle& bundle : snapshot_repo.view_bundles()) {
266+
for (const BlockHeader& h : HeaderSnapshotReader{bundle.header_snapshot}) {
267+
++count;
268+
if (h.number % 50'000 == 0) {
269+
SILK_INFO << "Header number: " << h.number << " hash: " << to_hex(h.hash());
270+
}
264271
}
265-
return true;
266-
});
267-
ensure(success, "count_headers: for_each_header failed");
272+
}
268273
}
269274
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
270275
SILK_INFO << "How many headers: " << count << " duration: " << duration_as<std::chrono::milliseconds>(elapsed) << " msec";
@@ -405,16 +410,17 @@ void lookup_header_by_hash(const SnapSettings& settings) {
405410

406411
std::optional<SnapshotPath> matching_snapshot;
407412
std::optional<BlockHeader> matching_header;
408-
SnapshotRepository snapshot_repository{settings};
413+
SnapshotRepository snapshot_repository{settings, bundle_factory()};
409414
snapshot_repository.reopen_folder();
410-
snapshot_repository.view_header_segments([&](SnapshotRepository::SnapshotAndIndex snapshot) -> bool {
411-
const auto header = HeaderFindByHashQuery{snapshot.snapshot, snapshot.index}.exec(*hash);
415+
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles_reverse()) {
416+
auto snapshot_and_index = bundle.snapshot_and_index(SnapshotType::headers);
417+
const auto header = HeaderFindByHashQuery{snapshot_and_index}.exec(*hash);
412418
if (header) {
413419
matching_header = header;
414-
matching_snapshot = snapshot.snapshot.path();
420+
matching_snapshot = snapshot_and_index.snapshot.path();
421+
break;
415422
}
416-
return header.has_value();
417-
});
423+
}
418424
if (matching_snapshot) {
419425
SILK_INFO << "Lookup header hash: " << hash->to_hex() << " found in: " << matching_snapshot->filename();
420426
if (matching_header && settings.print) {
@@ -433,16 +439,16 @@ void lookup_header_by_number(const SnapSettings& settings) {
433439
SILK_INFO << "Lookup header number: " << block_number;
434440
std::chrono::time_point start{std::chrono::steady_clock::now()};
435441

436-
SnapshotRepository snapshot_repository{settings};
442+
SnapshotRepository snapshot_repository{settings, bundle_factory()};
437443
snapshot_repository.reopen_folder();
438-
const auto header_snapshot{snapshot_repository.find_header_segment(block_number)};
439-
if (header_snapshot) {
440-
const auto header = HeaderFindByBlockNumQuery{header_snapshot->snapshot, header_snapshot->index}.exec(block_number);
444+
const auto snapshot_and_index = snapshot_repository.find_segment(SnapshotType::headers, block_number);
445+
if (snapshot_and_index) {
446+
const auto header = HeaderFindByBlockNumQuery{*snapshot_and_index}.exec(block_number);
441447
ensure(header.has_value(),
442-
[&]() { return "lookup_header_by_number: " + std::to_string(block_number) + " NOT found in " + header_snapshot->snapshot.path().filename(); });
443-
SILK_INFO << "Lookup header number: " << block_number << " found in: " << header_snapshot->snapshot.path().filename();
448+
[&]() { return "lookup_header_by_number: " + std::to_string(block_number) + " NOT found in " + snapshot_and_index->snapshot.path().filename(); });
449+
SILK_INFO << "Lookup header number: " << block_number << " found in: " << snapshot_and_index->snapshot.path().filename();
444450
if (settings.print) {
445-
print_header(*header, header_snapshot->snapshot.path().filename());
451+
print_header(*header, snapshot_and_index->snapshot.path().filename());
446452
}
447453
} else {
448454
SILK_WARN << "Lookup header number: " << block_number << " NOT found";
@@ -479,7 +485,7 @@ void lookup_body_in_one(const SnapSettings& settings, BlockNum block_number, con
479485
Index idx_body_number{snapshot_path->index_file()};
480486
idx_body_number.reopen_index();
481487

482-
const auto body = BodyFindByBlockNumQuery{body_snapshot, idx_body_number}.exec(block_number);
488+
const auto body = BodyFindByBlockNumQuery{{body_snapshot, idx_body_number}}.exec(block_number);
483489
if (body) {
484490
SILK_INFO << "Lookup body number: " << block_number << " found in: " << body_snapshot.path().filename();
485491
if (settings.print) {
@@ -493,18 +499,18 @@ void lookup_body_in_one(const SnapSettings& settings, BlockNum block_number, con
493499
}
494500

495501
void lookup_body_in_all(const SnapSettings& settings, BlockNum block_number) {
496-
SnapshotRepository snapshot_repository{settings};
502+
SnapshotRepository snapshot_repository{settings, bundle_factory()};
497503
snapshot_repository.reopen_folder();
498504

499505
std::chrono::time_point start{std::chrono::steady_clock::now()};
500-
const auto body_snapshot{snapshot_repository.find_body_segment(block_number)};
501-
if (body_snapshot) {
502-
const auto body = BodyFindByBlockNumQuery{body_snapshot->snapshot, body_snapshot->index}.exec(block_number);
506+
const auto snapshot_and_index = snapshot_repository.find_segment(SnapshotType::bodies, block_number);
507+
if (snapshot_and_index) {
508+
const auto body = BodyFindByBlockNumQuery{*snapshot_and_index}.exec(block_number);
503509
ensure(body.has_value(),
504-
[&]() { return "lookup_body: " + std::to_string(block_number) + " NOT found in " + body_snapshot->snapshot.path().filename(); });
505-
SILK_INFO << "Lookup body number: " << block_number << " found in: " << body_snapshot->snapshot.path().filename();
510+
[&]() { return "lookup_body: " + std::to_string(block_number) + " NOT found in " + snapshot_and_index->snapshot.path().filename(); });
511+
SILK_INFO << "Lookup body number: " << block_number << " found in: " << snapshot_and_index->snapshot.path().filename();
506512
if (settings.print) {
507-
print_body(*body, body_snapshot->snapshot.path().filename());
513+
print_body(*body, snapshot_and_index->snapshot.path().filename());
508514
}
509515
} else {
510516
SILK_WARN << "Lookup body number: " << block_number << " NOT found";
@@ -585,7 +591,7 @@ void lookup_txn_by_hash_in_one(const SnapSettings& settings, const Hash& hash, c
585591
Index idx_txn_hash{snapshot_path->index_file()};
586592
idx_txn_hash.reopen_index();
587593

588-
const auto transaction = TransactionFindByHashQuery{tx_snapshot, idx_txn_hash}.exec(hash);
594+
const auto transaction = TransactionFindByHashQuery{{tx_snapshot, idx_txn_hash}}.exec(hash);
589595
if (transaction) {
590596
SILK_INFO << "Lookup txn hash: " << hash.to_hex() << " found in: " << tx_snapshot.path().filename();
591597
if (settings.print) {
@@ -600,21 +606,22 @@ void lookup_txn_by_hash_in_one(const SnapSettings& settings, const Hash& hash, c
600606
}
601607

602608
void lookup_txn_by_hash_in_all(const SnapSettings& settings, const Hash& hash) {
603-
SnapshotRepository snapshot_repository{settings};
609+
SnapshotRepository snapshot_repository{settings, bundle_factory()};
604610
snapshot_repository.reopen_folder();
605611

606612
std::optional<SnapshotPath> matching_snapshot;
607613
std::chrono::time_point start{std::chrono::steady_clock::now()};
608-
snapshot_repository.view_tx_segments([&](SnapshotRepository::SnapshotAndIndex snapshot) -> bool {
609-
const auto transaction = TransactionFindByHashQuery{snapshot.snapshot, snapshot.index}.exec(hash);
614+
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles_reverse()) {
615+
auto snapshot_and_index = bundle.snapshot_and_index(SnapshotType::transactions);
616+
const auto transaction = TransactionFindByHashQuery{snapshot_and_index}.exec(hash);
610617
if (transaction) {
611-
matching_snapshot = snapshot.snapshot.path();
618+
matching_snapshot = snapshot_and_index.snapshot.path();
612619
if (settings.print) {
613620
print_txn(*transaction, matching_snapshot->path().filename());
614621
}
622+
break;
615623
}
616-
return transaction.has_value();
617-
});
624+
}
618625
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
619626
SILK_INFO << "Lookup txn elapsed: " << duration_as<std::chrono::microseconds>(elapsed) << " usec";
620627
if (matching_snapshot) {
@@ -648,7 +655,7 @@ void lookup_txn_by_id_in_one(const SnapSettings& settings, uint64_t txn_id, cons
648655
Index idx_txn_hash{snapshot_path->index_file()};
649656
idx_txn_hash.reopen_index();
650657

651-
const auto transaction = TransactionFindByIdQuery{tx_snapshot, idx_txn_hash}.exec(txn_id);
658+
const auto transaction = TransactionFindByIdQuery{{tx_snapshot, idx_txn_hash}}.exec(txn_id);
652659
if (transaction) {
653660
SILK_INFO << "Lookup txn ID: " << txn_id << " found in: " << tx_snapshot.path().filename();
654661
if (settings.print) {
@@ -663,21 +670,22 @@ void lookup_txn_by_id_in_one(const SnapSettings& settings, uint64_t txn_id, cons
663670
}
664671

665672
void lookup_txn_by_id_in_all(const SnapSettings& settings, uint64_t txn_id) {
666-
SnapshotRepository snapshot_repository{settings};
673+
SnapshotRepository snapshot_repository{settings, bundle_factory()};
667674
snapshot_repository.reopen_folder();
668675

669676
std::optional<SnapshotPath> matching_snapshot;
670677
std::chrono::time_point start{std::chrono::steady_clock::now()};
671-
snapshot_repository.view_tx_segments([&](SnapshotRepository::SnapshotAndIndex snapshot) -> bool {
672-
const auto transaction = TransactionFindByIdQuery{snapshot.snapshot, snapshot.index}.exec(txn_id);
678+
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles_reverse()) {
679+
auto snapshot_and_index = bundle.snapshot_and_index(SnapshotType::transactions);
680+
const auto transaction = TransactionFindByIdQuery{snapshot_and_index}.exec(txn_id);
673681
if (transaction) {
674-
matching_snapshot = snapshot.snapshot.path();
682+
matching_snapshot = snapshot_and_index.snapshot.path();
675683
if (settings.print) {
676684
print_txn(*transaction, matching_snapshot->path().filename());
677685
}
686+
break;
678687
}
679-
return transaction.has_value();
680-
});
688+
}
681689
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
682690
SILK_INFO << "Lookup txn elapsed: " << duration_as<std::chrono::milliseconds>(elapsed) << " msec";
683691
if (matching_snapshot) {
@@ -708,7 +716,7 @@ void lookup_transaction(const SnapSettings& settings) {
708716

709717
void sync(const SnapSettings& settings) {
710718
std::chrono::time_point start{std::chrono::steady_clock::now()};
711-
SnapshotRepository snapshot_repository{settings};
719+
SnapshotRepository snapshot_repository{settings, bundle_factory()};
712720
db::SnapshotSync snapshot_sync{&snapshot_repository, kMainnetConfig};
713721
std::vector<std::string> snapshot_file_names;
714722
if (settings.snapshot_file_name) {

silkworm/capi/silkworm.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <silkworm/core/execution/execution.hpp>
3535
#include <silkworm/db/access_layer.hpp>
3636
#include <silkworm/db/buffer.hpp>
37+
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
3738
#include <silkworm/db/snapshots/body_index.hpp>
3839
#include <silkworm/db/snapshots/header_index.hpp>
3940
#include <silkworm/db/snapshots/index.hpp>
@@ -206,7 +207,8 @@ SILKWORM_EXPORT int silkworm_init(SilkwormHandle* handle, const struct SilkwormS
206207
log::init(log_settings);
207208
log::Info{"Silkworm build info", log_args_for_version()}; // NOLINT(*-unused-raii)
208209

209-
auto snapshot_repository = std::make_unique<snapshots::SnapshotRepository>();
210+
auto snapshot_bundle_factory = std::make_unique<db::SnapshotBundleFactoryImpl>();
211+
auto snapshot_repository = std::make_unique<snapshots::SnapshotRepository>(snapshots::SnapshotSettings{}, std::move(snapshot_bundle_factory));
210212
db::DataModel::set_snapshot_repository(snapshot_repository.get());
211213

212214
// NOLINTNEXTLINE(bugprone-unhandled-exception-at-new)

0 commit comments

Comments
 (0)