Skip to content

Commit 57a4671

Browse files
committed
Optimize server code [2]
1 parent 44688fc commit 57a4671

File tree

11 files changed

+111
-88
lines changed

11 files changed

+111
-88
lines changed

core-tests/src/server/server.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ TEST(server, schedule) {
3939
ASSERT_EQ(SW_OK, ret);
4040

4141
for (uint32_t i = 0; i < serv.worker_num; i++) {
42-
serv.workers[i].status = SW_WORKER_BUSY;
42+
serv.workers[i].set_status_to_busy();
4343
}
4444

4545
std::set<int> _worker_id_set;
@@ -51,7 +51,7 @@ TEST(server, schedule) {
5151
ASSERT_EQ(_worker_id_set.size(), serv.worker_num);
5252

5353
for (uint32_t i = 1; i < serv.worker_num - 1; i++) {
54-
serv.workers[i].status = SW_WORKER_IDLE;
54+
serv.workers[i].set_status_to_idle();
5555
}
5656

5757
_worker_id_set.clear();

ext-src/swoole_process_pool.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,13 @@ static void process_pool_onMessage(ProcessPool *pool, RecvData *msg) {
190190
ZVAL_STRINGL(&args[1], data, length);
191191
}
192192
}
193+
auto *worker = sw_worker();
194+
worker->set_status_to_busy();
193195
if (UNEXPECTED(!zend::function::call(pp->onMessage, 2, args, nullptr, pp->enable_coroutine))) {
194196
php_swoole_error(E_WARNING, "%s->onMessage handler error", SW_Z_OBJCE_NAME_VAL_P(zobject));
195197
}
198+
worker->add_request_count();
199+
worker->set_status_to_idle();
196200
zval_ptr_dtor(&args[1]);
197201
}
198202

ext-src/swoole_server.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,7 +1276,7 @@ static int php_swoole_server_onTask(Server *serv, EventData *req) {
12761276
} else {
12771277
argc = 4;
12781278
argv[0] = *zserv;
1279-
ZVAL_LONG(&argv[1], (zend_long) req->info.fd);
1279+
ZVAL_LONG(&argv[1], (zend_long) serv->get_task_id(req));
12801280
ZVAL_LONG(&argv[2], (zend_long) req->info.reactor_id);
12811281
argv[3] = zresult.value;
12821282
}
@@ -1304,14 +1304,14 @@ static int php_swoole_server_onTask(Server *serv, EventData *req) {
13041304
static int php_swoole_server_onFinish(Server *serv, EventData *req) {
13051305
zval *zserv = php_swoole_server_zval_ptr(serv);
13061306
ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
1307+
TaskId task_id = serv->get_task_id(req);
13071308

13081309
zend::Variable zresult;
13091310
if (!php_swoole_server_task_unpack(zresult.ptr(), req)) {
13101311
return SW_ERR;
13111312
}
13121313

13131314
if (req->info.ext_flags & SW_TASK_COROUTINE) {
1314-
TaskId task_id = req->info.fd;
13151315
auto task_co_iterator = server_object->property->task_coroutine_map.find(task_id);
13161316
if (task_co_iterator == server_object->property->task_coroutine_map.end()) {
13171317
swoole_error_log(SW_LOG_WARNING, SW_ERROR_TASK_TIMEOUT, "task[%ld] has expired", task_id);
@@ -1349,7 +1349,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {
13491349

13501350
zend_fcall_info_cache *fci_cache = nullptr;
13511351
if (req->info.ext_flags & SW_TASK_CALLBACK) {
1352-
auto callback_iterator = server_object->property->task_callbacks.find(req->info.fd);
1352+
auto callback_iterator = server_object->property->task_callbacks.find(task_id);
13531353
if (callback_iterator == server_object->property->task_callbacks.end()) {
13541354
req->info.ext_flags = req->info.ext_flags & (~SW_TASK_CALLBACK);
13551355
} else {
@@ -1372,7 +1372,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {
13721372
zval *object = &args[1];
13731373
object_init_ex(object, swoole_server_task_result_ce);
13741374
zend_update_property_long(
1375-
swoole_server_task_result_ce, SW_Z8_OBJ_P(object), ZEND_STRL("task_id"), (zend_long) req->info.fd);
1375+
swoole_server_task_result_ce, SW_Z8_OBJ_P(object), ZEND_STRL("task_id"), (zend_long) task_id);
13761376
zend_update_property_long(swoole_server_task_result_ce,
13771377
SW_Z8_OBJ_P(object),
13781378
ZEND_STRL("task_worker_id"),
@@ -1382,7 +1382,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {
13821382
zend_update_property(swoole_server_task_result_ce, SW_Z8_OBJ_P(object), ZEND_STRL("data"), zresult.ptr());
13831383
argc = 2;
13841384
} else {
1385-
ZVAL_LONG(&args[1], req->info.fd);
1385+
ZVAL_LONG(&args[1], (zend_long) task_id);
13861386
args[2] = zresult.value;
13871387
argc = 3;
13881388
}
@@ -1392,7 +1392,7 @@ static int php_swoole_server_onFinish(Server *serv, EventData *req) {
13921392
}
13931393
if (req->info.ext_flags & SW_TASK_CALLBACK) {
13941394
sw_zend_fci_cache_discard(fci_cache);
1395-
server_object->property->task_callbacks.erase(req->info.fd);
1395+
server_object->property->task_callbacks.erase(task_id);
13961396
}
13971397
if (serv->event_object) {
13981398
zval_ptr_dtor(&args[1]);
@@ -3076,7 +3076,7 @@ static PHP_METHOD(swoole_server, taskwait) {
30763076
}
30773077

30783078
int _dst_worker_id = (int) dst_worker_id;
3079-
TaskId task_id = buf.info.fd;
3079+
TaskId task_id = serv->get_task_id(&buf);
30803080

30813081
// coroutine
30823082
if (swoole_coroutine_is_in()) {
@@ -3125,7 +3125,7 @@ static PHP_METHOD(swoole_server, taskwait) {
31253125
break;
31263126
}
31273127
if (pipe->read(&notify, sizeof(notify)) > 0) {
3128-
if (task_result->info.fd != task_id) {
3128+
if (serv->get_task_id(task_result) != task_id) {
31293129
continue;
31303130
}
31313131
zval zresult;
@@ -3264,7 +3264,7 @@ static PHP_METHOD(swoole_server, taskWaitMulti) {
32643264

32653265
do {
32663266
EventData *result = (EventData *) (content->str + content->offset);
3267-
TaskId task_id = result->info.fd;
3267+
TaskId task_id = serv->get_task_id(result);
32683268
zval zresult;
32693269
if (!php_swoole_server_task_unpack(&zresult, result)) {
32703270
goto _next;
@@ -3409,12 +3409,14 @@ static PHP_METHOD(swoole_server, task) {
34093409
RETURN_FALSE;
34103410
}
34113411

3412+
TaskId task_id = serv->get_task_id(&buf);
3413+
34123414
if (!serv->is_worker()) {
34133415
buf.info.ext_flags |= SW_TASK_NOREPLY;
34143416
} else if (fci.size) {
34153417
buf.info.ext_flags |= SW_TASK_CALLBACK;
34163418
sw_zend_fci_cache_persist(&fci_cache);
3417-
server_object->property->task_callbacks[buf.info.fd] = fci_cache;
3419+
server_object->property->task_callbacks[task_id] = fci_cache;
34183420
}
34193421

34203422
buf.info.ext_flags |= SW_TASK_NONBLOCK;
@@ -3423,7 +3425,7 @@ static PHP_METHOD(swoole_server, task) {
34233425
sw_atomic_fetch_add(&serv->gs->tasking_num, 1);
34243426

34253427
if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) >= 0) {
3426-
RETURN_LONG(buf.info.fd);
3428+
RETURN_LONG(task_id);
34273429
}
34283430

34293431
sw_atomic_fetch_sub(&serv->gs->tasking_num, 1);

include/swoole_process_pool.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,18 @@ struct Worker {
157157
status = _status;
158158
}
159159

160+
void set_status_to_idle() {
161+
set_status(SW_WORKER_IDLE);
162+
}
163+
164+
void set_status_to_busy() {
165+
set_status(SW_WORKER_BUSY);
166+
}
167+
168+
void add_request_count() {
169+
request_count++;
170+
}
171+
160172
bool is_busy() {
161173
return status == SW_WORKER_BUSY;
162174
}
@@ -282,6 +294,10 @@ struct ProcessPool {
282294
return iter->second;
283295
}
284296

297+
TaskId get_task_id(EventData *task) {
298+
return task->info.fd;
299+
}
300+
285301
void set_max_packet_size(uint32_t _max_packet_size) {
286302
max_packet_size_ = _max_packet_size;
287303
}

include/swoole_server.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ class ThreadFactory : public BaseFactory {
459459
void at_thread_exit(Worker *worker);
460460
void create_message_bus();
461461
void destroy_message_bus();
462+
462463
public:
463464
ThreadFactory(Server *server);
464465
~ThreadFactory();
@@ -996,6 +997,10 @@ class Server {
996997
int get_idle_task_worker_num();
997998
int get_task_count();
998999

1000+
TaskId get_task_id(EventData *task) {
1001+
return gs->task_workers.get_task_id(task);
1002+
}
1003+
9991004
int get_minfd() {
10001005
return gs->min_fd;
10011006
}
@@ -1544,7 +1549,7 @@ class Server {
15441549
uint32_t key = 0;
15451550
SW_LOOP_N(worker_num + 1) {
15461551
key = sw_atomic_fetch_add(&worker_round_id, 1) % worker_num;
1547-
if (workers[key].status == SW_WORKER_IDLE) {
1552+
if (workers[key].is_idle()) {
15481553
found = true;
15491554
break;
15501555
}

src/os/process_pool.cc

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ int ProcessPool::schedule() {
278278

279279
for (i = 0; i < worker_num + 1; i++) {
280280
target_worker_id = sw_atomic_fetch_add(&round_id, 1) % worker_num;
281-
if (workers[target_worker_id].status == SW_WORKER_IDLE) {
281+
if (workers[target_worker_id].is_idle()) {
282282
found = 1;
283283
break;
284284
}
@@ -468,6 +468,7 @@ pid_t ProcessPool::spawn(Worker *worker) {
468468
swoole_set_process_type(SW_PROCESS_WORKER);
469469
swoole_set_process_id(worker->id);
470470
SwooleWG.worker = worker;
471+
SwooleWG.run_always = true;
471472
if (async) {
472473
if (swoole_event_init(SW_EVENTLOOP_WAIT_EXIT) < 0) {
473474
exit(254);
@@ -526,10 +527,7 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker
526527
EventData buf;
527528
} out{};
528529

529-
ssize_t n = 0, ret;
530-
if (pool->get_max_request() <= 0) {
531-
SwooleWG.run_always = 1;
532-
}
530+
ssize_t n = 0;
533531

534532
/**
535533
* Use from_fd save the task_worker->id
@@ -591,11 +589,10 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker
591589
swoole_warning("bad task packet, The received data-length[%ld] is inconsistent with the packet-length[%ld]",
592590
n,
593591
out.buf.info.len + sizeof(out.buf.info));
594-
continue;
592+
} else if (pool->onTask(pool, worker, &out.buf) < 0) {
593+
swoole_warning("[Worker#%d] the execution of task#%ld has failed", worker->id, pool->get_task_id(&out.buf));
595594
}
596595

597-
ret = pool->onTask(pool, worker, &out.buf);
598-
599596
if (pool->use_socket && pool->stream_info_->last_connection) {
600597
int _end = 0;
601598
pool->stream_info_->last_connection->send_blocking((void *) &_end, sizeof(_end));
@@ -611,7 +608,7 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker
611608
}
612609

613610
if (worker->has_exceeded_max_request()) {
614-
break;
611+
break;
615612
}
616613
}
617614
return SW_OK;
@@ -758,7 +755,7 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke
758755
}
759756

760757
if (worker->has_exceeded_max_request()) {
761-
break;
758+
break;
762759
}
763760
}
764761
return SW_OK;
@@ -801,7 +798,7 @@ static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Work
801798
return SW_OK;
802799
}
803800
if (worker->has_exceeded_max_request()) {
804-
break;
801+
break;
805802
}
806803
}
807804

@@ -994,4 +991,37 @@ void ProcessPool::destroy() {
994991
sw_mem_pool()->free(workers);
995992
}
996993

994+
bool Worker::has_exceeded_max_request() {
995+
return !SwooleWG.run_always && request_count >= SwooleWG.max_request;
996+
}
997+
998+
ssize_t Worker::send_pipe_message(const void *buf, size_t n, int flags) {
999+
Socket *pipe_sock;
1000+
1001+
if (flags & SW_PIPE_MASTER) {
1002+
pipe_sock = pipe_master;
1003+
} else {
1004+
pipe_sock = pipe_worker;
1005+
}
1006+
1007+
// message-queue
1008+
if (pool->use_msgqueue) {
1009+
struct {
1010+
long mtype;
1011+
EventData buf;
1012+
} msg;
1013+
1014+
msg.mtype = id + 1;
1015+
memcpy(&msg.buf, buf, n);
1016+
1017+
return pool->queue->push((QueueNode *) &msg, n) ? n : -1;
1018+
}
1019+
1020+
if ((flags & SW_PIPE_NONBLOCK) && swoole_event_is_available()) {
1021+
return swoole_event_write(pipe_sock, buf, n);
1022+
} else {
1023+
return pipe_sock->send_blocking(buf, n);
1024+
}
1025+
}
1026+
9971027
} // namespace swoole

src/server/master.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,7 +2056,7 @@ int Server::get_idle_worker_num() {
20562056

20572057
for (i = 0; i < worker_num; i++) {
20582058
Worker *worker = get_worker(i);
2059-
if (worker->status == SW_WORKER_IDLE) {
2059+
if (worker->is_idle()) {
20602060
idle_worker_num++;
20612061
}
20622062
}
@@ -2069,7 +2069,7 @@ int Server::get_idle_task_worker_num() {
20692069

20702070
for (uint32_t i = worker_num; i < (worker_num + task_worker_num); i++) {
20712071
Worker *worker = get_worker(i);
2072-
if (worker->status == SW_WORKER_IDLE) {
2072+
if (worker->is_idle()) {
20732073
idle_worker_num++;
20742074
}
20752075
}

src/server/task_worker.cc

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task)
8686
Server *serv = (Server *) pool->ptr;
8787
serv->last_task = task;
8888

89-
worker->status = SW_WORKER_BUSY;
89+
worker->set_status_to_busy();
9090
if (task->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) {
9191
serv->onPipeMessage(serv, task);
9292
} else if (task->info.type == SW_SERVER_EVENT_SHUTDOWN) {
@@ -95,9 +95,13 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task)
9595
ret = TaskWorker_call_command_handler(pool, task);
9696
} else {
9797
ret = serv->onTask(serv, task);
98-
worker->request_count++;
98+
/**
99+
* only server task as requests,
100+
* do not increase the count for pipeline communication and command processing.
101+
*/
102+
worker->add_request_count();
99103
}
100-
worker->status = SW_WORKER_IDLE;
104+
worker->set_status_to_idle();
101105

102106
return ret;
103107
}
@@ -202,7 +206,7 @@ static void TaskWorker_onStart(ProcessPool *pool, Worker *worker) {
202206

203207
worker->start_time = ::time(nullptr);
204208
worker->request_count = 0;
205-
worker->status = SW_WORKER_IDLE;
209+
worker->set_status_to_idle();
206210
/**
207211
* task_max_request
208212
*/
@@ -248,7 +252,7 @@ static int TaskWorker_onPipeReceive(Reactor *reactor, Event *event) {
248252
static int TaskWorker_loop_async(ProcessPool *pool, Worker *worker) {
249253
Server *serv = (Server *) pool->ptr;
250254
Socket *socket = worker->pipe_worker;
251-
worker->status = SW_WORKER_IDLE;
255+
worker->set_status_to_idle();
252256

253257
socket->set_nonblock();
254258
sw_reactor()->ptr = pool;
@@ -316,10 +320,10 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even
316320
uint32_t _len = htonl(data_len);
317321
retval = worker->pool->stream_info_->last_connection->send_blocking((void *) &_len, sizeof(_len));
318322
if (retval > 0) {
319-
retval = worker->pool->stream_info_->last_connection->send_blocking(data, data_len);
323+
retval = worker->pool->stream_info_->last_connection->send_blocking(data, data_len);
320324
}
321325
} else {
322-
retval = send_to_worker_from_worker(worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER);
326+
retval = send_to_worker_from_worker(worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER);
323327
}
324328
} else {
325329
uint64_t flag = 1;
@@ -367,7 +371,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even
367371
worker->lock->unlock();
368372

369373
while (1) {
370-
retval = pipe->write(&flag, sizeof(flag));
374+
retval = pipe->write(&flag, sizeof(flag));
371375
auto _sock = pipe->get_socket(true);
372376
if (retval < 0 && _sock->catch_write_error(errno) == SW_WAIT) {
373377
if (_sock->wait_event(-1, SW_EVENT_WRITE) == 0) {

0 commit comments

Comments
 (0)