Skip to content

Commit 7e30a33

Browse files
authored
Optimize server code (#5478)
* Optimize code * Optimize code * fix tests * optimize code * fix tests * fix tests * fix tests [3] * fix tests [4] * fix tests [5]
1 parent 81fd4e8 commit 7e30a33

14 files changed

+325
-250
lines changed

core-tests/src/os/process_pool.cpp

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,36 +11,73 @@ using namespace swoole;
1111

1212
static void test_func(ProcessPool &pool) {
1313
EventData data{};
14-
data.info.len = strlen(TEST_JPG_MD5SUM);
15-
memcpy(data.data, TEST_JPG_MD5SUM, data.info.len);
14+
size_t size = swoole_system_random(1024, 4096);
15+
String rmem(size);
16+
rmem.append_random_bytes(size - 1);
17+
rmem.append("\0");
18+
19+
data.info.len = size;
20+
memcpy(data.data, rmem.value(), size);
1621

1722
int worker_id = -1;
1823
ASSERT_EQ(pool.dispatch_blocking(&data, &worker_id), SW_OK);
1924

2025
pool.running = true;
26+
pool.ptr = &rmem;
27+
SwooleWG.run_always = true;
28+
pool.main_loop(&pool, pool.get_worker(0));
29+
pool.destroy();
30+
}
31+
32+
static void test_func_task_protocol(ProcessPool &pool) {
33+
pool.set_protocol(SW_PROTOCOL_TASK);
2134
pool.onTask = [](ProcessPool *pool, Worker *worker, EventData *task) -> int {
2235
pool->running = false;
23-
EXPECT_MEMEQ(task->data, TEST_JPG_MD5SUM, task->info.len);
36+
String *_data = (String *) pool->ptr;
37+
usleep(10000);
38+
EXPECT_MEMEQ(_data->str, task->data, task->len());
2439
return 0;
2540
};
26-
pool.main_loop(&pool, pool.get_worker(0));
27-
pool.destroy();
41+
test_func(pool);
42+
}
43+
44+
static void test_func_message_protocol(ProcessPool &pool) {
45+
pool.set_protocol(SW_PROTOCOL_MESSAGE);
46+
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
47+
pool->running = false;
48+
String *_data = (String *) pool->ptr;
49+
usleep(10000);
50+
EXPECT_MEMEQ(_data->str, rdata->data, rdata->info.len);
51+
};
52+
test_func(pool);
53+
}
54+
55+
static void test_func_stream_protocol(ProcessPool &pool) {
56+
pool.set_protocol(SW_PROTOCOL_STREAM);
57+
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
58+
pool->running = false;
59+
String *_data = (String *) pool->ptr;
60+
EventData *msg = (EventData *) rdata->data;
61+
usleep(10000);
62+
EXPECT_MEMEQ(_data->str, msg->data, msg->len());
63+
};
64+
test_func(pool);
2865
}
2966

3067
TEST(process_pool, tcp) {
3168
ProcessPool pool{};
3269
ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK);
3370
ASSERT_EQ(pool.listen(TEST_HOST, TEST_PORT, 128), SW_OK);
3471

35-
test_func(pool);
72+
test_func_task_protocol(pool);
3673
}
3774

3875
TEST(process_pool, unix_sock) {
3976
ProcessPool pool{};
4077
signal(SIGPIPE, SIG_IGN);
4178
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);
4279

43-
test_func(pool);
80+
test_func_task_protocol(pool);
4481
}
4582

4683
TEST(process_pool, tcp_raw) {
@@ -72,7 +109,21 @@ TEST(process_pool, msgqueue) {
72109
ProcessPool pool{};
73110
ASSERT_EQ(pool.create(1, 0x9501, SW_IPC_MSGQUEUE), SW_OK);
74111

75-
test_func(pool);
112+
test_func_task_protocol(pool);
113+
}
114+
115+
TEST(process_pool, message_protocol) {
116+
ProcessPool pool{};
117+
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);
118+
119+
test_func_message_protocol(pool);
120+
}
121+
122+
TEST(process_pool, stream_protocol) {
123+
ProcessPool pool{};
124+
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);
125+
126+
test_func_stream_protocol(pool);
76127
}
77128

78129
constexpr int magic_number = 99900011;

core-tests/src/server/server.cpp

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -545,24 +545,25 @@ TEST(server, task_worker) {
545545
exit(2);
546546
}
547547

548-
serv.onTask = [](Server *serv, swEventData *task) -> int {
549-
EXPECT_EQ(serv->get_task_count(), 1);
548+
serv.onTask = [](Server *serv, EventData *task) -> int {
549+
EXPECT_EQ(serv->get_tasking_num(), 1);
550550
EXPECT_EQ(string(task->data, task->info.len), string(packet));
551551
serv->gs->task_workers.running = 0;
552+
serv->gs->task_count++;
553+
serv->gs->tasking_num--;
552554
return 0;
553555
};
554556

555557
ASSERT_EQ(serv.create(), SW_OK);
556558
ASSERT_EQ(serv.create_task_workers(), SW_OK);
557559

558560
thread t1([&serv]() {
561+
SwooleWG.run_always = true;
559562
serv.gs->task_workers.running = 1;
560-
serv.gs->tasking_num++;
561563
serv.gs->task_workers.main_loop(&serv.gs->task_workers, &serv.gs->task_workers.workers[0]);
564+
EXPECT_EQ(serv.get_tasking_num(), 0);
562565
serv.gs->tasking_num--;
563-
EXPECT_EQ(serv.get_task_count(), 0);
564-
serv.gs->tasking_num--;
565-
EXPECT_EQ(serv.get_task_count(), 0);
566+
EXPECT_EQ(serv.get_tasking_num(), 0);
566567
EXPECT_EQ(serv.get_idle_task_worker_num(), serv.task_worker_num);
567568
});
568569

@@ -577,10 +578,13 @@ TEST(server, task_worker) {
577578

578579
int _dst_worker_id = 0;
579580

580-
ASSERT_GE(serv.gs->task_workers.dispatch(&buf, &_dst_worker_id), 0);
581+
ASSERT_TRUE(serv.task(&buf, &_dst_worker_id));
582+
ASSERT_EQ(serv.gs->task_count, 1);
581583

582584
t1.join();
583585
serv.gs->task_workers.destroy();
586+
587+
ASSERT_EQ(serv.gs->task_count, 2);
584588
}
585589

586590
// PHP_METHOD(swoole_server, task)
@@ -600,8 +604,7 @@ TEST(server, task_worker2) {
600604

601605
serv.onTask = [](Server *serv, swEventData *task) -> int {
602606
EXPECT_EQ(string(task->data, task->info.len), string(packet));
603-
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
604-
EXPECT_GT(ret, 0);
607+
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
605608
return 0;
606609
};
607610

@@ -623,7 +626,7 @@ TEST(server, task_worker2) {
623626
memcpy(buf.data, packet, strlen(packet));
624627
buf.info.reactor_id = worker->id;
625628
buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_CALLBACK);
626-
ASSERT_GE(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), 0);
629+
ASSERT_EQ(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), SW_OK);
627630
sleep(1);
628631
kill(serv->gs->master_pid, SIGTERM);
629632
}
@@ -649,8 +652,7 @@ TEST(server, task_worker3) {
649652

650653
serv.onTask = [](Server *serv, swEventData *task) -> int {
651654
EXPECT_EQ(string(task->data, task->info.len), string(packet));
652-
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
653-
EXPECT_GT(ret, 0);
655+
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
654656
return 0;
655657
};
656658

@@ -698,8 +700,7 @@ TEST(server, task_worker4) {
698700

699701
serv.onTask = [](Server *serv, swEventData *task) -> int {
700702
EXPECT_EQ(string(task->data, task->info.len), string(packet));
701-
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
702-
EXPECT_GT(ret, 0);
703+
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
703704
return 0;
704705
};
705706

@@ -724,7 +725,7 @@ TEST(server, task_worker4) {
724725
serv->gs->task_workers.dispatch(&buf, &_dst_worker_id);
725726
sleep(1);
726727

727-
EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
728+
EventData *task_result = serv->get_task_result();
728729
sw_memset_zero(task_result, sizeof(*task_result));
729730
memset(&buf.info, 0, sizeof(buf.info));
730731
buf.info.len = strlen(packet);
@@ -767,8 +768,7 @@ TEST(server, task_worker5) {
767768
ifs.close();
768769

769770
EXPECT_EQ(string(resp), string(data));
770-
int ret = serv->reply_task_result(resp, SW_IPC_MAX_SIZE * 2, 0, task);
771-
EXPECT_GT(ret, 0);
771+
EXPECT_TRUE(serv->finish(resp, SW_IPC_MAX_SIZE * 2, 0, task));
772772
return 0;
773773
};
774774

@@ -779,7 +779,7 @@ TEST(server, task_worker5) {
779779
if (worker->id == 1) {
780780
int _dst_worker_id = 0;
781781

782-
EventData *task_result = &(serv->task_result[worker->id]);
782+
EventData *task_result = &(serv->task_results[worker->id]);
783783
sw_memset_zero(task_result, sizeof(*task_result));
784784

785785
File fp = make_tmpfile();

ext-src/php_swoole_server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ struct ServerObject {
124124

125125
struct TaskCo {
126126
Coroutine *co;
127-
int *list;
127+
TaskId *list;
128128
uint32_t count;
129129
zval *result;
130130
};

ext-src/swoole_process_pool.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,9 @@ static PHP_METHOD(swoole_process_pool, write) {
430430
char *data;
431431
size_t length;
432432

433-
if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &data, &length) == FAILURE) {
434-
RETURN_FALSE;
435-
}
433+
ZEND_PARSE_PARAMETERS_START(1, 1)
434+
Z_PARAM_STRING(data, length)
435+
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);
436436

437437
ProcessPool *pool = process_pool_get_and_check_pool(ZEND_THIS);
438438
if (pool->ipc_mode != SW_IPC_SOCKET) {

0 commit comments

Comments
 (0)