Skip to content

Commit 9118484

Browse files
authored
rpcdaemon: temporal KV server-side stubbed implementation (#2066)
1 parent 73f6a02 commit 9118484

File tree

4 files changed

+229
-16
lines changed

4 files changed

+229
-16
lines changed

silkworm/node/remote/kv/grpc/server/backend_kv_server.cpp

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,35 +52,35 @@ void BackEndKvServer::register_backend_request_calls(agrpc::GrpcContext* grpc_co
5252

5353
// Register one requested call repeatedly for each RPC: asio-grpc will take care of re-registration on any incoming call
5454
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestEtherbase,
55-
[&backend](auto&&... args) -> Task<void> {
55+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
5656
co_await EtherbaseCall{std::forward<decltype(args)>(args)...}(backend);
5757
});
5858
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestNetVersion,
59-
[&backend](auto&&... args) -> Task<void> {
59+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
6060
co_await NetVersionCall{std::forward<decltype(args)>(args)...}(backend);
6161
});
6262
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestNetPeerCount,
63-
[&backend](auto&&... args) -> Task<void> {
63+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
6464
co_await NetPeerCountCall{std::forward<decltype(args)>(args)...}(backend);
6565
});
6666
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestVersion,
67-
[&backend](auto&&... args) -> Task<void> {
67+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
6868
co_await BackEndVersionCall{std::forward<decltype(args)>(args)...}(backend);
6969
});
7070
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestProtocolVersion,
71-
[&backend](auto&&... args) -> Task<void> {
71+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
7272
co_await ProtocolVersionCall{std::forward<decltype(args)>(args)...}(backend);
7373
});
7474
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestClientVersion,
75-
[&backend](auto&&... args) -> Task<void> {
75+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
7676
co_await ClientVersionCall{std::forward<decltype(args)>(args)...}(backend);
7777
});
7878
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestSubscribe,
79-
[&backend](auto&&... args) -> Task<void> {
79+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
8080
co_await SubscribeCall{std::forward<decltype(args)>(args)...}(backend);
8181
});
8282
request_repeatedly(*grpc_context, service, &remote::ETHBACKEND::AsyncService::RequestNodeInfo,
83-
[&backend](auto&&... args) -> Task<void> {
83+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
8484
co_await NodeInfoCall{std::forward<decltype(args)>(args)...}(backend);
8585
});
8686
SILK_TRACE << "BackEndService::register_backend_request_calls END";
@@ -97,17 +97,41 @@ void BackEndKvServer::register_kv_request_calls(agrpc::GrpcContext* grpc_context
9797

9898
// Register one requested call repeatedly for each RPC: asio-grpc will take care of re-registration on any incoming call
9999
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestVersion,
100-
[&backend](auto&&... args) -> Task<void> {
100+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
101101
co_await KvVersionCall{std::forward<decltype(args)>(args)...}(backend);
102102
});
103103
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestTx,
104-
[&backend, grpc_context](auto&&... args) -> Task<void> {
104+
[&backend, grpc_context](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
105105
co_await TxCall{*grpc_context, std::forward<decltype(args)>(args)...}(backend);
106106
});
107107
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestStateChanges,
108-
[&backend](auto&&... args) -> Task<void> {
108+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
109109
co_await StateChangesCall{std::forward<decltype(args)>(args)...}(backend);
110110
});
111+
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestSnapshots,
112+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
113+
co_await SnapshotsCall{std::forward<decltype(args)>(args)...}(backend);
114+
});
115+
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainGet,
116+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
117+
co_await DomainGetCall{std::forward<decltype(args)>(args)...}(backend);
118+
});
119+
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryGet,
120+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
121+
co_await HistoryGetCall{std::forward<decltype(args)>(args)...}(backend);
122+
});
123+
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestIndexRange,
124+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
125+
co_await IndexRangeCall{std::forward<decltype(args)>(args)...}(backend);
126+
});
127+
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestHistoryRange,
128+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
129+
co_await HistoryRangeCall{std::forward<decltype(args)>(args)...}(backend);
130+
});
131+
request_repeatedly(*grpc_context, service, &remote::KV::AsyncService::RequestDomainRange,
132+
[&backend](auto&&... args) -> Task<void> { // NOLINT(cppcoreguidelines-avoid-capturing-lambda-coroutines)
133+
co_await DomainRangeCall{std::forward<decltype(args)>(args)...}(backend);
134+
});
111135
SILK_TRACE << "BackEndKvServer::register_kv_request_calls END";
112136
}
113137

silkworm/node/remote/kv/grpc/server/backend_kv_server_test.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include <silkworm/infra/grpc/common/conversion.hpp>
4141
#include <silkworm/infra/grpc/common/util.hpp>
4242
#include <silkworm/infra/test_util/log.hpp>
43+
#include <silkworm/interfaces/remote/kv.pb.h>
4344
#include <silkworm/interfaces/types/types.pb.h>
4445
#include <silkworm/node/backend/ethereum_backend.hpp>
4546
#include <silkworm/node/backend/state_change_collection.hpp>
@@ -152,6 +153,36 @@ class KvClient {
152153
return stub_->StateChanges(context, request);
153154
}
154155

156+
grpc::Status snapshots(const remote::SnapshotsRequest& request, remote::SnapshotsReply* response) {
157+
grpc::ClientContext context;
158+
return stub_->Snapshots(&context, request, response);
159+
}
160+
161+
grpc::Status history_get(const remote::HistoryGetReq& request, remote::HistoryGetReply* response) {
162+
grpc::ClientContext context;
163+
return stub_->HistoryGet(&context, request, response);
164+
}
165+
166+
grpc::Status domain_get(const remote::DomainGetReq& request, remote::DomainGetReply* response) {
167+
grpc::ClientContext context;
168+
return stub_->DomainGet(&context, request, response);
169+
}
170+
171+
grpc::Status index_range(const remote::IndexRangeReq& request, remote::IndexRangeReply* response) {
172+
grpc::ClientContext context;
173+
return stub_->IndexRange(&context, request, response);
174+
}
175+
176+
grpc::Status history_range(const remote::HistoryRangeReq& request, remote::Pairs* response) {
177+
grpc::ClientContext context;
178+
return stub_->HistoryRange(&context, request, response);
179+
}
180+
181+
grpc::Status domain_range(const remote::DomainRangeReq& request, remote::Pairs* response) {
182+
grpc::ClientContext context;
183+
return stub_->DomainRange(&context, request, response);
184+
}
185+
155186
private:
156187
remote::KV::StubInterface* stub_;
157188
};
@@ -829,6 +860,48 @@ TEST_CASE("BackEndKvServer E2E: KV", "[silkworm][node][rpc]") {
829860
const auto status1 = subscribe_reply_reader1->Finish();
830861
CHECK(status1.ok());
831862
}
863+
864+
SECTION("Snapshots: return snapshot files") {
865+
remote::SnapshotsRequest request;
866+
remote::SnapshotsReply response;
867+
const auto status = kv_client.snapshots(request, &response);
868+
CHECK(status.ok());
869+
}
870+
871+
SECTION("HistoryGet: return value in target history") {
872+
remote::HistoryGetReq request;
873+
remote::HistoryGetReply response;
874+
const auto status = kv_client.history_get(request, &response);
875+
CHECK(status.ok());
876+
}
877+
878+
SECTION("DomainGet: return value in target domain") {
879+
remote::DomainGetReq request;
880+
remote::DomainGetReply response;
881+
const auto status = kv_client.domain_get(request, &response);
882+
CHECK(status.ok());
883+
}
884+
885+
SECTION("IndexRange: return value in target index range") {
886+
remote::IndexRangeReq request;
887+
remote::IndexRangeReply response;
888+
const auto status = kv_client.index_range(request, &response);
889+
CHECK(status.ok());
890+
}
891+
892+
SECTION("HistoryRange: return value in target history range") {
893+
remote::HistoryRangeReq request;
894+
remote::Pairs response;
895+
const auto status = kv_client.history_range(request, &response);
896+
CHECK(status.ok());
897+
}
898+
899+
SECTION("DomainRange: return value in target domain range") {
900+
remote::DomainRangeReq request;
901+
remote::Pairs response;
902+
const auto status = kv_client.domain_range(request, &response);
903+
CHECK(status.ok());
904+
}
832905
}
833906

834907
TEST_CASE("BackEndKvServer E2E: mainnet chain with zero etherbase", "[silkworm][node][rpc]") {

silkworm/node/remote/kv/grpc/server/kv_calls.cpp

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,17 @@ Task<void> TxCall::operator()(const EthereumBackEnd& backend) {
8181

8282
grpc::Status status{grpc::Status::OK};
8383
try {
84+
// Assign a monotonically increasing unique ID to remote transaction
85+
const auto tx_id = ++next_tx_id_;
86+
8487
// Create a new read-only transaction.
8588
read_only_txn_ = db::ROTxnManaged{*chaindata_env};
86-
SILK_DEBUG << "TxCall peer: " << peer() << " started tx: " << read_only_txn_->id();
89+
SILK_DEBUG << "TxCall peer: " << peer() << " started tx: " << tx_id << " view: " << read_only_txn_->id();
8790

88-
// Send an unsolicited message containing the transaction ID.
91+
// Send an unsolicited message containing the transaction ID and view ID (i.e. MDBX txn ID)
8992
remote::Pair tx_id_pair;
90-
tx_id_pair.set_tx_id(read_only_txn_->id());
93+
tx_id_pair.set_tx_id(tx_id);
94+
tx_id_pair.set_view_id(read_only_txn_->id());
9195
if (!co_await agrpc::write(responder_, tx_id_pair)) {
9296
SILK_WARN << "Tx closed by peer: " << server_context_.peer() << " error: write failed";
9397
co_await agrpc::finish(responder_, grpc::Status::OK);
@@ -107,6 +111,7 @@ Task<void> TxCall::operator()(const EthereumBackEnd& backend) {
107111
remote::Cursor request;
108112
read_stream.initiate(agrpc::read, responder_, request);
109113

114+
// NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
110115
const auto read = [&]() -> Task<void> {
111116
try {
112117
while (co_await read_stream.next()) {
@@ -135,10 +140,12 @@ Task<void> TxCall::operator()(const EthereumBackEnd& backend) {
135140
status = grpc::Status{grpc::StatusCode::INTERNAL, exc.what()};
136141
}
137142
};
143+
// NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
138144
const auto write = [&]() -> Task<void> {
139145
while (co_await write_stream.next()) {
140146
}
141147
};
148+
// NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
142149
const auto max_idle_timer = [&]() -> Task<void> {
143150
while (true) {
144151
const auto [ec] = co_await max_idle_alarm.async_wait(as_tuple(use_awaitable));
@@ -150,6 +157,7 @@ Task<void> TxCall::operator()(const EthereumBackEnd& backend) {
150157
}
151158
}
152159
};
160+
// NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
153161
const auto max_ttl_timer = [&]() -> Task<void> {
154162
while (true) {
155163
const auto [ec] = co_await max_ttl_alarm.async_wait(as_tuple(use_awaitable));
@@ -404,7 +412,7 @@ bool TxCall::restore_cursors(std::vector<CursorPosition>& positions) {
404412
}
405413
} else {
406414
/* single-value table */
407-
const auto result = (key.length() == 0) ? cursor->to_first(/*throw_notfound=*/false) : cursor->lower_bound(key, /*throw_notfound=*/false);
415+
const auto result = (key.empty()) ? cursor->to_first(/*throw_notfound=*/false) : cursor->lower_bound(key, /*throw_notfound=*/false);
408416
SILK_DEBUG << "Tx restore cursor " << cursor_id << " for: " << bucket_name << " result: " << db::detail::dump_mdbx_result(result);
409417
if (!result) {
410418
return false;
@@ -447,7 +455,7 @@ void TxCall::handle_seek(const remote::Cursor* request, db::ROCursorDupSort& cur
447455
SILK_TRACE << "TxCall::handle_seek " << this << " START";
448456
mdbx::slice key{request->k()};
449457

450-
const auto result = (key.length() == 0) ? cursor.to_first(/*throw_notfound=*/false) : cursor.lower_bound(key, /*throw_notfound=*/false);
458+
const auto result = (key.empty()) ? cursor.to_first(/*throw_notfound=*/false) : cursor.lower_bound(key, /*throw_notfound=*/false);
451459
SILK_DEBUG << "Tx SEEK result: " << db::detail::dump_mdbx_result(result);
452460

453461
if (result) {
@@ -716,4 +724,54 @@ Task<void> StateChangesCall::operator()(const EthereumBackEnd& backend) {
716724
co_return;
717725
}
718726

727+
Task<void> SnapshotsCall::operator()(const EthereumBackEnd& /*backend*/) {
728+
SILK_TRACE << "SnapshotsCall START";
729+
remote::SnapshotsReply response;
730+
// TODO(canepat) implement properly
731+
co_await agrpc::finish(responder_, response, grpc::Status::OK);
732+
SILK_TRACE << "SnapshotsCall END #blocks_files: " << response.blocks_files_size() << " #history_files: " << response.history_files_size();
733+
}
734+
735+
Task<void> HistoryGetCall::operator()(const EthereumBackEnd& /*backend*/) {
736+
SILK_TRACE << "HistoryGetCall START";
737+
remote::HistoryGetReply response;
738+
// TODO(canepat) implement properly
739+
co_await agrpc::finish(responder_, response, grpc::Status::OK);
740+
SILK_TRACE << "HistoryGetCall END ok: " << response.ok() << " value: " << response.v();
741+
}
742+
743+
Task<void> DomainGetCall::operator()(const EthereumBackEnd& /*backend*/) {
744+
SILK_TRACE << "DomainGetCall START";
745+
remote::DomainGetReply response;
746+
// TODO(canepat) implement properly
747+
co_await agrpc::finish(responder_, response, grpc::Status::OK);
748+
SILK_TRACE << "DomainGetCall END ok: " << response.ok() << " value: " << response.v();
749+
}
750+
751+
Task<void> IndexRangeCall::operator()(const EthereumBackEnd& /*backend*/) {
752+
SILK_TRACE << "IndexRangeCall START";
753+
remote::IndexRangeReply response;
754+
// TODO(canepat) implement properly
755+
co_await agrpc::finish(responder_, response, grpc::Status::OK);
756+
SILK_TRACE << "IndexRangeCall END #timestamps: " << response.timestamps_size() << " next_page_token: " << response.next_page_token();
757+
}
758+
759+
Task<void> HistoryRangeCall::operator()(const EthereumBackEnd& /*backend*/) {
760+
SILK_TRACE << "HistoryRangeCall START";
761+
remote::Pairs response;
762+
// TODO(canepat) implement properly
763+
co_await agrpc::finish(responder_, response, grpc::Status::OK);
764+
SILK_TRACE << "HistoryRangeCall END #keys: " << response.keys_size() << " #values: " << response.values_size()
765+
<< " next_page_token: " << response.next_page_token();
766+
}
767+
768+
Task<void> DomainRangeCall::operator()(const EthereumBackEnd& /*backend*/) {
769+
SILK_TRACE << "DomainRangeCall START";
770+
remote::Pairs response;
771+
// TODO(canepat) implement properly
772+
co_await agrpc::finish(responder_, response, grpc::Status::OK);
773+
SILK_TRACE << "DomainRangeCall END #keys: " << response.keys_size() << " #values: " << response.values_size()
774+
<< " next_page_token: " << response.next_page_token();
775+
}
776+
719777
} // namespace silkworm::rpc

0 commit comments

Comments
 (0)