Skip to content

Commit 8b33ff2

Browse files
Masahiro Sakamotowolfstudy
authored andcommitted
Fix memory leak caused by not being executed ClientConnection destructor (#5286)
(cherry picked from commit 30d280d)
1 parent b0c6a5f commit 8b33ff2

File tree

4 files changed

+13
-5
lines changed

4 files changed

+13
-5
lines changed

pulsar-client-cpp/lib/ClientConnection.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
136136
serverProtocolVersion_(ProtocolVersion_MIN),
137137
maxMessageSize_(Commands::DefaultMaxMessageSize),
138138
executor_(executor),
139-
resolver_(executor->createTcpResolver()),
140-
socket_(executor->createSocket()),
139+
resolver_(executor_->createTcpResolver()),
140+
socket_(executor_->createSocket()),
141141
#if BOOST_VERSION >= 107000
142142
strand_(boost::asio::make_strand(executor_->io_service_.get_executor())),
143143
#elif BOOST_VERSION >= 106600
@@ -222,7 +222,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
222222
}
223223
}
224224

225-
tlsSocket_ = executor->createTlsSocket(socket_, ctx);
225+
tlsSocket_ = executor_->createTlsSocket(socket_, ctx);
226226
}
227227
}
228228

@@ -1325,6 +1325,9 @@ void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_cod
13251325

13261326
void ClientConnection::close() {
13271327
Lock lock(mutex_);
1328+
if (isClosed()) {
1329+
return;
1330+
}
13281331
state_ = Disconnected;
13291332
boost::system::error_code err;
13301333
socket_->close(err);
@@ -1385,6 +1388,10 @@ void ClientConnection::close() {
13851388
if (tlsSocket_) {
13861389
tlsSocket_->lowest_layer().close();
13871390
}
1391+
1392+
if (executor_) {
1393+
executor_.reset();
1394+
}
13881395
}
13891396

13901397
bool ClientConnection::isClosed() const { return state_ == Disconnected; }

pulsar-client-cpp/lib/ClientImpl.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ void ClientImpl::shutdown() {
561561
}
562562
}
563563

564+
pool_.close();
564565
ioExecutorProvider_->close();
565566
listenerExecutorProvider_->close();
566567
partitionListenerExecutorProvider_->close();

pulsar-client-cpp/lib/ConnectionPool.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
4242
poolConnections_(poolConnections),
4343
mutex_() {}
4444

45-
ConnectionPool::~ConnectionPool() {
45+
void ConnectionPool::close() {
4646
std::unique_lock<std::mutex> lock(mutex_);
4747
if (poolConnections_) {
4848
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {

pulsar-client-cpp/lib/ConnectionPool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class PULSAR_PUBLIC ConnectionPool {
3636
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
3737
const AuthenticationPtr& authentication, bool poolConnections = true);
3838

39-
~ConnectionPool();
39+
void close();
4040

4141
/**
4242
* Get a connection from the pool.

0 commit comments

Comments
 (0)