Skip to content

Commit 56e2af1

Browse files
committed
[#26672] docdb: Fixed deadlock on cleaning expired PG sessions when pg_client_use_shared_memory is on
Summary: Faced on a 2024.2.2.1-b7 cluster with running YSQL workload and `--pg_client_use_shared_memory=true`. All 4 threads of `iotp_TabletServer` threadpool are waiting inside `ScheduledTaskTracker::CompleteShutdown()`. All of them are waiting for `big_shared_mem_expiration_task_.CompleteShutdown()`. `big_shared_mem_expiration_task_` was scheduled but not launched due to `iotp_TabletServer` pool is busy with processing `PgClientServiceImpl::Impl::PgCheckExpiredSessions` and all `PgCheckExpiredSessions` are waiting inside `ScheduledTaskTracker::CompleteShutdown()` for `big_shared_mem_expiration_task_ to` complete. Added unit-test to reproduce this issue and implemented fix. Also updated `ScheduledTaskTracker` logs for easier debugging in future. Logs before update: ``` I0401 17:00:43.968747 3586401 scheduler.cc:230] Waiting 1 tasks to complete ``` After update: ``` I0404 19:42:28.436432 1233831 scheduler.cc:231] big_shared_mem_expiration_task(0x000073c0bed8dcd8): Waiting 1 tasks to complete ``` Jira: DB-16052 Test Plan: PgConnTest.SessionExpiration Reviewers: sergei, amitanand Reviewed By: sergei, amitanand Subscribers: yql, ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D42981
1 parent bacfdb2 commit 56e2af1

File tree

7 files changed

+156
-11
lines changed

7 files changed

+156
-11
lines changed

src/yb/consensus/raft_consensus.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,8 @@ RaftConsensus::RaftConsensus(
426426
queue_(std::move(queue)),
427427
rng_(GetRandomSeed32()),
428428
withhold_votes_until_(MonoTime::Min()),
429-
step_down_check_tracker_(&peer_proxy_factory_->messenger()->scheduler()),
429+
step_down_check_tracker_(
430+
"step_down_check_tracker", &peer_proxy_factory_->messenger()->scheduler()),
430431
mark_dirty_clbk_(std::move(mark_dirty_clbk)),
431432
shutdown_(false),
432433
follower_memory_pressure_rejections_(tablet_metric_entity->FindOrCreateMetric<Counter>(

src/yb/rpc/scheduler.cc

+10-5
Original file line numberDiff line numberDiff line change
@@ -194,14 +194,15 @@ IoService& Scheduler::io_service() {
194194
return impl_->io_service();
195195
}
196196

197-
ScheduledTaskTracker::ScheduledTaskTracker(Scheduler* scheduler)
198-
: scheduler_(DCHECK_NOTNULL(scheduler)) {}
197+
ScheduledTaskTracker::ScheduledTaskTracker(const std::string& name, Scheduler* scheduler)
198+
: log_prefix_(Format("$0($1): ", name, static_cast<void*>(this))),
199+
scheduler_(DCHECK_NOTNULL(scheduler)) {}
199200

200201
ScheduledTaskTracker::~ScheduledTaskTracker() {
201202
auto last_scheduled_task_id = last_scheduled_task_id_.load(std::memory_order_acquire);
202203
if (last_scheduled_task_id != rpc::kInvalidTaskId) {
203204
auto num_scheduled = num_scheduled_.load(std::memory_order_acquire);
204-
LOG_IF(DFATAL, num_scheduled != kShutdownMark)
205+
LOG_IF_WITH_PREFIX(DFATAL, num_scheduled != kShutdownMark)
205206
<< "Shutdown did not complete on ScheduledTaskTracker";
206207
}
207208
}
@@ -220,14 +221,18 @@ void ScheduledTaskTracker::StartShutdown() {
220221
}
221222
}
222223

224+
bool ScheduledTaskTracker::ReadyToShutdown() const {
225+
return num_scheduled_.load(std::memory_order_acquire) == kShutdownMark;
226+
}
227+
223228
void ScheduledTaskTracker::CompleteShutdown() {
224229
for (;;) {
225230
auto left = num_scheduled_.load(std::memory_order_acquire) - kShutdownMark;
226231
if (left <= 0) {
227-
LOG_IF(DFATAL, left < 0) << "Negative number of tasks left: " << left;
232+
LOG_IF_WITH_PREFIX(DFATAL, left < 0) << "Negative number of tasks left: " << left;
228233
break;
229234
}
230-
YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting " << left << " tasks to complete";
235+
YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 1) << "Waiting " << left << " tasks to complete";
231236
Abort();
232237
std::this_thread::sleep_for(1ms);
233238
}

src/yb/rpc/scheduler.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class ScheduledTaskTracker {
118118
ScheduledTaskTracker() = default;
119119
~ScheduledTaskTracker();
120120

121-
explicit ScheduledTaskTracker(Scheduler* scheduler);
121+
ScheduledTaskTracker(const std::string& name, Scheduler* scheduler);
122122

123123
void Bind(Scheduler* scheduler) {
124124
scheduler_ = scheduler;
@@ -147,14 +147,20 @@ class ScheduledTaskTracker {
147147
void Abort();
148148

149149
void StartShutdown();
150+
bool ReadyToShutdown() const;
150151
void CompleteShutdown();
151152

152153
void Shutdown() {
153154
StartShutdown();
154155
CompleteShutdown();
155156
}
156157

158+
const std::string& LogPrefix() {
159+
return log_prefix_;
160+
}
161+
157162
private:
163+
std::string log_prefix_;
158164
Scheduler* scheduler_ = nullptr;
159165
std::atomic<int64_t> num_scheduled_{0};
160166
std::atomic<rpc::ScheduledTaskId> last_scheduled_task_id_{rpc::kInvalidTaskId};

src/yb/tserver/pg_client_service.cc

+8-3
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ DEFINE_test_flag(uint64, ysql_oid_prefetch_adjustment, 0,
133133
"production environment. In unit test we use this flag to force allocation of "
134134
"large Postgres OIDs.");
135135

136+
DEFINE_test_flag(uint64, delay_before_complete_expired_pg_sessions_shutdown_ms, 0,
137+
"Inject delay before completing shutdown of expired PG sessions.");
138+
136139
DEFINE_RUNTIME_uint64(ysql_cdc_active_replication_slot_window_ms, 60000,
137140
"Determines the window in milliseconds in which if a client has consumed the "
138141
"changes of a ReplicationSlot across any tablet, then it is considered to be "
@@ -244,7 +247,8 @@ class LockablePgClientSession {
244247
}
245248

246249
bool ReadyToShutdown() const {
247-
return !exchange_runnable_ || exchange_runnable_->ReadyToShutdown();
250+
return (!exchange_runnable_ || exchange_runnable_->ReadyToShutdown()) &&
251+
session_.ReadyToShutdown();
248252
}
249253

250254
void CompleteShutdown() {
@@ -452,8 +456,8 @@ class PgClientServiceImpl::Impl {
452456
transaction_pool_provider_(std::move(transaction_pool_provider)),
453457
messenger_(*messenger),
454458
table_cache_(client_future_),
455-
check_expired_sessions_(&messenger->scheduler()),
456-
check_object_id_allocators_(&messenger->scheduler()),
459+
check_expired_sessions_("check_expired_sessions", &messenger->scheduler()),
460+
check_object_id_allocators_("check_object_id_allocators", &messenger->scheduler()),
457461
response_cache_(parent_mem_tracker, metric_entity),
458462
instance_id_(permanent_uuid),
459463
shared_mem_pool_(parent_mem_tracker, instance_id_),
@@ -1833,6 +1837,7 @@ class PgClientServiceImpl::Impl {
18331837
session->session().StartShutdown();
18341838
txn_snapshot_manager_.UnregisterAll(session->id());
18351839
}
1840+
AtomicFlagSleepMs(&FLAGS_TEST_delay_before_complete_expired_pg_sessions_shutdown_ms);
18361841
for (const auto& session : expired_sessions) {
18371842
if (session->session().ReadyToShutdown()) {
18381843
session->session().CompleteShutdown();

src/yb/tserver/pg_client_session.cc

+9-1
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,7 @@ class PgClientSession::Impl {
11261126
lease_epoch_(lease_epoch),
11271127
ts_lock_manager_(std::move(lock_manager)),
11281128
transaction_provider_(std::move(transaction_builder)),
1129-
big_shared_mem_expiration_task_(&scheduler),
1129+
big_shared_mem_expiration_task_("big_shared_mem_expiration_task", &scheduler),
11301130
read_point_history_(PrefixLogger(id_)) {}
11311131

11321132
[[nodiscard]] auto id() const {return id_; }
@@ -2131,6 +2131,10 @@ class PgClientSession::Impl {
21312131
big_shared_mem_expiration_task_.StartShutdown();
21322132
}
21332133

2134+
bool ReadyToShutdown() {
2135+
return big_shared_mem_expiration_task_.ReadyToShutdown();
2136+
}
2137+
21342138
void CompleteShutdown() {
21352139
big_shared_mem_expiration_task_.CompleteShutdown();
21362140
}
@@ -3123,6 +3127,10 @@ void PgClientSession::StartShutdown() {
31233127
return impl_->StartShutdown();
31243128
}
31253129

3130+
bool PgClientSession::ReadyToShutdown() const {
3131+
return impl_->ReadyToShutdown();
3132+
}
3133+
31263134
void PgClientSession::CompleteShutdown() {
31273135
impl_->CompleteShutdown();
31283136
}

src/yb/tserver/pg_client_session.h

+1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class PgClientSession final {
122122
std::pair<uint64_t, std::byte*> ObtainBigSharedMemorySegment(size_t size);
123123

124124
void StartShutdown();
125+
bool ReadyToShutdown() const;
125126
void CompleteShutdown();
126127

127128
Result<ReadHybridTime> GetTxnSnapshotReadTime(

src/yb/yql/pgwrapper/pg_conn-test.cc

+119
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
// or implied. See the License for the specific language governing permissions and limitations
1111
// under the License.
1212

13+
#include "yb/util/backoff_waiter.h"
1314
#include "yb/util/logging.h"
15+
#include "yb/util/test_thread_holder.h"
1416

1517
#include "yb/yql/pgwrapper/libpq_test_base.h"
1618

@@ -20,6 +22,11 @@ namespace yb {
2022
namespace pgwrapper {
2123

2224
class PgConnTest : public LibPqTestBase {
25+
public:
26+
Result<PGConn> Connect(bool simple_query_protocol = false) {
27+
return LibPqTestBase::Connect(simple_query_protocol);
28+
}
29+
2330
protected:
2431
void TestUriAuth();
2532

@@ -183,5 +190,117 @@ TEST_F_EX(PgConnTest, ConnectionLimit, PgConnTestLimit) {
183190
ASSERT_STR_CONTAINS(s.message().ToBuffer(), "sorry, too many clients already");
184191
}
185192

193+
class PgSessionExpirationTest : public PgConnTest {
194+
public:
195+
int GetNumTabletServers() const override {
196+
return 1;
197+
}
198+
199+
void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
200+
PgConnTest::UpdateMiniClusterOptions(options);
201+
for (const auto& tserver_flag : std::initializer_list<std::string>{
202+
"--pg_client_use_shared_memory=true",
203+
"--max_big_shared_memory_segment_size=1048576",
204+
Format("--io_thread_pool_size=$0", kIoThreadPoolSize),
205+
// Should be big enough for other PgClientServiceImpl::Impl::CheckExpiredSessions
206+
// to be called by scheduler.
207+
Format("--TEST_delay_before_complete_expired_pg_sessions_shutdown_ms=1000"),
208+
Format(
209+
"--pg_client_heartbeat_interval_ms=$0",
210+
ToMilliseconds(kPgClientHeartbeatInterval)),
211+
Format(
212+
"--pg_client_session_expiration_ms=$0",
213+
ToMilliseconds(kPgClientSessionExpiration)),
214+
Format(
215+
"--big_shared_memory_segment_session_expiration_time_ms=$0",
216+
ToMilliseconds(kBigSharedMemorySegmentSesionExpiration)),
217+
}) {
218+
options->extra_tserver_flags.push_back(tserver_flag);
219+
}
220+
}
221+
222+
static constexpr auto kPgClientHeartbeatInterval =
223+
RegularBuildVsDebugVsSanitizers(1500ms, 2000ms, 2000ms);
224+
static constexpr auto kPgClientSessionExpiration = kPgClientHeartbeatInterval * 2;
225+
static constexpr auto kBigSharedMemorySegmentSesionExpiration = kPgClientSessionExpiration * 2;
226+
static constexpr auto kIoThreadPoolSize = 2;
227+
};
228+
229+
TEST_F_EX(PgConnTest, SessionExpiration, PgSessionExpirationTest) {
230+
constexpr auto kGroupSize = 5;
231+
constexpr auto kNumGroups = kIoThreadPoolSize;
232+
constexpr auto kDelayBetweenGroupConnectionsClose = 100ms;
233+
234+
class ClientJob {
235+
public:
236+
ClientJob(const std::string& name, PgConnTest& test, CountDownLatch& latch)
237+
: log_prefix_(name + ": "), test_(test), latch_(latch) {}
238+
239+
void operator()() const {
240+
LOG_WITH_PREFIX(INFO) << "Connecting...";
241+
PGConn conn = ASSERT_RESULT(test_.Connect());
242+
LOG_WITH_PREFIX(INFO) << "Connected";
243+
LOG_WITH_PREFIX(INFO) << "Got response: " << ASSERT_RESULT(conn.FetchRow<int32_t>("SELECT 1"))
244+
<< " latch count: " << latch_.count();
245+
latch_.CountDown();
246+
247+
// Close connections for the group at once.
248+
LOG_WITH_PREFIX(INFO) << "Waiting for latch";
249+
latch_.Wait();
250+
LOG_WITH_PREFIX(INFO) << "Closing connection...";
251+
}
252+
253+
const std::string& LogPrefix() const { return log_prefix_; }
254+
255+
private:
256+
const std::string log_prefix_;
257+
PgConnTest& test_;
258+
CountDownLatch& latch_;
259+
};
260+
261+
TestThreadHolder thread_holder;
262+
std::vector<std::unique_ptr<CountDownLatch>> latches;
263+
for (int i = 0; i < kNumGroups; ++i) {
264+
latches.push_back(std::make_unique<CountDownLatch>(kGroupSize + 1));
265+
for (int j = 0; j < kGroupSize; ++j) {
266+
thread_holder.AddThreadFunctor(
267+
ClientJob(Format("Group #$0, thread #$1", i, j), *this, *latches.back()));
268+
}
269+
}
270+
271+
ASSERT_OK(LoggedWaitFor(
272+
[&latches] {
273+
for (auto& latch : latches) {
274+
if (latch->count() > 1) {
275+
return false;
276+
}
277+
}
278+
return true;
279+
},
280+
RegularBuildVsDebugVsSanitizers(60s, 120s, 180s),
281+
Format("Wait for all queries to return.")));
282+
283+
// Unblock threads with delay between groups.
284+
for (auto& latch : latches) {
285+
LOG(INFO) << "latch->count(): " << latch->count();
286+
latch->CountDown();
287+
std::this_thread::sleep_for(kDelayBetweenGroupConnectionsClose);
288+
}
289+
290+
LOG(INFO) << "Joining test threads";
291+
thread_holder.JoinAll();
292+
293+
// Wait for sessions to expire.
294+
std::this_thread::sleep_for(kPgClientSessionExpiration + 500ms);
295+
296+
LOG(INFO) << "Shutting down cluster";
297+
cluster_->Shutdown();
298+
for (const auto& server : cluster_->daemons()) {
299+
if (server) {
300+
ASSERT_FALSE(server->WasUnsafeShutdown());
301+
}
302+
}
303+
}
304+
186305
} // namespace pgwrapper
187306
} // namespace yb

0 commit comments

Comments
 (0)