Skip to content

Commit f9c826d

Browse files
d4l3kfacebook-github-bot
authored and committed
gloo/transport/tcp/loop: better memory management and shutdown (#422)
Summary: Pull Request resolved: #422 This overhauls how we schedule async operations on the tcp Loop. These changes greatly simplify the memory management and simplify the shutdown logic, since memory ownership is better tied to epoll usage and relies less on shared_ptr copies everywhere to work around it. This makes a few major changes: * Add a new `registerDescriptor` method to Loop that provides a shared_ptr. This method will track the lifetime of the provided Handler and will release it when the epoll handle is updated/deleted. * Make the `Handler::handleEvents` signature provide a `Loop&` reference so all objects don't need to hold a shared_ptr to the loop. * Update `helpers.h` to not manage its own lifetime (instead using registerDescriptor shared_ptr support) and also not hold a shared_ptr to `Loop`. * Introduce a new `shutdown()` method that allows the loop to gracefully shut down all operations prior to destruction. This simplifies teardown on error since there are a few places where handlers (Device) are managed with raw pointers. This isn't used outside of Loop currently but will be in a follow-up PR. Differential Revision: D72080276
1 parent 95ca2af commit f9c826d

File tree

10 files changed

+135
-121
lines changed

10 files changed

+135
-121
lines changed

gloo/test/tcp_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ TEST(TcpTest, ConnectTimeout) {
1717
// Use bad address
1818
auto remote = Address("::1", 10);
1919
auto timeout = std::chrono::milliseconds(100);
20-
auto fn = [&](std::shared_ptr<Socket>, const Error& e) {
20+
auto fn = [&](Loop&, std::shared_ptr<Socket>, const Error& e) {
2121
std::lock_guard<std::mutex> lock(m);
2222
done = true;
2323
cv.notify_all();
2424

2525
EXPECT_TRUE(e);
2626
EXPECT_TRUE(dynamic_cast<const TimeoutError*>(&e));
2727
};
28-
connectLoop(loop, remote, 0, 5, timeout, std::move(fn));
28+
connectLoop(*loop, remote, 0, 5, timeout, std::move(fn));
2929

3030
std::unique_lock<std::mutex> lock(m);
3131
cv.wait(lock, [&] { return done; });

gloo/transport/tcp/device.cc

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,12 @@ void Device::connectAsInitiator(
341341
const int size,
342342
std::chrono::milliseconds timeout,
343343
connect_callback_t fn) {
344-
auto writeSeq = [loop = loop_, seq = remote.getSeq()](
345-
std::shared_ptr<Socket> socket, connect_callback_t fn) {
346-
// Write sequence number for peer to new socket.
347-
write<sequence_number_t>(loop, std::move(socket), seq, std::move(fn));
348-
};
344+
auto writeSeq =
345+
[seq = remote.getSeq()](
346+
Loop& loop, std::shared_ptr<Socket> socket, connect_callback_t fn) {
347+
// Write sequence number for peer to new socket.
348+
write<sequence_number_t>(loop, std::move(socket), seq, std::move(fn));
349+
};
349350

350351
if (disableConnectionRetries()) {
351352
const auto& sockaddr = remote.getSockaddr();
@@ -356,22 +357,22 @@ void Device::connectAsInitiator(
356357
socket->noDelay(true);
357358
socket->connect(sockaddr);
358359

359-
writeSeq(std::move(socket), std::move(fn));
360+
writeSeq(*loop_, std::move(socket), std::move(fn));
360361
} else {
361362
connectLoop(
362-
loop_,
363+
*loop_,
363364
remote,
364365
rank,
365366
size,
366367
timeout,
367-
[loop = loop_, fn = std::move(fn), writeSeq = std::move(writeSeq)](
368-
std::shared_ptr<Socket> socket, const Error& error) {
368+
[fn = std::move(fn), writeSeq = std::move(writeSeq)](
369+
Loop& loop, std::shared_ptr<Socket> socket, const Error& error) {
369370
if (error) {
370371
fn(socket, error);
371372
return;
372373
}
373374

374-
writeSeq(std::move(socket), std::move(fn));
375+
writeSeq(loop, std::move(socket), fn);
375376
});
376377
}
377378
}

gloo/transport/tcp/helpers.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ namespace transport {
55
namespace tcp {
66

77
void connectLoop(
8-
std::shared_ptr<Loop> loop,
8+
Loop& loop,
99
const Address& remote,
1010
const int rank,
1111
const int size,
1212
std::chrono::milliseconds timeout,
1313
typename ConnectOperation::callback_t fn) {
1414
auto x = std::make_shared<ConnectOperation>(
15-
std::move(loop), remote, rank, size, timeout, std::move(fn));
16-
x->run();
15+
remote, rank, size, timeout, std::move(fn));
16+
x->run(loop);
1717
}
1818

1919
} // namespace tcp

gloo/transport/tcp/helpers.h

Lines changed: 39 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ namespace tcp {
2626
// socket specified at construction. Upon completion or error, the
2727
// callback is called. Its lifetime is coupled with completion of the
2828
// operation, so the caller doesn't need to hold on to the instance.
29-
// It does so by storing a shared_ptr to itself (effectively a leak)
30-
// until the event loop calls back.
3129
template <typename T>
3230
class ReadValueOperation final
3331
: public Handler,
@@ -36,29 +34,15 @@ class ReadValueOperation final
3634
using callback_t =
3735
std::function<void(std::shared_ptr<Socket>, const Error& error, T&& t)>;
3836

39-
ReadValueOperation(
40-
std::shared_ptr<Loop> loop,
41-
std::shared_ptr<Socket> socket,
42-
callback_t fn)
43-
: loop_(std::move(loop)),
44-
socket_(std::move(socket)),
45-
fn_(std::move(fn)) {}
37+
ReadValueOperation(std::shared_ptr<Socket> socket, callback_t fn)
38+
: socket_(std::move(socket)), fn_(std::move(fn)) {}
4639

47-
void run() {
48-
// Cannot initialize leak until after the object has been
49-
// constructed, because the std::make_shared initialization
50-
// doesn't run after construction of the underlying object.
51-
leak_ = this->shared_from_this();
52-
// Register with loop only after we've leaked the shared_ptr,
53-
// because we unleak it when the event loop thread calls.
54-
loop_->registerDescriptor(socket_->fd(), EPOLLIN | EPOLLONESHOT, this);
40+
void run(Loop& loop) {
41+
loop.registerDescriptor(
42+
socket_->fd(), EPOLLIN | EPOLLONESHOT, this->shared_from_this());
5543
}
5644

57-
void handleEvents(int events) override {
58-
// Move leaked shared_ptr to the stack so that this object
59-
// destroys itself once this function returns.
60-
auto self = std::move(this->leak_);
61-
45+
void handleEvents(Loop&, int /*events*/) override {
6246
// Read T.
6347
auto rv = socket_->read(&t_, sizeof(t_));
6448
if (rv == -1) {
@@ -80,30 +64,26 @@ class ReadValueOperation final
8064
}
8165

8266
private:
83-
std::shared_ptr<Loop> loop_;
8467
std::shared_ptr<Socket> socket_;
8568
callback_t fn_;
86-
std::shared_ptr<ReadValueOperation<T>> leak_;
8769

8870
T t_;
8971
};
9072

9173
template <typename T>
9274
void read(
93-
std::shared_ptr<Loop> loop,
75+
Loop& loop,
9476
std::shared_ptr<Socket> socket,
9577
typename ReadValueOperation<T>::callback_t fn) {
96-
auto x = std::make_shared<ReadValueOperation<T>>(
97-
std::move(loop), std::move(socket), std::move(fn));
98-
x->run();
78+
auto x =
79+
std::make_shared<ReadValueOperation<T>>(std::move(socket), std::move(fn));
80+
x->run(loop);
9981
}
10082

10183
// WriteValueOperation asynchronously writes a value of type T to the
10284
// socket specified at construction. Upon completion or error, the
10385
// callback is called. Its lifetime is coupled with completion of the
10486
// operation, so the caller doesn't need to hold on to the instance.
105-
// It does so by storing a shared_ptr to itself (effectively a leak)
106-
// until the event loop calls back.
10787
template <typename T>
10888
class WriteValueOperation final
10989
: public Handler,
@@ -112,31 +92,15 @@ class WriteValueOperation final
11292
using callback_t =
11393
std::function<void(std::shared_ptr<Socket>, const Error& error)>;
11494

115-
WriteValueOperation(
116-
std::shared_ptr<Loop> loop,
117-
std::shared_ptr<Socket> socket,
118-
T t,
119-
callback_t fn)
120-
: loop_(std::move(loop)),
121-
socket_(std::move(socket)),
122-
fn_(std::move(fn)),
123-
t_(std::move(t)) {}
124-
125-
void run() {
126-
// Cannot initialize leak until after the object has been
127-
// constructed, because the std::make_shared initialization
128-
// doesn't run after construction of the underlying object.
129-
leak_ = this->shared_from_this();
130-
// Register with loop only after we've leaked the shared_ptr,
131-
// because we unleak it when the event loop thread calls.
132-
loop_->registerDescriptor(socket_->fd(), EPOLLOUT | EPOLLONESHOT, this);
133-
}
95+
WriteValueOperation(std::shared_ptr<Socket> socket, T t, callback_t fn)
96+
: socket_(std::move(socket)), fn_(std::move(fn)), t_(std::move(t)) {}
13497

135-
void handleEvents(int events) override {
136-
// Move leaked shared_ptr to the stack so that this object
137-
// destroys itself once this function returns.
138-
auto leak = std::move(this->leak_);
98+
void run(Loop& loop) {
99+
loop.registerDescriptor(
100+
socket_->fd(), EPOLLOUT | EPOLLONESHOT, this->shared_from_this());
101+
}
139102

103+
void handleEvents(Loop&, int /*events*/) override {
140104
// Write T.
141105
auto rv = socket_->write(&t_, sizeof(t_));
142106
if (rv == -1) {
@@ -154,33 +118,30 @@ class WriteValueOperation final
154118
}
155119

156120
private:
157-
std::shared_ptr<Loop> loop_;
158121
std::shared_ptr<Socket> socket_;
159122
callback_t fn_;
160-
std::shared_ptr<WriteValueOperation<T>> leak_;
161123

162124
T t_;
163125
};
164126

165127
template <typename T>
166128
void write(
167-
std::shared_ptr<Loop> loop,
129+
Loop& loop,
168130
std::shared_ptr<Socket> socket,
169131
T t,
170132
typename WriteValueOperation<T>::callback_t fn) {
171133
auto x = std::make_shared<WriteValueOperation<T>>(
172-
std::move(loop), std::move(socket), std::move(t), std::move(fn));
173-
x->run();
134+
std::move(socket), std::move(t), std::move(fn));
135+
x->run(loop);
174136
}
175137

176138
class ConnectOperation final
177139
: public Handler,
178140
public std::enable_shared_from_this<ConnectOperation> {
179141
public:
180-
using callback_t =
181-
std::function<void(std::shared_ptr<Socket>, const Error& error)>;
142+
using callback_t = std::function<
143+
void(Loop& loop, std::shared_ptr<Socket>, const Error& error)>;
182144
ConnectOperation(
183-
std::shared_ptr<Loop> loop,
184145
const Address& remote,
185146
const int rank,
186147
const int size,
@@ -190,15 +151,9 @@ class ConnectOperation final
190151
rank_(rank),
191152
size_(size),
192153
deadline_(std::chrono::steady_clock::now() + timeout),
193-
loop_(std::move(loop)),
194154
fn_(std::move(fn)) {}
195155

196-
void run() {
197-
// Cannot initialize leak until after the object has been
198-
// constructed, because the std::make_shared initialization
199-
// doesn't run after construction of the underlying object.
200-
leak_ = this->shared_from_this();
201-
156+
void run(Loop& loop) {
202157
const auto& sockaddr = remote_.getSockaddr();
203158

204159
// Create new socket to connect to peer.
@@ -207,29 +162,26 @@ class ConnectOperation final
207162
socket_->noDelay(true);
208163
socket_->connect(sockaddr);
209164

210-
// Register with loop only after we've leaked the shared_ptr,
211-
// because we unleak it when the event loop thread calls.
212165
// Register for EPOLLOUT, because we want to be notified when
213166
// the connect completes. EPOLLERR is also necessary because
214167
// connect() can fail.
215-
if (auto loop = loop_.lock()) {
216-
loop->registerDescriptor(
217-
socket_->fd(), EPOLLOUT | EPOLLERR | EPOLLONESHOT, this);
218-
} else {
219-
fn_(socket_, LoopError("loop is gone"));
220-
}
168+
loop.registerDescriptor(
169+
socket_->fd(),
170+
EPOLLOUT | EPOLLERR | EPOLLONESHOT,
171+
this->shared_from_this());
221172
}
222173

223-
void handleEvents(int events) override {
224-
// Move leaked shared_ptr to the stack so that this object
225-
// destroys itself once this function returns.
226-
auto leak = std::move(this->leak_);
174+
void handleEvents(Loop& loop, int /*events*/) override {
175+
// Hold a reference to this object to keep it alive until the
176+
// callback is called.
177+
auto leak = shared_from_this();
178+
loop.unregisterDescriptor(socket_->fd(), this);
227179

228180
int result;
229181
socklen_t result_len = sizeof(result);
230182
if (getsockopt(socket_->fd(), SOL_SOCKET, SO_ERROR, &result, &result_len) <
231183
0) {
232-
fn_(socket_, SystemError("getsockopt", errno, remote_));
184+
fn_(loop, socket_, SystemError("getsockopt", errno, remote_));
233185
return;
234186
}
235187
if (result != 0) {
@@ -248,16 +200,18 @@ class ConnectOperation final
248200
socket_->sockName().str(),
249201
};
250202
DebugLogger::log(debugData);
203+
251204
// check deadline
252205
if (willRetry) {
253-
run();
206+
run(loop);
254207
} else {
255-
fn_(socket_, TimeoutError("timed out connecting: " + e.what()));
208+
fn_(loop, socket_, TimeoutError("timed out connecting: " + e.what()));
256209
}
210+
257211
return;
258212
}
259213

260-
fn_(socket_, Error::kSuccess);
214+
fn_(loop, socket_, Error::kSuccess);
261215
}
262216

263217
private:
@@ -269,16 +223,12 @@ class ConnectOperation final
269223

270224
int retry_{0};
271225

272-
// We use a weak_ptr to the loop to avoid a reference cycle when an error
273-
// occurs.
274-
std::weak_ptr<Loop> loop_;
275226
std::shared_ptr<Socket> socket_;
276227
callback_t fn_;
277-
std::shared_ptr<ConnectOperation> leak_;
278228
};
279229

280230
void connectLoop(
281-
std::shared_ptr<Loop> loop,
231+
Loop& loop,
282232
const Address& remote,
283233
const int rank,
284234
const int size,

gloo/transport/tcp/listener.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ Listener::~Listener() {
4040
}
4141
}
4242

43-
void Listener::handleEvents(int /* unused */) {
43+
void Listener::handleEvents(Loop& loop, int /* unused */) {
4444
std::lock_guard<std::mutex> guard(mutex_);
4545

4646
for (;;) {
@@ -59,7 +59,7 @@ void Listener::handleEvents(int /* unused */) {
5959

6060
// Read sequence number.
6161
read<sequence_number_t>(
62-
loop_,
62+
loop,
6363
sock,
6464
[this](
6565
std::shared_ptr<Socket> socket,

gloo/transport/tcp/listener.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Listener final : public Handler {
3838

3939
~Listener() override;
4040

41-
void handleEvents(int events) override;
41+
void handleEvents(Loop& loop, int events) override;
4242

4343
Address nextAddress();
4444

0 commit comments

Comments
 (0)