Skip to content

node, sync: use new Execution API client/server #2013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ int main(int argc, char* argv[]) {
// Execution: the execution layer engine
// NOLINTNEXTLINE(cppcoreguidelines-slicing)
silkworm::node::Node execution_node{settings.node_settings, sentry_client, chaindata_db};
execution::LocalClient& execution_client{execution_node.execution_local_client()};
execution::api::DirectClient& execution_client{execution_node.execution_direct_client()};

// Set up the execution node (e.g. load pre-verified hashes, download+index snapshots...)
execution_node.setup();
Expand Down
4 changes: 3 additions & 1 deletion silkworm/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
include("${SILKWORM_MAIN_DIR}/cmake/common/targets.cmake")

add_subdirectory(stagedsync/stages)
add_subdirectory(test_util)

find_package(absl REQUIRED strings)
find_package(asio-grpc REQUIRED)
find_package(Boost REQUIRED headers)
find_package(gRPC REQUIRED)
find_package(GTest REQUIRED)
find_package(magic_enum REQUIRED)
find_package(Microsoft.GSL REQUIRED)
find_package(Protobuf REQUIRED)
Expand Down Expand Up @@ -56,4 +58,4 @@ silkworm_library(
PRIVATE ${SILKWORM_NODE_PRIVATE_LIBS}
)

target_link_libraries(silkworm_node_test PRIVATE silkworm_db_test_util)
target_link_libraries(silkworm_node_test PRIVATE silkworm_node_test_util GTest::gmock)
8 changes: 4 additions & 4 deletions silkworm/node/execution/api/active_direct_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ bool ActiveDirectService::stop() {

// rpc InsertBlocks(InsertBlocksRequest) returns(InsertionResult);
Task<InsertionResult> ActiveDirectService::insert_blocks(const Blocks& blocks) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto bb) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& bb) {
return self->DirectService::insert_blocks(bb);
}(this, blocks));
}
Expand All @@ -57,7 +57,7 @@ Task<ValidationResult> ActiveDirectService::validate_chain(BlockNumAndHash numbe

// rpc UpdateForkChoice(ForkChoice) returns(ForkChoiceReceipt);
Task<ForkChoiceResult> ActiveDirectService::update_fork_choice(const ForkChoice& fork_choice) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto choice) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& choice) {
return self->DirectService::update_fork_choice(choice);
}(this, fork_choice));
}
Expand All @@ -66,7 +66,7 @@ Task<ForkChoiceResult> ActiveDirectService::update_fork_choice(const ForkChoice&

// rpc AssembleBlock(AssembleBlockRequest) returns(AssembleBlockResponse);
Task<AssembleBlockResult> ActiveDirectService::assemble_block(const api::BlockUnderConstruction& block) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto b) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& b) {
return self->DirectService::assemble_block(b);
}(this, block));
}
Expand Down Expand Up @@ -126,7 +126,7 @@ Task<BlockBodies> ActiveDirectService::get_bodies_by_range(BlockNumRange number_

// rpc GetBodiesByHashes(GetBodiesByHashesRequest) returns(GetBodiesBatchResponse);
Task<BlockBodies> ActiveDirectService::get_bodies_by_hashes(const BlockHashes& hashes) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto hh) {
return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& hh) {
return self->DirectService::get_bodies_by_hashes(hh);
}(this, hashes));
}
Expand Down
264 changes: 264 additions & 0 deletions silkworm/node/execution/api/active_direct_service_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*
Copyright 2024 The Silkworm Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "active_direct_service.hpp"

#include <stdexcept>
#include <thread>
#include <utility>

#include <boost/asio/io_context.hpp>
#include <catch2/catch.hpp>
#include <gmock/gmock.h>

#include <silkworm/db/test_util/temp_chain_data.hpp>
#include <silkworm/infra/test_util/log.hpp>
#include <silkworm/infra/test_util/task_runner.hpp>
#include <silkworm/node/stagedsync/execution_engine.hpp>
#include <silkworm/node/test_util/mock_execution_engine.hpp>
#include <silkworm/node/test_util/temp_chain_data_node_settings.hpp>

namespace silkworm::execution::api {

using testing::_;
using testing::InvokeWithoutArgs;

using silkworm::db::test_util::TempChainData;
using silkworm::node::test_util::make_node_settings_from_temp_chain_data;
using silkworm::test_util::SetLogVerbosityGuard;
using silkworm::test_util::TaskRunner;

class ActiveDirectService_ForTest : public ActiveDirectService {
public:
using ActiveDirectService::ActiveDirectService, ActiveComponent::execution_loop, ActiveComponent::stop;
};

struct ActiveDirectServiceTest : public TaskRunner {
explicit ActiveDirectServiceTest()
: log_guard{log::Level::kNone},
settings{make_node_settings_from_temp_chain_data(tmp_chaindata)},
dba{tmp_chaindata.env()} {
tmp_chaindata.add_genesis_data();
tmp_chaindata.commit_txn();
mock_execution_engine = std::make_unique<MockExecutionEngine>(context(), settings, dba);
direct_service = std::make_unique<ActiveDirectService_ForTest>(*mock_execution_engine, execution_context);
execution_context_thread = std::thread{[this]() {
direct_service->execution_loop();
}};
}
~ActiveDirectServiceTest() override {
direct_service->stop();
if (execution_context_thread.joinable()) {
execution_context_thread.join();
}
}

SetLogVerbosityGuard log_guard;
TempChainData tmp_chaindata;
NodeSettings settings;
db::RWAccess dba;
std::unique_ptr<MockExecutionEngine> mock_execution_engine;
boost::asio::io_context execution_context;
std::unique_ptr<ActiveDirectService_ForTest> direct_service;
std::thread execution_context_thread;
};

TEST_CASE_METHOD(ActiveDirectServiceTest, "ActiveDirectServiceTest::insert_blocks", "[node][execution][api]") {
const std::vector<Blocks> test_vectors = {
Blocks{},
Blocks{std::make_shared<Block>()},
};
for (const auto& blocks : test_vectors) {
SECTION("blocks: " + std::to_string(blocks.size())) {
EXPECT_CALL(*mock_execution_engine, insert_blocks(blocks))
.WillOnce(InvokeWithoutArgs([]() -> void {
return;
}));
auto future = spawn_future(direct_service->insert_blocks(blocks));
context().run();
CHECK(future.get().status == api::ExecutionStatus::kSuccess);
}
}
}

TEST_CASE_METHOD(ActiveDirectServiceTest, "ActiveDirectServiceTest::verify_chain", "[node][execution][api]") {
const Hash latest_valid_hash{0x000000000000000000000000000000000000000000000000000000000000000A_bytes32};
const Hash new_hash{0x000000000000000000000000000000000000000000000000000000000000000B_bytes32};
const BlockNumAndHash latest_valid_head{
.number = 1,
.hash = latest_valid_hash,
};
const BlockNumAndHash new_head{
.number = 2,
.hash = new_hash,
};
const std::vector<std::pair<stagedsync::VerificationResult, execution::api::ValidationResult>> test_vectors{
{stagedsync::ValidChain{.current_head = new_head}, api::ValidChain{.current_head = new_head}},
{stagedsync::InvalidChain{.unwind_point = latest_valid_head}, api::InvalidChain{.unwind_point = latest_valid_head}},
{stagedsync::ValidationError{.latest_valid_head = latest_valid_head}, api::ValidationError{.latest_valid_head = latest_valid_head}},
};
for (const auto& [stagedsync_result, api_result] : test_vectors) {
SECTION("result: " + std::to_string(stagedsync_result.index())) {
EXPECT_CALL(*mock_execution_engine, verify_chain(new_head.hash))
.WillOnce(InvokeWithoutArgs([&, result = stagedsync_result]() -> stagedsync::VerificationResultFuture {
stagedsync::VerificationResultPromise promise{context().get_executor()};
promise.set_value(result);
return promise.get_future();
}));
auto future = spawn_future(direct_service->validate_chain(new_head));
context().run();
const auto result{future.get()};
if (std::holds_alternative<stagedsync::ValidChain>(stagedsync_result)) {
CHECK(std::holds_alternative<api::ValidChain>(result));
const auto stagedsync_valid_chain{std::get<stagedsync::ValidChain>(stagedsync_result)};
const auto api_valid_chain{std::get<api::ValidChain>(result)};
CHECK(stagedsync_valid_chain.current_head == api_valid_chain.current_head);
} else if (std::holds_alternative<stagedsync::InvalidChain>(stagedsync_result)) {
CHECK(std::holds_alternative<api::InvalidChain>(result));
const auto stagedsync_invalid_chain{std::get<stagedsync::InvalidChain>(stagedsync_result)};
const auto api_invalid_chain{std::get<api::InvalidChain>(result)};
CHECK(stagedsync_invalid_chain.unwind_point == api_invalid_chain.unwind_point);
CHECK(stagedsync_invalid_chain.bad_headers == api_invalid_chain.bad_headers);
CHECK(stagedsync_invalid_chain.bad_block == api_invalid_chain.bad_block);
} else if (std::holds_alternative<stagedsync::ValidationError>(stagedsync_result)) {
CHECK(std::holds_alternative<api::ValidationError>(result));
const auto stagedsync_error{std::get<stagedsync::ValidationError>(stagedsync_result)};
const auto api_error{std::get<api::ValidationError>(result)};
CHECK(stagedsync_error.latest_valid_head == api_error.latest_valid_head);
} else {
REQUIRE(false);
}
}
}
}

TEST_CASE_METHOD(ActiveDirectServiceTest, "ActiveDirectServiceTest::update_fork_choice", "[node][execution][api]") {
const Hash head_block_hash{0x000000000000000000000000000000000000000000000000000000000000000A_bytes32};
const Hash finalized_block_hash{0x0000000000000000000000000000000000000000000000000000000000000002_bytes32};
const Hash safe_block_hash{0x0000000000000000000000000000000000000000000000000000000000000001_bytes32};
const ForkChoice fork_choice{
.head_block_hash = head_block_hash,
.timeout = 0,
.finalized_block_hash = finalized_block_hash,
.safe_block_hash = safe_block_hash,
};
const std::vector<std::pair<bool, ForkChoiceResult>> test_vectors{
{true, ForkChoiceResult{.status = api::ExecutionStatus::kSuccess, .latest_valid_head = head_block_hash}},
{false, ForkChoiceResult{.status = api::ExecutionStatus::kInvalidForkchoice, .latest_valid_head = finalized_block_hash}},
};
for (const auto& [updated, expected_choice_result] : test_vectors) {
SECTION("updated: " + std::to_string(updated)) {
EXPECT_CALL(*mock_execution_engine, notify_fork_choice_update3(head_block_hash, finalized_block_hash, safe_block_hash))
.WillOnce(InvokeWithoutArgs([result = updated]() -> bool {
return result;
}));
EXPECT_CALL(*mock_execution_engine, last_fork_choice())
.WillOnce(InvokeWithoutArgs([=, result = updated]() -> BlockId {
return result ? BlockId{10, head_block_hash} : BlockId{2, finalized_block_hash};
}));
auto future = spawn_future(direct_service->update_fork_choice(fork_choice));
context().run();
const auto fork_choice_result{future.get()};
CHECK(fork_choice_result.status == expected_choice_result.status);
CHECK(fork_choice_result.latest_valid_head == expected_choice_result.latest_valid_head);
}
}
}

TEST_CASE_METHOD(ActiveDirectServiceTest, "ActiveDirectServiceTest::get_block_number", "[node][execution][api]") {
const Hash block_hash{0x000000000000000000000000000000000000000000000000000000000000000A_bytes32};
SECTION("non-existent") {
EXPECT_CALL(*mock_execution_engine, get_block_number(block_hash))
.WillOnce(InvokeWithoutArgs([=]() -> std::optional<BlockNum> {
return {};
}));
auto future = spawn_future(direct_service->get_header_hash_number(block_hash));
context().run();
CHECK(future.get() == std::nullopt);
}
SECTION("existent") {
const BlockNum block_number{2};
EXPECT_CALL(*mock_execution_engine, get_block_number(block_hash))
.WillOnce(InvokeWithoutArgs([=]() -> std::optional<BlockNum> {
return block_number;
}));
auto future = spawn_future(direct_service->get_header_hash_number(block_hash));
context().run();
CHECK(future.get() == block_number);
}
}

TEST_CASE_METHOD(ActiveDirectServiceTest, "ActiveDirectServiceTest::get_fork_choice", "[node][execution][api]") {
const Hash head_block_hash{0x000000000000000000000000000000000000000000000000000000000000000A_bytes32};
const Hash finalized_block_hash{0x0000000000000000000000000000000000000000000000000000000000000002_bytes32};
const Hash safe_block_hash{0x0000000000000000000000000000000000000000000000000000000000000001_bytes32};
const ForkChoice expected_fork_choice{
.head_block_hash = head_block_hash,
.timeout = 0,
.finalized_block_hash = finalized_block_hash,
.safe_block_hash = safe_block_hash,
};
EXPECT_CALL(*mock_execution_engine, last_fork_choice())
.WillOnce(InvokeWithoutArgs([=]() -> BlockId {
return {10, head_block_hash};
}));
EXPECT_CALL(*mock_execution_engine, last_finalized_block())
.WillOnce(InvokeWithoutArgs([=]() -> BlockId {
return {2, finalized_block_hash};
}));
EXPECT_CALL(*mock_execution_engine, last_safe_block())
.WillOnce(InvokeWithoutArgs([=]() -> BlockId {
return {1, safe_block_hash};
}));
auto future = spawn_future(direct_service->get_fork_choice());
context().run();
const auto last_choice{future.get()};
CHECK(last_choice.head_block_hash == expected_fork_choice.head_block_hash);
CHECK(last_choice.timeout == expected_fork_choice.timeout);
CHECK(last_choice.finalized_block_hash == expected_fork_choice.finalized_block_hash);
CHECK(last_choice.safe_block_hash == expected_fork_choice.safe_block_hash);
}

TEST_CASE_METHOD(ActiveDirectServiceTest, "ActiveDirectServiceTest::get_last_headers", "[node][execution][api]") {
const std::vector<std::pair<uint64_t, BlockHeaders>> test_vectors = {
{0, {}},
{1, {BlockHeader{}}},
};
for (const auto& [how_many, last_headers] : test_vectors) {
SECTION("how_many: " + std::to_string(how_many)) {
EXPECT_CALL(*mock_execution_engine, get_last_headers(how_many))
.WillOnce(InvokeWithoutArgs([&, headers = last_headers]() -> BlockHeaders {
return headers;
}));
auto future = spawn_future(direct_service->get_last_headers(how_many));
context().run();
CHECK(future.get() == last_headers);
}
}
}

TEST_CASE_METHOD(ActiveDirectServiceTest, "ActiveDirectServiceTest::block_progress", "[node][execution][api]") {
const BlockNum progress{123'456'789};
EXPECT_CALL(*mock_execution_engine, block_progress())
.WillOnce(InvokeWithoutArgs([=]() -> BlockNum {
return progress;
}));
auto future = spawn_future(direct_service->block_progress());
context().run();
CHECK(future.get() == progress);
}

} // namespace silkworm::execution::api
40 changes: 1 addition & 39 deletions silkworm/node/execution/api/direct_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <silkworm/infra/test_util/log.hpp>
#include <silkworm/infra/test_util/task_runner.hpp>
#include <silkworm/node/stagedsync/execution_engine.hpp>
#include <silkworm/node/test_util/mock_execution_engine.hpp>
#include <silkworm/node/test_util/temp_chain_data_node_settings.hpp>

namespace silkworm::execution::api {
Expand All @@ -38,45 +39,6 @@ using silkworm::node::test_util::make_node_settings_from_temp_chain_data;
using silkworm::test_util::SetLogVerbosityGuard;
using silkworm::test_util::TaskRunner;

class MockExecutionEngine : public stagedsync::ExecutionEngine {
public:
MockExecutionEngine(boost::asio::io_context& ioc, NodeSettings& ns, db::RWAccess dba)
: ExecutionEngine(ioc, ns, std::move(dba)) {}
~MockExecutionEngine() override = default;

MOCK_METHOD((void), open, ());
MOCK_METHOD((void), close, ());

MOCK_METHOD((void), insert_blocks, (const std::vector<std::shared_ptr<Block>>&), (override));
MOCK_METHOD((stagedsync::VerificationResultFuture), verify_chain, (Hash), (override));
MOCK_METHOD((bool), notify_fork_choice_update1, (Hash));
MOCK_METHOD((bool), notify_fork_choice_update2, (Hash, Hash));
MOCK_METHOD((bool), notify_fork_choice_update3, (Hash, Hash, Hash));
bool notify_fork_choice_update(Hash head_block_hash,
std::optional<Hash> finalized_block_hash,
std::optional<Hash> safe_block_hash) override {
if (finalized_block_hash && safe_block_hash) {
return notify_fork_choice_update3(head_block_hash, *finalized_block_hash, *safe_block_hash);
} else {
if (finalized_block_hash) {
return notify_fork_choice_update2(head_block_hash, *finalized_block_hash);
} else {
return notify_fork_choice_update1(head_block_hash);
}
}
return false;
}

MOCK_METHOD((BlockId), last_fork_choice, (), (const, override));
MOCK_METHOD((BlockId), last_finalized_block, (), (const, override));
MOCK_METHOD((BlockId), last_safe_block, (), (const, override));

MOCK_METHOD((std::optional<BlockNum>), get_block_number, (Hash), (const, override));

MOCK_METHOD((BlockHeaders), get_last_headers, (uint64_t), (const, override));
MOCK_METHOD((BlockNum), block_progress, (), (const, override));
};

struct DirectServiceTest : public TaskRunner {
explicit DirectServiceTest()
: log_guard{log::Level::kNone},
Expand Down
1 change: 1 addition & 0 deletions silkworm/node/execution/api/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

namespace silkworm::execution::api {

//! Common Execution API definition for both in-process and out-of-process client/server
struct Service {
virtual ~Service() = default;

Expand Down
Loading
Loading