Skip to content

Commit 9fd4829

Browse files
committed
fix server shutdown
1 parent 44b3d0f commit 9fd4829

File tree

6 files changed

+179
-68
lines changed

6 files changed

+179
-68
lines changed

ext-src/swoole_server.cc

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2641,31 +2641,25 @@ static PHP_METHOD(swoole_server, start) {
26412641
ServerObject *server_object = server_fetch_object(Z_OBJ_P(php_swoole_server_zval_ptr(serv)));
26422642

26432643
#ifdef SW_THREAD
2644+
zend_string *bootstrap = nullptr;
2645+
zend_string *thread_argv_serialized = nullptr;
2646+
zval thread_argv = {};
2647+
26442648
if (serv->is_thread_mode()) {
26452649
zval *_bootstrap = zend::object_get(ZEND_THIS, ZEND_STRL("bootstrap"));
2646-
zend_string *bootstrap = zend_string_dup(Z_STR_P(_bootstrap), 1);
2647-
zend_string *argv = nullptr;
2648-
zval thread_argv = {};
2650+
bootstrap = zend_string_dup(Z_STR_P(_bootstrap), 1);
26492651

26502652
if (!ZVAL_IS_NULL(&server_object->init_arguments)) {
26512653
call_user_function(NULL, NULL, &server_object->init_arguments, &thread_argv, 0, NULL);
2652-
argv = php_swoole_thread_serialize(&thread_argv);
2654+
thread_argv_serialized = php_swoole_thread_serialize(&thread_argv);
26532655
}
26542656

2655-
serv->worker_thread_start = [bootstrap, argv](const WorkerFn &fn) {
2657+
serv->worker_thread_start = [bootstrap, thread_argv_serialized](const WorkerFn &fn) {
26562658
worker_thread_fn = fn;
26572659
zend_string *bootstrap_copy = zend_string_dup(bootstrap, 1);
2658-
zend_string *argv_copy = argv ? zend_string_dup(argv, 1) : nullptr;
2660+
zend_string *argv_copy = thread_argv_serialized ? zend_string_dup(thread_argv_serialized, 1) : nullptr;
26592661
php_swoole_thread_start(bootstrap_copy, argv_copy);
26602662
};
2661-
2662-
ON_SCOPE_EXIT {
2663-
zend_string_release(bootstrap);
2664-
if (argv) {
2665-
zend_string_release(argv);
2666-
}
2667-
zval_ptr_dtor(&thread_argv);
2668-
};
26692663
}
26702664
#endif
26712665

@@ -2675,6 +2669,17 @@ static PHP_METHOD(swoole_server, start) {
26752669
if (serv->start() < 0) {
26762670
php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error);
26772671
}
2672+
2673+
#ifdef SW_THREAD
2674+
if (bootstrap) {
2675+
zend_string_release(bootstrap);
2676+
}
2677+
if (thread_argv_serialized) {
2678+
zend_string_release(thread_argv_serialized);
2679+
}
2680+
zval_ptr_dtor(&thread_argv);
2681+
#endif
2682+
26782683
RETURN_TRUE;
26792684
}
26802685

include/swoole_server.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ struct ReactorThread {
160160

161161
int init(Server *serv, Reactor *reactor, uint16_t reactor_id);
162162
void shutdown(Reactor *reactor);
163+
int close_connection(Reactor *reactor, SessionId session_id);
164+
void clean();
163165
};
164166

165167
struct ServerPortGS {
@@ -1075,6 +1077,10 @@ class Server {
10751077
}
10761078
}
10771079

1080+
bool is_master_thread() {
1081+
return swoole_get_thread_type() == Server::THREAD_MASTER;
1082+
}
1083+
10781084
bool is_hash_dispatch_mode() {
10791085
return dispatch_mode == DISPATCH_FDMOD || dispatch_mode == DISPATCH_IPMOD ||
10801086
dispatch_mode == DISPATCH_CO_CONN_LB;
@@ -1311,7 +1317,7 @@ class Server {
13111317

13121318
void call_hook(enum HookType type, void *arg);
13131319
void call_worker_start_callback(Worker *worker);
1314-
ResultCode call_command_handler(MessageBus &mb, uint16_t worker_id, network::Socket *sock);
1320+
void call_command_handler(MessageBus &mb, uint16_t worker_id, network::Socket *sock);
13151321
std::string call_command_handler_in_master(int command_id, const std::string &msg);
13161322
void call_command_callback(int64_t request_id, const std::string &result);
13171323
void foreach_connection(const std::function<void(Connection *)> &callback);

src/server/master.cc

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ void Server::call_command_callback(int64_t request_id, const std::string &result
9393
iter->second(this, result);
9494
}
9595

96-
ResultCode Server::call_command_handler(MessageBus &mb, uint16_t worker_id, Socket *sock) {
96+
void Server::call_command_handler(MessageBus &mb, uint16_t worker_id, Socket *sock) {
9797
PipeBuffer *buffer = mb.get_buffer();
9898
int command_id = buffer->info.server_fd;
9999
auto iter = command_handlers.find(command_id);
100100
if (iter == command_handlers.end()) {
101101
swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_INVALID_COMMAND, "Unknown command[command_id=%d]", command_id);
102-
return SW_OK;
102+
return;
103103
}
104104

105105
Server::Command::Handler handler = iter->second;
@@ -114,7 +114,7 @@ ResultCode Server::call_command_handler(MessageBus &mb, uint16_t worker_id, Sock
114114
task.info.len = result.length();
115115
task.data = result.c_str();
116116

117-
return mb.write(sock, &task) ? SW_OK : SW_ERR;
117+
mb.write(sock, &task);
118118
}
119119

120120
std::string Server::call_command_handler_in_master(int command_id, const std::string &msg) {
@@ -877,7 +877,12 @@ bool Server::shutdown() {
877877
if (is_base_mode()) {
878878
pid = get_manager_pid() == 0 ? get_master_pid() : get_manager_pid();
879879
} else if (is_thread_mode()) {
880-
return factory->shutdown();
880+
if (is_master_thread()) {
881+
stop_master_thread();
882+
} else {
883+
running = false;
884+
}
885+
return true;
881886
} else {
882887
pid = get_master_pid();
883888
}
@@ -943,6 +948,14 @@ void Server::stop_master_thread() {
943948
};
944949
reactor->set_exit_condition(Reactor::EXIT_CONDITION_FORCED_TERMINATION, fn);
945950
}
951+
if (is_thread_mode()) {
952+
SW_LOOP_N(reactor_num) {
953+
auto thread = get_thread(i);
954+
DataHead ev = {};
955+
ev.type = SW_SERVER_EVENT_SHUTDOWN;
956+
thread->notify_pipe->send_blocking((void *) &ev, sizeof(ev));
957+
}
958+
}
946959
}
947960

948961
bool Server::signal_handler_shutdown() {
@@ -1004,7 +1017,7 @@ void Server::destroy() {
10041017
if (task_worker_num > 0) {
10051018
gs->task_workers.destroy();
10061019
}
1007-
} else {
1020+
} else if (is_process_mode()) {
10081021
swoole_trace_log(SW_TRACE_SERVER, "terminate reactor threads");
10091022
/**
10101023
* Wait until all the end of the thread
@@ -1646,6 +1659,11 @@ void Server::timer_callback(Timer *timer, TimerNode *tnode) {
16461659
if (serv->hooks[Server::HOOK_MASTER_TIMER]) {
16471660
serv->call_hook(Server::HOOK_MASTER_TIMER, serv);
16481661
}
1662+
1663+
if (!serv->is_running()) {
1664+
sw_reactor()->running = false;
1665+
serv->stop_master_thread();
1666+
}
16491667
}
16501668

16511669
int Server::add_worker(Worker *worker) {
@@ -1664,7 +1682,7 @@ bool Server::add_command(const std::string &name, int accepted_process_types, co
16641682
if (commands.find(name) != commands.end()) {
16651683
return false;
16661684
}
1667-
if (is_process_mode() && pipe_command == nullptr) {
1685+
if (!is_base_mode() && pipe_command == nullptr) {
16681686
auto _pipe = new UnixSocket(false, SOCK_DGRAM);
16691687
if (!_pipe->ready()) {
16701688
delete _pipe;

src/server/reactor_thread.cc

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,39 @@ void ReactorThread::shutdown(Reactor *reactor) {
329329
});
330330

331331
reactor->set_wait_exit(true);
332+
}
332333

333-
message_bus.free_buffer();
334+
int ReactorThread::close_connection(Reactor *reactor, SessionId session_id) {
335+
Server *serv = (Server *) reactor->ptr;
336+
Connection *conn = serv->get_connection_verify_no_ssl(session_id);
337+
if (!conn) {
338+
swoole_error_log(SW_LOG_TRACE,
339+
SW_ERROR_SESSION_NOT_EXIST,
340+
"force close connection failed, session#%ld does not exist",
341+
session_id);
342+
return SW_OK;
343+
}
344+
345+
if (serv->disable_notify || conn->close_force) {
346+
return Server::close_connection(reactor, conn->socket);
347+
}
348+
349+
#ifdef SW_USE_OPENSSL
350+
/**
351+
* SSL connections that have not completed the handshake,
352+
* do not need to notify the workers, just close
353+
*/
354+
if (conn->ssl && !conn->ssl_ready) {
355+
return Server::close_connection(reactor, conn->socket);
356+
}
357+
#endif
358+
conn->close_force = 1;
359+
Event _ev = {};
360+
_ev.fd = conn->fd;
361+
_ev.socket = conn->socket;
362+
reactor->trigger_close_event(&_ev);
363+
364+
return SW_OK;
334365
}
335366

336367
/**
@@ -350,49 +381,21 @@ static int ReactorThread_onPipeRead(Reactor *reactor, Event *ev) {
350381
if (resp->info.type == SW_SERVER_EVENT_INCOMING) {
351382
Connection *conn = serv->get_connection_verify_no_ssl(resp->info.fd);
352383
if (conn && serv->connection_incoming(reactor, conn) < 0) {
353-
return reactor->close(reactor, conn->socket);
384+
reactor->close(reactor, conn->socket);
354385
}
355386
} else if (resp->info.type == SW_SERVER_EVENT_COMMAND_REQUEST) {
356-
return serv->call_command_handler(thread->message_bus, thread->id, thread->pipe_command);
387+
serv->call_command_handler(thread->message_bus, thread->id, thread->pipe_command);
357388
} else if (resp->info.type == SW_SERVER_EVENT_COMMAND_RESPONSE) {
358389
auto packet = thread->message_bus.get_packet();
359390
serv->call_command_callback(resp->info.fd, std::string(packet.data, packet.length));
360391
} else if (resp->info.type == SW_SERVER_EVENT_SHUTDOWN) {
361392
thread->shutdown(reactor);
362-
return SW_OK;
363393
} else if (resp->info.type == SW_SERVER_EVENT_FINISH) {
364394
serv->onFinish(serv, (EventData *) resp);
365395
} else if (resp->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) {
366396
serv->onPipeMessage(serv, (EventData *) resp);
367397
} else if (resp->info.type == SW_SERVER_EVENT_CLOSE_FORCE) {
368-
SessionId session_id = resp->info.fd;
369-
Connection *conn = serv->get_connection_verify_no_ssl(session_id);
370-
if (!conn) {
371-
swoole_error_log(SW_LOG_TRACE,
372-
SW_ERROR_SESSION_NOT_EXIST,
373-
"force close connection failed, session#%ld does not exist",
374-
session_id);
375-
return SW_OK;
376-
}
377-
378-
if (serv->disable_notify || conn->close_force) {
379-
return Server::close_connection(reactor, conn->socket);
380-
}
381-
382-
#ifdef SW_USE_OPENSSL
383-
/**
384-
* SSL connections that have not completed the handshake,
385-
* do not need to notify the workers, just close
386-
*/
387-
if (conn->ssl && !conn->ssl_ready) {
388-
return Server::close_connection(reactor, conn->socket);
389-
}
390-
#endif
391-
conn->close_force = 1;
392-
Event _ev = {};
393-
_ev.fd = conn->fd;
394-
_ev.socket = conn->socket;
395-
reactor->trigger_close_event(&_ev);
398+
thread->close_connection(reactor, resp->info.fd);
396399
} else {
397400
PacketPtr packet = thread->message_bus.get_packet();
398401
_send.info = resp->info;
@@ -793,6 +796,15 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) {
793796
return SW_OK;
794797
}
795798

799+
void ReactorThread::clean() {
800+
sw_free(pipe_sockets);
801+
if (pipe_command) {
802+
pipe_command->fd = -1;
803+
delete pipe_command;
804+
}
805+
message_bus.free_buffer();
806+
}
807+
796808
void Server::reactor_thread_main_loop(Server *serv, int reactor_id) {
797809
SwooleTG.id = reactor_id;
798810
SwooleTG.type = Server::THREAD_REACTOR;
@@ -824,11 +836,7 @@ void Server::reactor_thread_main_loop(Server *serv, int reactor_id) {
824836
if (serv->is_thread_mode()) {
825837
serv->worker_stop_callback(serv->get_worker(reactor_id));
826838
}
827-
sw_free(thread->pipe_sockets);
828-
if (thread->pipe_command) {
829-
thread->pipe_command->fd = -1;
830-
delete thread->pipe_command;
831-
}
839+
thread->clean();
832840
}
833841

834842
static void ReactorThread_resume_data_receiving(Timer *timer, TimerNode *tnode) {

src/server/thread.cc

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,16 @@ bool ThreadFactory::start() {
5656
}
5757

5858
bool ThreadFactory::shutdown() {
59-
server_->running = false;
60-
SW_LOOP_N(server_->reactor_num) {
61-
auto thread = server_->get_thread(i);
62-
DataHead ev = {};
63-
ev.type = SW_SERVER_EVENT_SHUTDOWN;
64-
thread->notify_pipe->send_blocking((void *) &ev, sizeof(ev));
59+
for (auto &thread : threads_) {
60+
if (thread.joinable()) {
61+
thread.join();
62+
}
6563
}
66-
server_->stop_master_thread();
6764
return true;
6865
}
6966

7067
ThreadFactory::~ThreadFactory() {
71-
for (auto &thread : threads_) {
72-
thread.join();
73-
}
68+
7469
}
7570

7671
void ThreadFactory::at_thread_exit(Worker *worker) {

0 commit comments

Comments
 (0)