Skip to content

Commit 255400b

Browse files
jiazhaiwolfstudy
authored andcommitted
cpp: fix reference leak when reader create (#7793)
### Motivation User reports a valgrind error for `client::createReader` method: ``` ==23308== 284,826 (160 direct, 284,666 indirect) bytes in 1 blocks are definitely lost in loss record 113 of 113 ==23308== at 0x4C2A593: operator new(unsigned long) (vg_replace_malloc.c:344) ==23308== by 0x5303B4A: allocate (new_allocator.h:104) ==23308== by 0x5303B4A: allocate (alloc_traits.h:351) ==23308== by 0x5303B4A: __shared_count<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:499) ==23308== by 0x5303B4A: __shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:957) ==23308== by 0x5303B4A: shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:316) ==23308== by 0x5303B4A: allocate_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:598) ==23308== by 0x5303B4A: make_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader> > (shared_ptr.h:614) ==23308== by 0x5303B4A: Promise (Future.h:91) ==23308== by 0x5303B4A: pulsar::Client::createReader(std::string const&, pulsar::MessageId const&, pulsar::ReaderConfiguration const&, pulsar::Reader&) (Client.cc:142) ==23308== by 0x401DDB: main (pulsarReader.cpp:92) ==23308== ``` It seems the `ReaderImpl` has been tracked twice when call WaitForCallbackValue. this PR is to fix the issue. ### Modifications - fix WaitForCallbackValue which is changed in PR #3484. - add test for the reference issue. ### Verifying this change ut passed. valgrind found no issue: ``` ==14758== LEAK SUMMARY: ==14758== definitely lost: 0 bytes in 0 blocks ==14758== indirectly lost: 0 bytes in 0 blocks ==14758== possibly lost: 0 bytes in 0 blocks ==14758== still reachable: 12,621 bytes in 145 blocks ==14758== suppressed: 0 bytes in 0 blocks ==14758== ==14758== For lists of detected and suppressed errors, rerun with: -s ==14758== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` (cherry picked from commit 0e67fc5)
1 parent e2fbf31 commit 255400b

File tree

5 files changed

+88
-3
lines changed

5 files changed

+88
-3
lines changed

pulsar-client-cpp/lib/ReaderImpl.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ void ReaderImpl::start(const MessageId& startMessageId) {
6363
const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }
6464

6565
void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
66-
readerCreatedCallback_(result, Reader(shared_from_this()));
66+
auto self = shared_from_this();
67+
readerCreatedCallback_(result, Reader(self));
68+
readerImplWeakPtr_ = self;
6769
}
6870

6971
ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; }
@@ -111,4 +113,6 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
111113
consumer_->seekAsync(timestamp, callback);
112114
}
113115

116+
ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
117+
114118
} // namespace pulsar

pulsar-client-cpp/lib/ReaderImpl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
5252
void seekAsync(const MessageId& msgId, ResultCallback callback);
5353
void seekAsync(uint64_t timestamp, ResultCallback callback);
5454

55+
ReaderImplWeakPtr getReaderImplWeakPtr();
56+
5557
private:
5658
void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer);
5759

@@ -65,6 +67,7 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
6567
ConsumerImplPtr consumer_;
6668
ReaderCallback readerCreatedCallback_;
6769
ReaderListener readerListener_;
70+
ReaderImplWeakPtr readerImplWeakPtr_;
6871
};
6972
} // namespace pulsar
7073

pulsar-client-cpp/lib/Utils.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ struct WaitForCallback {
3838

3939
template <typename T>
4040
struct WaitForCallbackValue {
41-
Promise<Result, T> m_promise;
41+
Promise<Result, T>& m_promise;
4242

43-
WaitForCallbackValue(Promise<Result, T> promise) : m_promise(promise) {}
43+
WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}
4444

4545
void operator()(Result result, const T& value) {
4646
if (result == ResultOk) {

pulsar-client-cpp/tests/ReaderTest.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
* under the License.
1818
*/
1919
#include <pulsar/Client.h>
20+
#include <pulsar/Reader.h>
21+
#include "ReaderTest.h"
2022

2123
#include <gtest/gtest.h>
2224

@@ -416,3 +418,47 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
416418
reader.close();
417419
client.close();
418420
}
421+
422+
TEST(ReaderTest, testReferenceLeak) {
423+
Client client(serviceUrl);
424+
425+
std::string topicName = "persistent://public/default/testReferenceLeak";
426+
427+
Producer producer;
428+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
429+
430+
for (int i = 0; i < 10; i++) {
431+
std::string content = "my-message-" + std::to_string(i);
432+
Message msg = MessageBuilder().setContent(content).build();
433+
ASSERT_EQ(ResultOk, producer.send(msg));
434+
}
435+
436+
ReaderConfiguration readerConf;
437+
Reader reader;
438+
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
439+
440+
ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader);
441+
ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader);
442+
443+
LOG_INFO("1 consumer use count " << consumerPtr.use_count());
444+
LOG_INFO("1 reader use count " << readerPtr.use_count());
445+
446+
for (int i = 0; i < 10; i++) {
447+
Message msg;
448+
ASSERT_EQ(ResultOk, reader.readNext(msg));
449+
450+
std::string content = msg.getDataAsString();
451+
std::string expected = "my-message-" + std::to_string(i);
452+
ASSERT_EQ(expected, content);
453+
}
454+
455+
producer.close();
456+
reader.close();
457+
// will be released after exit this method.
458+
ASSERT_EQ(1, consumerPtr.use_count());
459+
ASSERT_EQ(1, readerPtr.use_count());
460+
client.close();
461+
// will be released after exit this method.
462+
ASSERT_EQ(1, consumerPtr.use_count());
463+
ASSERT_EQ(1, readerPtr.use_count());
464+
}

pulsar-client-cpp/tests/ReaderTest.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include "lib/ReaderImpl.h"
20+
#include <string>
21+
22+
using std::string;
23+
24+
namespace pulsar {
25+
class ReaderTest {
26+
public:
27+
static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); }
28+
static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) {
29+
return reader.impl_->getReaderImplWeakPtr();
30+
}
31+
};
32+
} // namespace pulsar

0 commit comments

Comments
 (0)