Skip to content

Commit 315ff53

Browse files
committed
refactor: Add ProxyContext EventLoop* member
This commit makes mechanical changes needed to simplify an upcoming commit which replaces EventLoop* with an EventLoopRef. This change also happens to be also useful on its own so clientInvoke can detect disconnections in a non-racy way (bitcoin-core#123 (comment)) by seeing if the client Connection pointer is null while holding the event loop mutex.
1 parent 9aaeec3 commit 315ff53

File tree

5 files changed

+23
-20
lines changed

5 files changed

+23
-20
lines changed

include/mp/proxy-io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
446446
Sub::destroy(*this);
447447

448448
// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
449-
m_context.connection->m_loop.sync([&]() {
449+
m_context.loop->sync([&]() {
450450
// Release client capability by move-assigning to temporary.
451451
{
452452
typename Interface::Client(std::move(m_client));

include/mp/proxy-types.h

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ template <typename Client>
568568
void clientDestroy(Client& client)
569569
{
570570
if (client.m_context.connection) {
571-
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
571+
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
572572
} else {
573573
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
574574
}
@@ -577,7 +577,7 @@ void clientDestroy(Client& client)
577577
template <typename Server>
578578
void serverDestroy(Server& server)
579579
{
580-
server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name();
580+
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
581581
}
582582

583583
//! Entry point called by generated client code that looks like:
@@ -597,7 +597,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
597597
}
598598
if (!g_thread_context.waiter) {
599599
assert(g_thread_context.thread_name.empty());
600-
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
600+
g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name);
601601
// If next assert triggers, it means clientInvoke is being called from
602602
// the capnp event loop thread. This can happen when a ProxyServer
603603
// method implementation that runs synchronously on the event loop
@@ -608,26 +608,26 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
608608
// declaration so the server method runs in a dedicated thread.
609609
assert(!g_thread_context.loop_thread);
610610
g_thread_context.waiter = std::make_unique<Waiter>();
611-
proxy_client.m_context.connection->m_loop.logPlain()
611+
proxy_client.m_context.loop->logPlain()
612612
<< "{" << g_thread_context.thread_name
613613
<< "} IPC client first request from current thread, constructing waiter";
614614
}
615615
ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
616616
std::exception_ptr exception;
617617
std::string kj_exception;
618618
bool done = false;
619-
proxy_client.m_context.connection->m_loop.sync([&]() {
619+
proxy_client.m_context.loop->sync([&]() {
620620
auto request = (proxy_client.m_client.*get_request)(nullptr);
621621
using Request = CapRequestTraits<decltype(request)>;
622622
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
623623
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
624-
proxy_client.m_context.connection->m_loop.logPlain()
624+
proxy_client.m_context.loop->logPlain()
625625
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
626626
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());
627627

628-
proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
628+
proxy_client.m_context.loop->m_task_set->add(request.send().then(
629629
[&](::capnp::Response<typename Request::Results>&& response) {
630-
proxy_client.m_context.connection->m_loop.logPlain()
630+
proxy_client.m_context.loop->logPlain()
631631
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
632632
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
633633
try {
@@ -642,7 +642,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
642642
},
643643
[&](const ::kj::Exception& e) {
644644
kj_exception = kj::str("kj::Exception: ", e).cStr();
645-
proxy_client.m_context.connection->m_loop.logPlain()
645+
proxy_client.m_context.loop->logPlain()
646646
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
647647
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
648648
done = true;
@@ -653,7 +653,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
653653
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
654654
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
655655
if (exception) std::rethrow_exception(exception);
656-
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
656+
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
657657
}
658658

659659
//! Invoke callable `fn()` that may return void. If it does return void, replace
@@ -687,7 +687,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
687687
using Results = typename decltype(call_context.getResults())::Builds;
688688

689689
int req = ++server_reqs;
690-
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
690+
server.m_context.loop->log() << "IPC server recv request #" << req << " "
691691
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
692692

693693
try {
@@ -704,14 +704,14 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
704704
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
705705
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
706706
.then([&server, req](CallContext call_context) {
707-
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
707+
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
708708
<< " " << LogEscape(call_context.getResults().toString());
709709
});
710710
} catch (const std::exception& e) {
711-
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
711+
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
712712
throw;
713713
} catch (...) {
714-
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
714+
server.m_context.loop->log() << "IPC server unhandled exception";
715715
throw;
716716
}
717717
}

include/mp/proxy.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ class EventLoopRef
7272
struct ProxyContext
7373
{
7474
Connection* connection;
75+
EventLoop* loop;
7576
CleanupList cleanup_fns;
7677

77-
ProxyContext(Connection* connection) : connection(connection) {}
78+
ProxyContext(Connection* connection);
7879
};
7980

8081
//! Base class for generated ProxyClient classes that implement a C++ interface

include/mp/type-context.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
132132
return;
133133
}
134134
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
135-
server.m_context.connection->m_loop.sync([&] {
135+
server.m_context.loop->sync([&] {
136136
auto fulfiller_dispose = kj::mv(fulfiller);
137137
fulfiller_dispose->fulfill(kj::mv(call_context));
138138
});
139139
}))
140140
{
141-
server.m_context.connection->m_loop.sync([&]() {
141+
server.m_context.loop->sync([&]() {
142142
auto fulfiller_dispose = kj::mv(fulfiller);
143143
fulfiller_dispose->reject(kj::mv(*exception));
144144
});
@@ -156,11 +156,11 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
156156
// thread.
157157
KJ_IF_MAYBE (thread_server, perhaps) {
158158
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
159-
server.m_context.connection->m_loop.log()
159+
server.m_context.loop->log()
160160
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
161161
thread.m_thread_context.waiter->post(std::move(invoke));
162162
} else {
163-
server.m_context.connection->m_loop.log()
163+
server.m_context.loop->log()
164164
<< "IPC server error request #" << req << ", missing thread to execute request";
165165
throw std::runtime_error("invalid thread handle");
166166
}

src/mp/proxy.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ bool EventLoopRef::reset()
6565
return done;
6666
}
6767

68+
ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{&connection->m_loop} {}
69+
6870
Connection::~Connection()
6971
{
7072
// Shut down RPC system first, since this will garbage collect Server

0 commit comments

Comments
 (0)