Skip to content

Commit 105eea8

Browse files
committed
refactor thread barrier
1 parent 565b560 commit 105eea8

File tree

7 files changed

+93
-45
lines changed

7 files changed

+93
-45
lines changed

include/swoole.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ typedef unsigned long ulong_t;
176176
#define SW_ASSERT(e)
177177
#define SW_ASSERT_1BYTE(v)
178178
#endif
179-
#define SW_START_SLEEP usleep(100000) // sleep 1s,wait fork and pthread_create
179+
#define SW_START_SLEEP usleep(100000) // sleep 0.1s, wait fork and pthread_create
180180

181181
#ifdef SW_THREAD
182182
#define SW_THREAD_LOCAL thread_local

include/swoole_lock.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,23 @@ class SpinLock : public Lock {
105105
int trylock() override;
106106
};
107107
#endif
108+
109+
#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
110+
#define SW_USE_PTHREAD_BARRIER
111+
#endif
112+
113+
struct Barrier {
114+
#ifdef SW_USE_PTHREAD_BARRIER
115+
pthread_barrier_t barrier_;
116+
pthread_barrierattr_t barrier_attr_;
117+
bool shared_;
118+
#else
119+
sw_atomic_t count_;
120+
sw_atomic_t barrier_;
121+
#endif
122+
void init(bool shared, int count);
123+
void wait();
124+
void destroy();
125+
};
126+
108127
} // namespace swoole

include/swoole_server.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -394,10 +394,7 @@ struct ServerGS {
394394

395395
sw_atomic_t spinlock;
396396

397-
#ifdef HAVE_PTHREAD_BARRIER
398-
pthread_barrier_t manager_barrier;
399-
pthread_barrierattr_t manager_barrier_attr;
400-
#endif
397+
Barrier manager_barrier;
401398

402399
ProcessPool task_workers;
403400
ProcessPool event_workers;
@@ -858,9 +855,7 @@ class Server {
858855
std::shared_ptr<std::vector<std::string>> http_index_files = nullptr;
859856
std::shared_ptr<std::unordered_set<std::string>> http_compression_types = nullptr;
860857

861-
#ifdef HAVE_PTHREAD_BARRIER
862-
pthread_barrier_t reactor_thread_barrier = {};
863-
#endif
858+
Barrier reactor_thread_barrier = {};
864859

865860
/**
866861
* temporary directory for HTTP uploaded file.

src/lock/barrier.cc

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
+----------------------------------------------------------------------+
3+
| Swoole |
4+
+----------------------------------------------------------------------+
5+
| This source file is subject to version 2.0 of the Apache license, |
6+
| that is bundled with this package in the file LICENSE, and is |
7+
| available through the world-wide-web at the following url: |
8+
| http://www.apache.org/licenses/LICENSE-2.0.html |
9+
| If you did not receive a copy of the Apache2.0 license and are unable|
10+
| to obtain it through the world-wide-web, please send a note to |
11+
| [email protected] so we can mail you a copy immediately. |
12+
+----------------------------------------------------------------------+
13+
| Author: Tianfeng Han <[email protected]> |
14+
+----------------------------------------------------------------------+
15+
*/
16+
17+
#include "swoole.h"
18+
#include "swoole_lock.h"
19+
20+
namespace swoole {
21+
22+
#define BARRIER_USEC 10000
23+
24+
void Barrier::init(bool shared, int count) {
25+
#ifdef SW_USE_PTHREAD_BARRIER
26+
if (shared) {
27+
pthread_barrierattr_setpshared(&barrier_attr_, PTHREAD_PROCESS_SHARED);
28+
pthread_barrier_init(&barrier_, &barrier_attr_, count);
29+
} else {
30+
pthread_barrier_init(&barrier_, nullptr, count);
31+
}
32+
shared_ = shared;
33+
#else
34+
barrier_ = 0;
35+
count_ = count;
36+
#endif
37+
}
38+
39+
void Barrier::wait() {
40+
#ifdef SW_USE_PTHREAD_BARRIER
41+
pthread_barrier_wait(&barrier_);
42+
#else
43+
sw_atomic_add_fetch(&barrier_, 1);
44+
SW_LOOP {
45+
if (*barrier_ == count_) {
46+
break;
47+
}
48+
usleep(BARRIER_USEC);
49+
sw_atomic_memory_barrier();
50+
}
51+
#endif
52+
}
53+
54+
void Barrier::destroy() {
55+
#ifdef SW_USE_PTHREAD_BARRIER
56+
pthread_barrier_destroy(&barrier_);
57+
if (shared_) {
58+
pthread_barrierattr_destroy(&barrier_attr_);
59+
}
60+
#endif
61+
}
62+
63+
}; // namespace swoole

src/server/manager.cc

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,7 @@ void Manager::wait(Server *_server) {
192192
int sigid = SIGTERM;
193193
procctl(P_PID, 0, PROC_PDEATHSIG_CTL, &sigid);
194194
#endif
195-
196-
#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
197-
pthread_barrier_wait(&_server->gs->manager_barrier);
198-
#else
199-
SW_START_SLEEP;
200-
#endif
195+
_server->gs->manager_barrier.wait();
201196
}
202197

203198
if (_server->isset_hook(Server::HOOK_MANAGER_START)) {

src/server/master.cc

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -434,21 +434,12 @@ int Server::start_master_thread(Reactor *reactor) {
434434
return SW_ERR;
435435
}
436436

437-
#ifdef HAVE_PTHREAD_BARRIER
438437
if (!single_thread) {
439-
pthread_barrier_wait(&reactor_thread_barrier);
438+
reactor_thread_barrier.wait();
440439
}
441-
#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
442-
SW_START_SLEEP;
443-
#else
444440
if (is_process_mode()) {
445-
pthread_barrier_wait(&gs->manager_barrier);
441+
gs->manager_barrier.wait();
446442
}
447-
#endif
448-
#else
449-
SW_START_SLEEP;
450-
#endif
451-
452443
gs->master_pid = getpid();
453444

454445
if (isset_hook(HOOK_MASTER_START)) {
@@ -835,15 +826,10 @@ int Server::create() {
835826
return SW_ERR;
836827
}
837828

838-
#ifdef HAVE_PTHREAD_BARRIER
839829
if (is_process_mode() || is_thread_mode()) {
840-
pthread_barrier_init(&reactor_thread_barrier, nullptr, reactor_num + 1);
841-
#if !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
842-
pthread_barrierattr_setpshared(&gs->manager_barrier_attr, PTHREAD_PROCESS_SHARED);
843-
pthread_barrier_init(&gs->manager_barrier, &gs->manager_barrier_attr, 2);
844-
#endif
830+
reactor_thread_barrier.init(false, reactor_num + 1);
831+
gs->manager_barrier.init(true, 2);
845832
}
846-
#endif
847833

848834
if (swoole_isset_hook(SW_GLOBAL_HOOK_AFTER_SERVER_CREATE)) {
849835
swoole_call_hook(SW_GLOBAL_HOOK_AFTER_SERVER_CREATE, this);
@@ -1060,16 +1046,10 @@ void Server::destroy() {
10601046
delete l;
10611047
}
10621048
}
1063-
#ifdef HAVE_PTHREAD_BARRIER
10641049
if (is_process_mode()) {
1065-
pthread_barrier_destroy(&reactor_thread_barrier);
1066-
#if !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
1067-
pthread_barrier_destroy(&gs->manager_barrier);
1068-
pthread_barrierattr_destroy(&gs->manager_barrier_attr);
1069-
#endif
1050+
reactor_thread_barrier.destroy();
1051+
gs->manager_barrier.destroy();
10701052
}
1071-
#endif
1072-
10731053
if (is_base_mode()) {
10741054
destroy_base_factory();
10751055
} else if (is_thread_mode()) {

src/server/reactor_thread.cc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -826,11 +826,7 @@ void Server::reactor_thread_main_loop(Server *serv, int reactor_id) {
826826
}
827827

828828
// wait other thread
829-
#ifdef HAVE_PTHREAD_BARRIER
830-
pthread_barrier_wait(&serv->reactor_thread_barrier);
831-
#else
832-
SW_START_SLEEP;
833-
#endif
829+
serv->reactor_thread_barrier.wait();
834830
// main loop
835831
swoole_event_wait();
836832
if (serv->is_thread_mode()) {

0 commit comments

Comments
 (0)