Skip to content

Commit dee0711

Browse files
committed
Merge bitcoin-core#8: Add Connnect/Serve/Spawn/Wait functions
f324c66 Add Connnect/Serve/Spawn/Wait functions (Russell Yanofsky) Pull request description: Add forking and connecting functions to pull more IPC code out of bitcoin/bitcoin#10102 into libmultiprocess Top commit has no ACKs. Tree-SHA512: dcb40c4de2afe7d6ebb5b97376abe03f3f6d31d150276961ef1237f0438ad7b5fd890890ae7bf06254abe6c969a92dd44ad764a0a181f55cdfd409b702db5e22
2 parents 8125688 + f324c66 commit dee0711

File tree

4 files changed

+136
-2
lines changed

4 files changed

+136
-2
lines changed

include/mp/proxy-io.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,41 @@ struct ThreadContext
475475
bool loop_thread = false;
476476
};
477477

478+
//! Given stream file descriptor, make a new ProxyClient object to send requests
479+
//! over the stream. Also create a new Connection object embedded in the
480+
//! client that is freed when the client is closed.
481+
template <typename InitInterface>
482+
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd, bool add_client)
483+
{
484+
typename InitInterface::Client init_client(nullptr);
485+
std::unique_ptr<Connection> connection;
486+
loop.sync([&] {
487+
auto stream =
488+
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
489+
connection = std::make_unique<Connection>(loop, kj::mv(stream), add_client);
490+
init_client = connection->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
491+
Connection* connection_ptr = connection.get();
492+
connection->onDisconnect([&loop, connection_ptr] {
493+
loop.log() << "IPC client: unexpected network disconnect.";
494+
delete connection_ptr;
495+
});
496+
});
497+
return std::make_unique<ProxyClient<InitInterface>>(
498+
kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
499+
}
500+
501+
//! Given stream and a callback to construct a new ProxyServer object that
502+
//! handles requests from the stream, create a new Connection callback, pass it
503+
//! to the callback, use the returned ProxyServer to handle requests, and delete
504+
//! the proxyserver if the connection is disconnected.
505+
//! This should be called from the event loop thread.
506+
void ServeStream(EventLoop& loop,
507+
kj::Own<kj::AsyncIoStream>&& stream,
508+
std::function<capnp::Capability::Client(Connection&)> make_server);
509+
510+
//! Same as above but accept file descriptor rather than stream object.
511+
void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server);
512+
478513
extern thread_local ThreadContext g_thread_context;
479514

480515
} // namespace mp

include/mp/util.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <tuple>
1818
#include <type_traits>
1919
#include <utility>
20+
#include <vector>
2021

2122
namespace mp {
2223

@@ -360,6 +361,23 @@ std::string ThreadName(const char* exe_name);
360361
//! errors in python unit tests.
361362
std::string LogEscape(const kj::StringTree& string);
362363

364+
//! Callback type used by SpawnProcess below.
365+
using FdToArgsFn = std::function<std::vector<std::string>(int fd)>;
366+
367+
//! Spawn a new process that communicates with the current process over a socket
368+
//! pair. Returns pid through an output argument, and file descriptor for the
369+
//! local side of the socket. Invokes fd_to_args callback with the remote file
370+
//! descriptor number which returns the command line arguments that should be
371+
//! used to execute the process, and which should have the remote file
372+
//! descriptor embedded in whatever format the child process expects.
373+
int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args);
374+
375+
//! Call execvp with vector args.
376+
void ExecProcess(const std::vector<std::string>& args);
377+
378+
//! Wait for a process to exit and return its exit code.
379+
int WaitProcess(int pid);
380+
363381
inline char* CharCast(char* c) { return c; }
364382
inline char* CharCast(unsigned char* c) { return (char*)c; }
365383
inline const char* CharCast(const char* c) { return c; }

src/mp/proxy.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,10 +311,28 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
311311

312312
std::atomic<int> server_reqs{0};
313313

314-
315314
std::string LongThreadName(const char* exe_name)
316315
{
317316
return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
318317
}
319318

319+
void ServeStream(EventLoop& loop,
320+
kj::Own<kj::AsyncIoStream>&& stream,
321+
std::function<capnp::Capability::Client(Connection&)> make_server)
322+
{
323+
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), make_server);
324+
auto it = loop.m_incoming_connections.begin();
325+
it->onDisconnect([&loop, it] {
326+
loop.log() << "IPC server: socket disconnected.";
327+
loop.m_incoming_connections.erase(it);
328+
});
329+
}
330+
331+
void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server)
332+
{
333+
ServeStream(loop,
334+
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
335+
std::move(make_server));
336+
}
337+
320338
} // namespace mp

src/mp/util.cpp

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,32 @@
55
#include <mp/util.h>
66

77
#include <kj/array.h>
8-
#include <mp/proxy.h>
98
#include <pthread.h>
109
#include <sstream>
1110
#include <stdio.h>
11+
#include <sys/resource.h>
12+
#include <sys/socket.h>
13+
#include <sys/types.h>
14+
#include <sys/un.h>
15+
#include <sys/wait.h>
1216
#include <syscall.h>
1317
#include <unistd.h>
1418

1519
namespace mp {
20+
namespace {
21+
22+
//! Return highest possible file descriptor.
23+
size_t MaxFd()
24+
{
25+
struct rlimit nofile;
26+
if (getrlimit(RLIMIT_NOFILE, &nofile) == 0) {
27+
return nofile.rlim_cur - 1;
28+
} else {
29+
return 1023;
30+
}
31+
}
32+
33+
} // namespace
1634

1735
std::string ThreadName(const char* exe_name)
1836
{
@@ -54,4 +72,49 @@ std::string LogEscape(const kj::StringTree& string)
5472
return result;
5573
}
5674

75+
int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args)
76+
{
77+
int fds[2];
78+
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
79+
throw std::system_error(errno, std::system_category());
80+
}
81+
82+
pid = fork();
83+
if (close(fds[pid ? 0 : 1]) != 0) {
84+
throw std::system_error(errno, std::system_category());
85+
}
86+
if (!pid) {
87+
int maxFd = MaxFd();
88+
for (int fd = 3; fd < maxFd; ++fd) {
89+
if (fd != fds[0]) {
90+
close(fd);
91+
}
92+
}
93+
ExecProcess(fd_to_args(fds[0]));
94+
}
95+
return fds[1];
96+
}
97+
98+
void ExecProcess(const std::vector<std::string>& args)
99+
{
100+
std::vector<char*> argv;
101+
for (const auto& arg : args) {
102+
argv.push_back(const_cast<char*>(arg.c_str()));
103+
}
104+
argv.push_back(nullptr);
105+
if (execvp(argv[0], argv.data()) != 0) {
106+
perror("execlp failed");
107+
_exit(1);
108+
}
109+
}
110+
111+
int WaitProcess(int pid)
112+
{
113+
int status;
114+
if (::waitpid(pid, &status, 0 /* options */) != pid) {
115+
throw std::system_error(errno, std::system_category());
116+
}
117+
return status;
118+
}
119+
57120
} // namespace mp

0 commit comments

Comments
 (0)