Skip to content

Commit c872f0e

Browse files
canepatweb-flow
andauthored
execution: direct client + gRPC client and server (#2004)
* execution: direct client + gRPC client and server * make fmt * add missing file * add missing files * missing changes to exec engine and forks * fix shadowing add missing file * unit tests for DirectService * make fmt * some unit tests for block conversions fix H2048_from_string improve signature of deserialize_hex_as_bytes * some unit tests for check functions fix fork_choice_from_response add test fixture facility * some unit tests for getters * more unit tests for getters refactoring to extract sample data * make fmt * some unit tests for inserters some refactoring * implement missing server-side getters fix block number and hash in server-side GetBody RPC unit tests for server-side checkers and getters some cleanup * make fmt * add missing bad block handling in server-side insertion add unit tests for server-side insertion * make fmt --------- Co-authored-by: GitHub <[email protected]>
1 parent 8f8be6b commit c872f0e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+5011
-33
lines changed

silkworm/infra/concurrency/co_spawn_sw.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,4 +369,12 @@ inline BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(
369369
std::forward<CompletionToken>(token));
370370
}
371371

372+
template <typename Executor, typename F>
373+
inline BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(
374+
boost::asio::use_awaitable,
375+
typename boost::asio::detail::awaitable_signature<typename boost::asio::result_of<F()>::type>::type = 0)
376+
co_spawn_and_await(const Executor& ex, F&& f) {
377+
return (co_spawn_sw)(ex, std::forward<F>(f), boost::asio::use_awaitable);
378+
}
379+
372380
} // namespace silkworm::concurrency

silkworm/infra/concurrency/task.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <silkworm/infra/concurrency/coroutine.hpp>
2020

2121
#include <boost/asio/awaitable.hpp>
22+
#include <boost/asio/io_context.hpp>
2223
#include <boost/asio/use_awaitable.hpp>
2324

2425
/// Use just \silkworm as namespace here to make these definitions available everywhere
@@ -32,4 +33,7 @@ using Task = boost::asio::awaitable<T>;
3233
//! Namespace for the current coroutine types
3334
namespace ThisTask = boost::asio::this_coro;
3435

36+
//! Executor for asynchronous tasks returned by any coroutine
37+
using TaskExecutor = boost::asio::io_context::executor_type;
38+
3539
} // namespace silkworm

silkworm/infra/grpc/common/conversion.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,9 @@ void span_from_H128(const ::types::H128& h128, ByteSpan<16> bytes) {
205205

206206
std::unique_ptr<::types::H2048> H2048_from_string(std::string_view orig) {
207207
auto lo_lo = H512_from_string(orig);
208-
auto lo_hi = H512_from_string(orig.substr(512));
209-
auto hi_lo = H512_from_string(orig.substr(1024));
210-
auto hi_hi = H512_from_string(orig.substr(1536));
208+
auto lo_hi = H512_from_string(orig.substr(64));
209+
auto hi_lo = H512_from_string(orig.substr(128));
210+
auto hi_hi = H512_from_string(orig.substr(192));
211211

212212
auto hi = std::make_unique<::types::H1024>();
213213
auto lo = std::make_unique<::types::H1024>();

silkworm/infra/grpc/server/server.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,13 @@ class Server {
130130
SILK_TRACE << "Server::shutdown " << this << " END";
131131
}
132132

133-
Task<void> async_run(const char* thread_name) {
133+
Task<void> async_run(const char* thread_name, std::optional<std::size_t> stack_size = {}) {
134134
auto run = [this] {
135135
this->build_and_start();
136136
this->join();
137137
};
138138
auto stop = [this] { this->shutdown(); };
139-
co_await concurrency::async_thread(std::move(run), std::move(stop), thread_name);
139+
co_await concurrency::async_thread(std::move(run), std::move(stop), thread_name, stack_size);
140140
}
141141

142142
//! Returns the number of server contexts.
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
Copyright 2024 The Silkworm Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#include "active_direct_service.hpp"
18+
19+
#include <silkworm/infra/concurrency/co_spawn_sw.hpp>
20+
21+
namespace silkworm::execution::api {
22+
23+
ActiveDirectService::ActiveDirectService(stagedsync::ExecutionEngine& exec_engine, boost::asio::io_context& context)
24+
: DirectService{exec_engine}, context_{context}, executor_{context_.get_executor()} {}
25+
26+
void ActiveDirectService::execution_loop() {
27+
exec_engine_.open();
28+
29+
boost::asio::executor_work_guard<decltype(executor_)> work{executor_};
30+
context_.run();
31+
32+
exec_engine_.close();
33+
}
34+
35+
bool ActiveDirectService::stop() {
36+
context_.stop();
37+
return ActiveComponent::stop();
38+
}
39+
40+
/** Chain Putters **/
41+
42+
// rpc InsertBlocks(InsertBlocksRequest) returns(InsertionResult);
43+
Task<InsertionResult> ActiveDirectService::insert_blocks(const Blocks& blocks) {
44+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto bb) {
45+
return self->DirectService::insert_blocks(bb);
46+
}(this, blocks));
47+
}
48+
49+
/** Chain Validation and ForkChoice **/
50+
51+
// rpc ValidateChain(ValidationRequest) returns(ValidationReceipt);
52+
Task<ValidationResult> ActiveDirectService::validate_chain(BlockNumAndHash number_and_hash) {
53+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_and_hash) {
54+
return self->DirectService::validate_chain(num_and_hash);
55+
}(this, number_and_hash));
56+
}
57+
58+
// rpc UpdateForkChoice(ForkChoice) returns(ForkChoiceReceipt);
59+
Task<ForkChoiceResult> ActiveDirectService::update_fork_choice(const ForkChoice& fork_choice) {
60+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto choice) {
61+
return self->DirectService::update_fork_choice(choice);
62+
}(this, fork_choice));
63+
}
64+
65+
/** Block Assembly **/
66+
67+
// rpc AssembleBlock(AssembleBlockRequest) returns(AssembleBlockResponse);
68+
Task<AssembleBlockResult> ActiveDirectService::assemble_block(const api::BlockUnderConstruction& block) {
69+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto b) {
70+
return self->DirectService::assemble_block(b);
71+
}(this, block));
72+
}
73+
74+
// rpc GetAssembledBlock(GetAssembledBlockRequest) returns(GetAssembledBlockResponse);
75+
Task<AssembledBlockResult> ActiveDirectService::get_assembled_block(PayloadId payload_id) {
76+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto id) {
77+
return self->DirectService::get_assembled_block(id);
78+
}(this, payload_id));
79+
}
80+
81+
/** Chain Getters **/
82+
83+
// rpc CurrentHeader(google.protobuf.Empty) returns(GetHeaderResponse);
84+
Task<std::optional<BlockHeader>> ActiveDirectService::current_header() {
85+
return concurrency::co_spawn_and_await(executor_, [](auto* self) {
86+
return self->DirectService::current_header();
87+
}(this));
88+
}
89+
90+
// rpc GetTD(GetSegmentRequest) returns(GetTDResponse);
91+
Task<std::optional<TotalDifficulty>> ActiveDirectService::get_td(BlockNumberOrHash number_or_hash) {
92+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) {
93+
return self->DirectService::get_td(num_or_hash);
94+
}(this, number_or_hash));
95+
}
96+
97+
// rpc GetHeader(GetSegmentRequest) returns(GetHeaderResponse);
98+
Task<std::optional<BlockHeader>> ActiveDirectService::get_header(BlockNumberOrHash number_or_hash) {
99+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) {
100+
return self->DirectService::get_header(num_or_hash);
101+
}(this, number_or_hash));
102+
}
103+
104+
// rpc GetBody(GetSegmentRequest) returns(GetBodyResponse);
105+
Task<std::optional<BlockBody>> ActiveDirectService::get_body(BlockNumberOrHash number_or_hash) {
106+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) {
107+
return self->DirectService::get_body(num_or_hash);
108+
}(this, number_or_hash));
109+
}
110+
111+
// rpc HasBlock(GetSegmentRequest) returns(HasBlockResponse);
112+
Task<bool> ActiveDirectService::has_block(BlockNumberOrHash number_or_hash) {
113+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) {
114+
return self->DirectService::has_block(num_or_hash);
115+
}(this, number_or_hash));
116+
}
117+
118+
/** Ranges **/
119+
120+
// rpc GetBodiesByRange(GetBodiesByRangeRequest) returns(GetBodiesBatchResponse);
121+
Task<BlockBodies> ActiveDirectService::get_bodies_by_range(BlockNumRange number_range) {
122+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_range) {
123+
return self->DirectService::get_bodies_by_range(num_range);
124+
}(this, number_range));
125+
}
126+
127+
// rpc GetBodiesByHashes(GetBodiesByHashesRequest) returns(GetBodiesBatchResponse);
128+
Task<BlockBodies> ActiveDirectService::get_bodies_by_hashes(const BlockHashes& hashes) {
129+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto hh) {
130+
return self->DirectService::get_bodies_by_hashes(hh);
131+
}(this, hashes));
132+
}
133+
134+
/** Chain Checkers **/
135+
136+
// rpc IsCanonicalHash(types.H256) returns(IsCanonicalResponse);
137+
Task<bool> ActiveDirectService::is_canonical_hash(Hash block_hash) {
138+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto h) {
139+
return self->DirectService::is_canonical_hash(h);
140+
}(this, block_hash));
141+
}
142+
143+
// rpc GetHeaderHashNumber(types.H256) returns(GetHeaderHashNumberResponse);
144+
Task<std::optional<BlockNum>> ActiveDirectService::get_header_hash_number(Hash block_hash) {
145+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto h) {
146+
return self->DirectService::get_header_hash_number(h);
147+
}(this, block_hash));
148+
}
149+
150+
// rpc GetForkChoice(google.protobuf.Empty) returns(ForkChoice);
151+
Task<ForkChoice> ActiveDirectService::get_fork_choice() {
152+
return concurrency::co_spawn_and_await(executor_, [](auto* self) {
153+
return self->DirectService::get_fork_choice();
154+
}(this));
155+
}
156+
157+
/** Misc **/
158+
159+
// rpc Ready(google.protobuf.Empty) returns(ReadyResponse);
160+
Task<bool> ActiveDirectService::ready() {
161+
return concurrency::co_spawn_and_await(executor_, [](auto* self) {
162+
return self->DirectService::ready();
163+
}(this));
164+
}
165+
166+
// rpc FrozenBlocks(google.protobuf.Empty) returns(FrozenBlocksResponse);
167+
Task<uint64_t> ActiveDirectService::frozen_blocks() {
168+
return concurrency::co_spawn_and_await(executor_, [](auto* self) {
169+
return self->DirectService::frozen_blocks();
170+
}(this));
171+
}
172+
173+
/** Additional non-RPC methods **/
174+
175+
Task<BlockHeaders> ActiveDirectService::get_last_headers(uint64_t n) {
176+
return concurrency::co_spawn_and_await(executor_, [](auto* self, auto how_many) {
177+
return self->DirectService::get_last_headers(how_many);
178+
}(this, n));
179+
}
180+
181+
Task<BlockNum> ActiveDirectService::block_progress() {
182+
return concurrency::co_spawn_and_await(executor_, [](auto* self) {
183+
return self->DirectService::block_progress();
184+
}(this));
185+
}
186+
187+
} // namespace silkworm::execution::api
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
Copyright 2024 The Silkworm Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <silkworm/infra/concurrency/task.hpp>
20+
21+
#include <silkworm/infra/concurrency/active_component.hpp>
22+
#include <silkworm/node/stagedsync/execution_engine.hpp>
23+
24+
#include "direct_service.hpp"
25+
26+
namespace silkworm::execution::api {
27+
28+
//! Active \code DirectService implementation running on one \code TaskExecutor.
29+
class ActiveDirectService : public DirectService, public ActiveComponent {
30+
public:
31+
ActiveDirectService(stagedsync::ExecutionEngine& exec_engine, boost::asio::io_context& context);
32+
~ActiveDirectService() override = default;
33+
34+
ActiveDirectService(const ActiveDirectService&) = delete;
35+
ActiveDirectService& operator=(const ActiveDirectService&) = delete;
36+
37+
ActiveDirectService(ActiveDirectService&&) = delete;
38+
ActiveDirectService& operator=(ActiveDirectService&&) = delete;
39+
40+
/** Chain Putters **/
41+
42+
// rpc InsertBlocks(InsertBlocksRequest) returns(InsertionResult);
43+
Task<InsertionResult> insert_blocks(const Blocks& blocks) override;
44+
45+
/** Chain Validation and ForkChoice **/
46+
47+
// rpc ValidateChain(ValidationRequest) returns(ValidationReceipt);
48+
Task<ValidationResult> validate_chain(BlockNumAndHash number_and_hash) override;
49+
50+
// rpc UpdateForkChoice(ForkChoice) returns(ForkChoiceReceipt);
51+
Task<ForkChoiceResult> update_fork_choice(const ForkChoice& fork_choice) override;
52+
53+
/** Block Assembly **/
54+
55+
// rpc AssembleBlock(AssembleBlockRequest) returns(AssembleBlockResponse);
56+
Task<AssembleBlockResult> assemble_block(const BlockUnderConstruction&) override;
57+
58+
// rpc GetAssembledBlock(GetAssembledBlockRequest) returns(GetAssembledBlockResponse);
59+
Task<AssembledBlockResult> get_assembled_block(PayloadId id) override;
60+
61+
/** Chain Getters **/
62+
63+
// rpc CurrentHeader(google.protobuf.Empty) returns(GetHeaderResponse);
64+
Task<std::optional<BlockHeader>> current_header() override;
65+
66+
// rpc GetTD(GetSegmentRequest) returns(GetTDResponse);
67+
Task<std::optional<TotalDifficulty>> get_td(BlockNumberOrHash number_or_hash) override;
68+
69+
// rpc GetHeader(GetSegmentRequest) returns(GetHeaderResponse);
70+
Task<std::optional<BlockHeader>> get_header(BlockNumberOrHash number_or_hash) override;
71+
72+
// rpc GetBody(GetSegmentRequest) returns(GetBodyResponse);
73+
Task<std::optional<BlockBody>> get_body(BlockNumberOrHash number_or_hash) override;
74+
75+
// rpc HasBlock(GetSegmentRequest) returns(HasBlockResponse);
76+
Task<bool> has_block(BlockNumberOrHash number_or_hash) override;
77+
78+
/** Ranges **/
79+
80+
// rpc GetBodiesByRange(GetBodiesByRangeRequest) returns(GetBodiesBatchResponse);
81+
Task<BlockBodies> get_bodies_by_range(BlockNumRange range) override;
82+
83+
// rpc GetBodiesByHashes(GetBodiesByHashesRequest) returns(GetBodiesBatchResponse);
84+
Task<BlockBodies> get_bodies_by_hashes(const BlockHashes& hashes) override;
85+
86+
/** Chain Checkers **/
87+
88+
// rpc IsCanonicalHash(types.H256) returns(IsCanonicalResponse);
89+
Task<bool> is_canonical_hash(Hash block_hash) override;
90+
91+
// rpc GetHeaderHashNumber(types.H256) returns(GetHeaderHashNumberResponse);
92+
Task<std::optional<BlockNum>> get_header_hash_number(Hash block_hash) override;
93+
94+
// rpc GetForkChoice(google.protobuf.Empty) returns(ForkChoice);
95+
Task<ForkChoice> get_fork_choice() override;
96+
97+
/** Misc **/
98+
99+
// rpc Ready(google.protobuf.Empty) returns(ReadyResponse);
100+
Task<bool> ready() override;
101+
102+
// rpc FrozenBlocks(google.protobuf.Empty) returns(FrozenBlocksResponse);
103+
Task<uint64_t> frozen_blocks() override;
104+
105+
/** Additional non-RPC methods **/
106+
107+
Task<BlockHeaders> get_last_headers(uint64_t n) override;
108+
109+
Task<BlockNum> block_progress() override;
110+
111+
protected:
112+
void execution_loop() override;
113+
bool stop() override;
114+
115+
private:
116+
boost::asio::io_context& context_;
117+
TaskExecutor executor_;
118+
};
119+
120+
} // namespace silkworm::execution::api
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2024 The Silkworm Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <memory>
20+
21+
#include "service.hpp"
22+
23+
namespace silkworm::execution::api {
24+
25+
struct Client {
26+
virtual ~Client() = default;
27+
28+
virtual std::shared_ptr<Service> service() = 0;
29+
};
30+
31+
} // namespace silkworm::execution::api

0 commit comments

Comments
 (0)