Skip to content

Commit e348db9

Browse files
authored
gloo/transport/tcp/loop: better memory management and shutdown
Differential Revision: D72080276 Pull Request resolved: #422
1 parent 9d6f6bd commit e348db9

File tree

10 files changed

+134
-121
lines changed

10 files changed

+134
-121
lines changed

gloo/test/tcp_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ TEST(TcpTest, ConnectTimeout) {
1717
// Use bad address
1818
auto remote = Address("::1", 10);
1919
auto timeout = std::chrono::milliseconds(100);
20-
auto fn = [&](std::shared_ptr<Socket>, const Error& e) {
20+
auto fn = [&](Loop&, std::shared_ptr<Socket>, const Error& e) {
2121
std::lock_guard<std::mutex> lock(m);
2222
done = true;
2323
cv.notify_all();
2424

2525
EXPECT_TRUE(e);
2626
EXPECT_TRUE(dynamic_cast<const TimeoutError*>(&e));
2727
};
28-
connectLoop(loop, remote, 0, 5, timeout, std::move(fn));
28+
connectLoop(*loop, remote, 0, 5, timeout, std::move(fn));
2929

3030
std::unique_lock<std::mutex> lock(m);
3131
cv.wait(lock, [&] { return done; });

gloo/transport/tcp/device.cc

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,12 @@ void Device::connectAsInitiator(
341341
const int size,
342342
std::chrono::milliseconds timeout,
343343
connect_callback_t fn) {
344-
auto writeSeq = [loop = loop_, seq = remote.getSeq()](
345-
std::shared_ptr<Socket> socket, connect_callback_t fn) {
346-
// Write sequence number for peer to new socket.
347-
write<sequence_number_t>(loop, std::move(socket), seq, std::move(fn));
348-
};
344+
auto writeSeq =
345+
[seq = remote.getSeq()](
346+
Loop& loop, std::shared_ptr<Socket> socket, connect_callback_t fn) {
347+
// Write sequence number for peer to new socket.
348+
write<sequence_number_t>(loop, std::move(socket), seq, std::move(fn));
349+
};
349350

350351
if (disableConnectionRetries()) {
351352
const auto& sockaddr = remote.getSockaddr();
@@ -356,22 +357,22 @@ void Device::connectAsInitiator(
356357
socket->noDelay(true);
357358
socket->connect(sockaddr);
358359

359-
writeSeq(std::move(socket), std::move(fn));
360+
writeSeq(*loop_, std::move(socket), std::move(fn));
360361
} else {
361362
connectLoop(
362-
loop_,
363+
*loop_,
363364
remote,
364365
rank,
365366
size,
366367
timeout,
367-
[loop = loop_, fn = std::move(fn), writeSeq = std::move(writeSeq)](
368-
std::shared_ptr<Socket> socket, const Error& error) {
368+
[fn = std::move(fn), writeSeq = std::move(writeSeq)](
369+
Loop& loop, std::shared_ptr<Socket> socket, const Error& error) {
369370
if (error) {
370371
fn(socket, error);
371372
return;
372373
}
373374

374-
writeSeq(std::move(socket), std::move(fn));
375+
writeSeq(loop, std::move(socket), fn);
375376
});
376377
}
377378
}

gloo/transport/tcp/helpers.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ namespace transport {
55
namespace tcp {
66

77
void connectLoop(
8-
std::shared_ptr<Loop> loop,
8+
Loop& loop,
99
const Address& remote,
1010
const int rank,
1111
const int size,
1212
std::chrono::milliseconds timeout,
1313
typename ConnectOperation::callback_t fn) {
1414
auto x = std::make_shared<ConnectOperation>(
15-
std::move(loop), remote, rank, size, timeout, std::move(fn));
16-
x->run();
15+
remote, rank, size, timeout, std::move(fn));
16+
x->run(loop);
1717
}
1818

1919
} // namespace tcp

gloo/transport/tcp/helpers.h

Lines changed: 39 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ namespace tcp {
2626
// socket specified at construction. Upon completion or error, the
2727
// callback is called. Its lifetime is coupled with completion of the
2828
// operation, so the called doesn't need to hold on to the instance.
29-
// It does so by storing a shared_ptr to itself (effectively a leak)
30-
// until the event loop calls back.
3129
template <typename T>
3230
class ReadValueOperation final
3331
: public Handler,
@@ -36,29 +34,15 @@ class ReadValueOperation final
3634
using callback_t =
3735
std::function<void(std::shared_ptr<Socket>, const Error& error, T&& t)>;
3836

39-
ReadValueOperation(
40-
std::shared_ptr<Loop> loop,
41-
std::shared_ptr<Socket> socket,
42-
callback_t fn)
43-
: loop_(std::move(loop)),
44-
socket_(std::move(socket)),
45-
fn_(std::move(fn)) {}
37+
ReadValueOperation(std::shared_ptr<Socket> socket, callback_t fn)
38+
: socket_(std::move(socket)), fn_(std::move(fn)) {}
4639

47-
void run() {
48-
// Cannot initialize leak until after the object has been
49-
// constructed, because the std::make_shared initialization
50-
// doesn't run after construction of the underlying object.
51-
leak_ = this->shared_from_this();
52-
// Register with loop only after we've leaked the shared_ptr,
53-
// because we unleak it when the event loop thread calls.
54-
loop_->registerDescriptor(socket_->fd(), EPOLLIN | EPOLLONESHOT, this);
40+
void run(Loop& loop) {
41+
loop.registerDescriptor(
42+
socket_->fd(), EPOLLIN | EPOLLONESHOT, this->shared_from_this());
5543
}
5644

57-
void handleEvents(int events) override {
58-
// Move leaked shared_ptr to the stack so that this object
59-
// destroys itself once this function returns.
60-
auto self = std::move(this->leak_);
61-
45+
void handleEvents(Loop&, int /*events*/) override {
6246
// Read T.
6347
auto rv = socket_->read(&t_, sizeof(t_));
6448
if (rv == -1) {
@@ -80,30 +64,26 @@ class ReadValueOperation final
8064
}
8165

8266
private:
83-
std::shared_ptr<Loop> loop_;
8467
std::shared_ptr<Socket> socket_;
8568
callback_t fn_;
86-
std::shared_ptr<ReadValueOperation<T>> leak_;
8769

8870
T t_;
8971
};
9072

9173
template <typename T>
9274
void read(
93-
std::shared_ptr<Loop> loop,
75+
Loop& loop,
9476
std::shared_ptr<Socket> socket,
9577
typename ReadValueOperation<T>::callback_t fn) {
96-
auto x = std::make_shared<ReadValueOperation<T>>(
97-
std::move(loop), std::move(socket), std::move(fn));
98-
x->run();
78+
auto x =
79+
std::make_shared<ReadValueOperation<T>>(std::move(socket), std::move(fn));
80+
x->run(loop);
9981
}
10082

10183
// WriteValueOperation asynchronously writes a value of type T to the
10284
// socket specified at construction. Upon completion or error, the
10385
// callback is called. Its lifetime is coupled with completion of the
10486
// operation, so the called doesn't need to hold on to the instance.
105-
// It does so by storing a shared_ptr to itself (effectively a leak)
106-
// until the event loop calls back.
10787
template <typename T>
10888
class WriteValueOperation final
10989
: public Handler,
@@ -112,31 +92,15 @@ class WriteValueOperation final
11292
using callback_t =
11393
std::function<void(std::shared_ptr<Socket>, const Error& error)>;
11494

115-
WriteValueOperation(
116-
std::shared_ptr<Loop> loop,
117-
std::shared_ptr<Socket> socket,
118-
T t,
119-
callback_t fn)
120-
: loop_(std::move(loop)),
121-
socket_(std::move(socket)),
122-
fn_(std::move(fn)),
123-
t_(std::move(t)) {}
124-
125-
void run() {
126-
// Cannot initialize leak until after the object has been
127-
// constructed, because the std::make_shared initialization
128-
// doesn't run after construction of the underlying object.
129-
leak_ = this->shared_from_this();
130-
// Register with loop only after we've leaked the shared_ptr,
131-
// because we unleak it when the event loop thread calls.
132-
loop_->registerDescriptor(socket_->fd(), EPOLLOUT | EPOLLONESHOT, this);
133-
}
95+
WriteValueOperation(std::shared_ptr<Socket> socket, T t, callback_t fn)
96+
: socket_(std::move(socket)), fn_(std::move(fn)), t_(std::move(t)) {}
13497

135-
void handleEvents(int events) override {
136-
// Move leaked shared_ptr to the stack so that this object
137-
// destroys itself once this function returns.
138-
auto leak = std::move(this->leak_);
98+
void run(Loop& loop) {
99+
loop.registerDescriptor(
100+
socket_->fd(), EPOLLOUT | EPOLLONESHOT, this->shared_from_this());
101+
}
139102

103+
void handleEvents(Loop&, int /*events*/) override {
140104
// Write T.
141105
auto rv = socket_->write(&t_, sizeof(t_));
142106
if (rv == -1) {
@@ -154,33 +118,30 @@ class WriteValueOperation final
154118
}
155119

156120
private:
157-
std::shared_ptr<Loop> loop_;
158121
std::shared_ptr<Socket> socket_;
159122
callback_t fn_;
160-
std::shared_ptr<WriteValueOperation<T>> leak_;
161123

162124
T t_;
163125
};
164126

165127
template <typename T>
166128
void write(
167-
std::shared_ptr<Loop> loop,
129+
Loop& loop,
168130
std::shared_ptr<Socket> socket,
169131
T t,
170132
typename WriteValueOperation<T>::callback_t fn) {
171133
auto x = std::make_shared<WriteValueOperation<T>>(
172-
std::move(loop), std::move(socket), std::move(t), std::move(fn));
173-
x->run();
134+
std::move(socket), std::move(t), std::move(fn));
135+
x->run(loop);
174136
}
175137

176138
class ConnectOperation final
177139
: public Handler,
178140
public std::enable_shared_from_this<ConnectOperation> {
179141
public:
180-
using callback_t =
181-
std::function<void(std::shared_ptr<Socket>, const Error& error)>;
142+
using callback_t = std::function<
143+
void(Loop& loop, std::shared_ptr<Socket>, const Error& error)>;
182144
ConnectOperation(
183-
std::shared_ptr<Loop> loop,
184145
const Address& remote,
185146
const int rank,
186147
const int size,
@@ -190,15 +151,9 @@ class ConnectOperation final
190151
rank_(rank),
191152
size_(size),
192153
deadline_(std::chrono::steady_clock::now() + timeout),
193-
loop_(std::move(loop)),
194154
fn_(std::move(fn)) {}
195155

196-
void run() {
197-
// Cannot initialize leak until after the object has been
198-
// constructed, because the std::make_shared initialization
199-
// doesn't run after construction of the underlying object.
200-
leak_ = this->shared_from_this();
201-
156+
void run(Loop& loop) {
202157
const auto& sockaddr = remote_.getSockaddr();
203158

204159
// Create new socket to connect to peer.
@@ -207,29 +162,26 @@ class ConnectOperation final
207162
socket_->noDelay(true);
208163
socket_->connect(sockaddr);
209164

210-
// Register with loop only after we've leaked the shared_ptr,
211-
// because we unleak it when the event loop thread calls.
212165
// Register for EPOLLOUT, because we want to be notified when
213166
// the connect completes. EPOLLERR is also necessary because
214167
// connect() can fail.
215-
if (auto loop = loop_.lock()) {
216-
loop->registerDescriptor(
217-
socket_->fd(), EPOLLOUT | EPOLLERR | EPOLLONESHOT, this);
218-
} else {
219-
fn_(socket_, LoopError("loop is gone"));
220-
}
168+
loop.registerDescriptor(
169+
socket_->fd(),
170+
EPOLLOUT | EPOLLERR | EPOLLONESHOT,
171+
this->shared_from_this());
221172
}
222173

223-
void handleEvents(int events) override {
224-
// Move leaked shared_ptr to the stack so that this object
225-
// destroys itself once this function returns.
226-
auto leak = std::move(this->leak_);
174+
void handleEvents(Loop& loop, int /*events*/) override {
175+
// Hold a reference to this object to keep it alive until the
176+
// callback is called.
177+
auto leak = shared_from_this();
178+
loop.unregisterDescriptor(socket_->fd(), this);
227179

228180
int result;
229181
socklen_t result_len = sizeof(result);
230182
if (getsockopt(socket_->fd(), SOL_SOCKET, SO_ERROR, &result, &result_len) <
231183
0) {
232-
fn_(socket_, SystemError("getsockopt", errno, remote_));
184+
fn_(loop, socket_, SystemError("getsockopt", errno, remote_));
233185
return;
234186
}
235187
if (result != 0) {
@@ -248,16 +200,18 @@ class ConnectOperation final
248200
socket_->sockName().str(),
249201
};
250202
DebugLogger::log(debugData);
203+
251204
// check deadline
252205
if (willRetry) {
253-
run();
206+
run(loop);
254207
} else {
255-
fn_(socket_, TimeoutError("timed out connecting: " + e.what()));
208+
fn_(loop, socket_, TimeoutError("timed out connecting: " + e.what()));
256209
}
210+
257211
return;
258212
}
259213

260-
fn_(socket_, Error::kSuccess);
214+
fn_(loop, socket_, Error::kSuccess);
261215
}
262216

263217
private:
@@ -269,16 +223,12 @@ class ConnectOperation final
269223

270224
int retry_{0};
271225

272-
// We use a weak_ptr to the loop to avoid a reference cycle when an error
273-
// occurs.
274-
std::weak_ptr<Loop> loop_;
275226
std::shared_ptr<Socket> socket_;
276227
callback_t fn_;
277-
std::shared_ptr<ConnectOperation> leak_;
278228
};
279229

280230
void connectLoop(
281-
std::shared_ptr<Loop> loop,
231+
Loop& loop,
282232
const Address& remote,
283233
const int rank,
284234
const int size,

gloo/transport/tcp/listener.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ Listener::~Listener() {
4040
}
4141
}
4242

43-
void Listener::handleEvents(int /* unused */) {
43+
void Listener::handleEvents(Loop& loop, int /* unused */) {
4444
std::lock_guard<std::mutex> guard(mutex_);
4545

4646
for (;;) {
@@ -59,7 +59,7 @@ void Listener::handleEvents(int /* unused */) {
5959

6060
// Read sequence number.
6161
read<sequence_number_t>(
62-
loop_,
62+
loop,
6363
sock,
6464
[this](
6565
std::shared_ptr<Socket> socket,

gloo/transport/tcp/listener.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Listener final : public Handler {
3838

3939
~Listener() override;
4040

41-
void handleEvents(int events) override;
41+
void handleEvents(Loop& loop, int events) override;
4242

4343
Address nextAddress();
4444

0 commit comments

Comments
 (0)