Skip to content

Commit 7cd6dd2

Browse files
committed
fix tests [3]
1 parent 90a1190 commit 7cd6dd2

File tree

3 files changed

+87
-21
lines changed

3 files changed

+87
-21
lines changed

core-tests/src/os/process_pool.cpp

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,36 +11,72 @@ using namespace swoole;
1111

1212
static void test_func(ProcessPool &pool) {
1313
EventData data{};
14-
data.info.len = strlen(TEST_JPG_MD5SUM);
15-
memcpy(data.data, TEST_JPG_MD5SUM, data.info.len);
14+
size_t size = swoole_system_random(1024, 4096);
15+
String rmem(size);
16+
rmem.append_random_bytes(size - 1);
17+
rmem.append("\0");
18+
19+
data.info.len = size;
20+
memcpy(data.data, rmem.value(), size);
1621

1722
int worker_id = -1;
1823
ASSERT_EQ(pool.dispatch_blocking(&data, &worker_id), SW_OK);
1924

2025
pool.running = true;
26+
pool.ptr = &rmem;
27+
SwooleWG.run_always = true;
28+
pool.main_loop(&pool, pool.get_worker(0));
29+
pool.destroy();
30+
}
31+
32+
static void test_func_task_protocol(ProcessPool &pool) {
33+
pool.set_protocol(SW_PROTOCOL_TASK);
2134
pool.onTask = [](ProcessPool *pool, Worker *worker, EventData *task) -> int {
2235
pool->running = false;
36+
usleep(10000);
2337
EXPECT_MEMEQ(task->data, TEST_JPG_MD5SUM, task->info.len);
2438
return 0;
2539
};
26-
pool.main_loop(&pool, pool.get_worker(0));
27-
pool.destroy();
40+
test_func(pool);
41+
}
42+
43+
static void test_func_message_protocol(ProcessPool &pool) {
44+
pool.set_protocol(SW_PROTOCOL_MESSAGE);
45+
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
46+
pool->running = false;
47+
String *_data = (String *) pool->ptr;
48+
usleep(10000);
49+
EXPECT_MEMEQ(_data->str, rdata->data, rdata->info.len);
50+
};
51+
test_func(pool);
52+
}
53+
54+
static void test_func_stream_protocol(ProcessPool &pool) {
55+
pool.set_protocol(SW_PROTOCOL_STREAM);
56+
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
57+
pool->running = false;
58+
String *_data = (String *) pool->ptr;
59+
EventData *msg = (EventData *) rdata->data;
60+
usleep(10000);
61+
EXPECT_MEMEQ(_data->str, msg->data, msg->len());
62+
};
63+
test_func(pool);
2864
}
2965

3066
TEST(process_pool, tcp) {
3167
ProcessPool pool{};
3268
ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK);
3369
ASSERT_EQ(pool.listen(TEST_HOST, TEST_PORT, 128), SW_OK);
3470

35-
test_func(pool);
71+
test_func_task_protocol(pool);
3672
}
3773

3874
TEST(process_pool, unix_sock) {
3975
ProcessPool pool{};
4076
signal(SIGPIPE, SIG_IGN);
4177
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);
4278

43-
test_func(pool);
79+
test_func_task_protocol(pool);
4480
}
4581

4682
TEST(process_pool, tcp_raw) {
@@ -72,7 +108,21 @@ TEST(process_pool, msgqueue) {
72108
ProcessPool pool{};
73109
ASSERT_EQ(pool.create(1, 0x9501, SW_IPC_MSGQUEUE), SW_OK);
74110

75-
test_func(pool);
111+
test_func_task_protocol(pool);
112+
}
113+
114+
TEST(process_pool, message_protocol) {
115+
ProcessPool pool{};
116+
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);
117+
118+
test_func_message_protocol(pool);
119+
}
120+
121+
TEST(process_pool, stream_protocol) {
122+
ProcessPool pool{};
123+
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);
124+
125+
test_func_stream_protocol(pool);
76126
}
77127

78128
constexpr int magic_number = 99900011;

include/swoole_process_pool.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,14 @@ struct ProcessPool {
344344
int listen(const char *host, int port, int blacklog);
345345
int schedule();
346346
bool is_worker_running(Worker *worker);
347+
347348
static void kill_timeout_worker(Timer *timer, TimerNode *tnode);
349+
350+
private:
351+
static int run_with_task_protocol(ProcessPool *pool, Worker *worker);
352+
static int run_with_stream_protocol(ProcessPool *pool, Worker *worker);
353+
static int run_with_message_protocol(ProcessPool *pool, Worker *worker);
354+
static int run_async(ProcessPool *pool, Worker *worker);
348355
};
349356
}; // namespace swoole
350357

src/os/process_pool.cc

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@ namespace swoole {
3232
using network::Socket;
3333
using network::Stream;
3434

35-
static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker *worker);
36-
static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worker *worker);
37-
static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Worker *worker);
38-
static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker);
39-
4035
void ProcessPool::kill_timeout_worker(Timer *timer, TimerNode *tnode) {
4136
uint32_t i;
4237
pid_t reload_worker_pid = 0;
@@ -113,7 +108,7 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPCMode _ip
113108

114109
map_ = new std::unordered_map<pid_t, Worker *>;
115110
ipc_mode = _ipc_mode;
116-
main_loop = ProcessPool_worker_loop_with_task_protocol;
111+
main_loop = run_with_task_protocol;
117112
protocol_type_ = SW_PROTOCOL_TASK;
118113
max_packet_size_ = SW_INPUT_BUFFER_SIZE;
119114

@@ -207,13 +202,13 @@ int ProcessPool::listen(const char *host, int port, int blacklog) {
207202
void ProcessPool::set_protocol(enum ProtocolType _protocol_type) {
208203
switch (_protocol_type) {
209204
case SW_PROTOCOL_TASK:
210-
main_loop = ProcessPool_worker_loop_with_task_protocol;
205+
main_loop = run_with_task_protocol;
211206
break;
212207
case SW_PROTOCOL_STREAM:
213-
main_loop = ProcessPool_worker_loop_with_stream_protocol;
208+
main_loop = run_with_stream_protocol;
214209
break;
215210
case SW_PROTOCOL_MESSAGE:
216-
main_loop = ProcessPool_worker_loop_with_message_protocol;
211+
main_loop = run_with_message_protocol;
217212
break;
218213
default:
219214
abort();
@@ -234,7 +229,7 @@ int ProcessPool::start_check() {
234229
swoole_set_process_type(SW_PROCESS_MASTER);
235230

236231
if (async) {
237-
main_loop = ProcessPool_worker_loop_async;
232+
main_loop = run_async;
238233
}
239234

240235
SW_LOOP_N(worker_num) {
@@ -507,7 +502,7 @@ bool ProcessPool::is_worker_running(Worker *worker) {
507502
return running && !SwooleWG.shutdown && !worker->has_exceeded_max_request();
508503
}
509504

510-
static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker *worker) {
505+
int ProcessPool::run_with_task_protocol(ProcessPool *pool, Worker *worker) {
511506
struct {
512507
long mtype;
513508
EventData buf;
@@ -626,7 +621,7 @@ static int ProcessPool_recv_message(Reactor *reactor, Event *event) {
626621
return SW_OK;
627622
}
628623

629-
static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker) {
624+
int ProcessPool::run_async(ProcessPool *pool, Worker *worker) {
630625
if (pool->ipc_mode == SW_IPC_UNIXSOCK && pool->onMessage) {
631626
swoole_event_add(worker->pipe_worker, SW_EVENT_READ);
632627
if (pool->message_bus) {
@@ -642,7 +637,7 @@ static int ProcessPool_worker_loop_async(ProcessPool *pool, Worker *worker) {
642637
return swoole_event_wait();
643638
}
644639

645-
static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worker *worker) {
640+
int ProcessPool::run_with_stream_protocol(ProcessPool *pool, Worker *worker) {
646641
ssize_t n;
647642
RecvData msg{};
648643
msg.info.reactor_id = -1;
@@ -652,6 +647,10 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke
652647
pool->stream_info_->response_buffer = new String(SW_BUFFER_SIZE_STD);
653648
}
654649

650+
if (pool->ipc_mode == SW_IPC_UNIXSOCK && pool->message_bus == nullptr) {
651+
pool->create_message_bus();
652+
}
653+
655654
QueueNode *outbuf = (QueueNode *) pool->packet_buffer;
656655
outbuf->mtype = 0;
657656

@@ -739,7 +738,7 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke
739738
return SW_OK;
740739
}
741740

742-
static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Worker *worker) {
741+
int ProcessPool::run_with_message_protocol(ProcessPool *pool, Worker *worker) {
743742
auto fn = [&]() -> int {
744743
if (worker->pipe_worker->wait_event(-1, SW_EVENT_READ) < 0) {
745744
return errno == EINTR ? 0 : -1;
@@ -758,6 +757,16 @@ static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Work
758757
return 1;
759758
};
760759

760+
if (pool->ipc_mode != SW_IPC_UNIXSOCK) {
761+
swoole_error_log(
762+
SW_LOG_WARNING, SW_ERROR_OPERATION_NOT_SUPPORT, "not support, ipc_mode must be SW_IPC_UNIXSOCK");
763+
return SW_ERR;
764+
}
765+
766+
if (pool->message_bus == nullptr) {
767+
pool->create_message_bus();
768+
}
769+
761770
worker->pipe_worker->dont_restart = 1;
762771

763772
while (pool->is_worker_running(worker)) {

0 commit comments

Comments
 (0)