Skip to content

Commit bae2e7e

Browse files
committed
Fix tests [16] --filter=[core] --verbose
1 parent 5a9b01a commit bae2e7e

File tree

9 files changed

+244
-7
lines changed

9 files changed

+244
-7
lines changed

core-tests/src/os/process_pool.cpp

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,10 +559,57 @@ TEST(process_pool, listen_unixsock) {
559559
t1.join();
560560
}
561561

562-
TEST(process_pool, max_request_grace) {
562+
TEST(process_pool, worker) {
563563
Worker worker{};
564+
worker.init();
565+
566+
ASSERT_TRUE(worker.is_running());
567+
ASSERT_GT(worker.start_time, 0);
564568
worker.set_max_request(1000, 200);
565569

566570
ASSERT_GT(SwooleWG.max_request, 1000);
567571
ASSERT_LE(SwooleWG.max_request, 1200);
572+
573+
worker.shutdown();
574+
ASSERT_TRUE(worker.is_shutdown());
575+
576+
swoole_set_worker_type(SW_USER_WORKER);
577+
ASSERT_EQ(swoole_get_worker_symbol(), '@');
578+
579+
swoole_set_worker_type(SW_TASK_WORKER);
580+
ASSERT_EQ(swoole_get_worker_symbol(), '^');
581+
582+
swoole_set_worker_type(SW_WORKER);
583+
ASSERT_EQ(swoole_get_worker_symbol(), '*');
584+
585+
swoole_set_worker_type(SW_MASTER);
586+
ASSERT_EQ(swoole_get_worker_symbol(), '#');
587+
588+
swoole_set_worker_type(SW_MANAGER);
589+
ASSERT_EQ(swoole_get_worker_symbol(), '$');
590+
591+
worker.set_status_to_idle();
592+
ASSERT_TRUE(worker.is_idle());
593+
ASSERT_FALSE(worker.is_busy());
594+
595+
worker.set_status_to_busy();
596+
ASSERT_FALSE(worker.is_idle());
597+
ASSERT_TRUE(worker.is_busy());
598+
599+
worker.set_status(SW_WORKER_EXIT);
600+
ASSERT_FALSE(worker.is_idle());
601+
ASSERT_FALSE(worker.is_busy());
602+
}
603+
604+
TEST(process_pool, add_worker) {
605+
Worker worker{};
606+
worker.pid = getpid();
607+
608+
ProcessPool pool{};
609+
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);
610+
611+
pool.add_worker(&worker);
612+
613+
auto *worker2 = pool.get_worker_by_pid(getpid());
614+
ASSERT_EQ(&worker, worker2);
568615
}

core-tests/src/server/server.cpp

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2223,3 +2223,175 @@ TEST(server, eof_check) {
22232223
ASSERT_TRUE(flags["on_close"]);
22242224
ASSERT_EQ(recv_count, 3);
22252225
}
2226+
2227+
static void test_clean_worker(Server::Mode mode) {
2228+
Server serv(mode);
2229+
serv.worker_num = 2;
2230+
2231+
int *counter = (int *) sw_mem_pool()->alloc(sizeof(int) * 4);
2232+
2233+
AsyncClient ac(SW_SOCK_TCP);
2234+
2235+
ListenPort *port = serv.add_port(SW_SOCK_TCP, TEST_HOST, 0);
2236+
ASSERT_TRUE(port);
2237+
2238+
ASSERT_EQ(serv.create(), SW_OK);
2239+
2240+
serv.onConnect = [](Server *serv, DataHead *ev) {
2241+
swoole_event_defer(
2242+
[serv](void *) {
2243+
serv->clean_worker_connections(sw_worker());
2244+
sw_reactor()->running = false;
2245+
},
2246+
nullptr);
2247+
};
2248+
2249+
serv.onReceive = [counter](Server *serv, RecvData *req) {
2250+
serv->send(req->info.fd, "OK", 2);
2251+
sw_atomic_fetch_add(&counter[0], 1);
2252+
return SW_OK;
2253+
};
2254+
2255+
serv.onClose = [counter](Server *serv, DataHead *ev) { sw_atomic_fetch_add(&counter[2], 1); };
2256+
2257+
serv.onWorkerStart = [counter](Server *serv, Worker *worker) {
2258+
ASSERT_EQ(serv->get_connection_num(), 0);
2259+
sw_atomic_fetch_add(&counter[1], 1);
2260+
};
2261+
2262+
serv.onStart = [port, &ac, counter](Server *_serv) {
2263+
swoole_timer_after(50, [port, _serv, &ac, counter](TIMER_PARAMS) {
2264+
ac.on_connect([&](AsyncClient *ac) { ac->send(SW_STRL(TEST_STR)); });
2265+
2266+
ac.on_close(
2267+
[_serv](AsyncClient *ac) { swoole_timer_after(50, [_serv](TIMER_PARAMS) { _serv->shutdown(); }); });
2268+
2269+
ac.on_error([](AsyncClient *ac) {});
2270+
2271+
ac.on_receive(
2272+
[counter](AsyncClient *ac, const char *data, size_t len) { sw_atomic_fetch_add(&counter[3], 1); });
2273+
2274+
bool retval = ac.connect(TEST_HOST, port->get_port());
2275+
EXPECT_TRUE(retval);
2276+
});
2277+
};
2278+
2279+
ASSERT_EQ(serv.start(), SW_OK);
2280+
ASSERT_EQ(counter[0], 0); // Server on_receive
2281+
ASSERT_EQ(counter[1], 3); // worker start
2282+
ASSERT_EQ(counter[2], 1); // Server on_close
2283+
ASSERT_EQ(counter[3], 0); // Client on_receive
2284+
}
2285+
2286+
TEST(server, clean_worker_1) {
2287+
test_clean_worker(Server::MODE_BASE);
2288+
}
2289+
2290+
TEST(server, clean_worker_2) {
2291+
test_clean_worker(Server::MODE_THREAD);
2292+
}
2293+
2294+
static void test_kill_worker(Server::Mode mode, bool wait_reactor = true) {
2295+
Server serv(mode);
2296+
serv.worker_num = 2;
2297+
2298+
int *counter = (int *) sw_mem_pool()->alloc(sizeof(int) * 6);
2299+
2300+
swoole::Mutex lock(swoole::Mutex::PROCESS_SHARED);
2301+
lock.lock();
2302+
2303+
ListenPort *port = serv.add_port(SW_SOCK_TCP, TEST_HOST, 0);
2304+
ASSERT_TRUE(port);
2305+
2306+
ASSERT_EQ(serv.create(), SW_OK);
2307+
2308+
serv.onConnect = [counter](Server *serv, DataHead *ev) {
2309+
counter[4] = ev->fd;
2310+
counter[5] = sw_worker()->id;
2311+
};
2312+
2313+
serv.onReceive = [counter](Server *serv, RecvData *req) {
2314+
serv->send(req->info.fd, "OK", 2);
2315+
sw_atomic_fetch_add(&counter[0], 1);
2316+
2317+
return SW_OK;
2318+
};
2319+
2320+
serv.onWorkerStop = [counter](Server *_serv, Worker *worker) {
2321+
_serv->close(counter[4]);
2322+
_serv->drain_worker_pipe();
2323+
};
2324+
2325+
serv.onClose = [counter](Server *serv, DataHead *ev) {
2326+
sw_atomic_fetch_add(&counter[2], 1);
2327+
};
2328+
2329+
serv.onWorkerStart = [counter](Server *_serv, Worker *worker) { sw_atomic_fetch_add(&counter[1], 1); };
2330+
2331+
serv.onStart = [&lock](Server *_serv) {
2332+
if (!sw_worker()) {
2333+
ASSERT_FALSE(_serv->kill_worker(-1, true));
2334+
}
2335+
lock.unlock();
2336+
};
2337+
2338+
std::thread t([&]() {
2339+
swoole_signal_block_all();
2340+
2341+
lock.lock();
2342+
2343+
usleep(50000);
2344+
2345+
network::SyncClient c(SW_SOCK_TCP);
2346+
EXPECT_TRUE(c.connect(TEST_HOST, port->port));
2347+
2348+
EXPECT_EQ(c.send(SW_STRL(TEST_STR)), strlen(TEST_STR));
2349+
2350+
String rbuf(1024);
2351+
auto rn = c.recv(rbuf.str, rbuf.size);
2352+
EXPECT_EQ(rn, 2);
2353+
2354+
serv.kill_worker(1 - counter[5], wait_reactor);
2355+
2356+
rn = c.recv(rbuf.str, rbuf.size);
2357+
EXPECT_EQ(rn, 0);
2358+
2359+
sw_atomic_fetch_add(&counter[3], 1);
2360+
2361+
usleep(50000);
2362+
2363+
serv.shutdown();
2364+
});
2365+
2366+
ASSERT_EQ(serv.start(), SW_OK);
2367+
t.join();
2368+
2369+
ASSERT_EQ(counter[0], 1); // Client receive
2370+
ASSERT_EQ(counter[1], 3); // Server onWorkeStart
2371+
ASSERT_EQ(counter[2], 1); // Server onClose
2372+
ASSERT_EQ(counter[3], 1); // Client close
2373+
}
2374+
2375+
TEST(server, kill_worker_1) {
2376+
test_kill_worker(Server::MODE_BASE);
2377+
}
2378+
2379+
TEST(server, kill_worker_2) {
2380+
test_kill_worker(Server::MODE_PROCESS);
2381+
}
2382+
2383+
TEST(server, kill_worker_3) {
2384+
test_kill_worker(Server::MODE_THREAD);
2385+
}
2386+
2387+
TEST(server, kill_worker_4) {
2388+
test_kill_worker(Server::MODE_BASE, false);
2389+
}
2390+
2391+
TEST(server, kill_worker_5) {
2392+
test_kill_worker(Server::MODE_PROCESS, false);
2393+
}
2394+
2395+
TEST(server, kill_worker_6) {
2396+
test_kill_worker(Server::MODE_THREAD, false);
2397+
}

ext-src/swoole_server.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3912,8 +3912,6 @@ static PHP_METHOD(swoole_server, stop) {
39123912
Z_PARAM_BOOL(wait_reactor)
39133913
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);
39143914

3915-
worker_id = worker_id < 0 ? swoole_get_worker_id() : worker_id;
3916-
39173915
RETURN_BOOL(serv->kill_worker(worker_id, wait_reactor));
39183916
}
39193917

include/swoole_server.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@ enum ServerEventType {
632632
SW_SERVER_EVENT_CLOSE,
633633
SW_SERVER_EVENT_CONNECT,
634634
SW_SERVER_EVENT_CLOSE_FORCE,
635+
SW_SERVER_EVENT_CLOSE_FORWARD,
635636
// task
636637
SW_SERVER_EVENT_TASK,
637638
SW_SERVER_EVENT_FINISH,
@@ -1293,7 +1294,7 @@ class Server {
12931294
}
12941295

12951296
Worker *get_worker(uint16_t worker_id);
1296-
bool kill_worker(WorkerId worker_id, bool wait_reactor);
1297+
bool kill_worker(int worker_id, bool wait_reactor);
12971298
void stop_async_worker(Worker *worker);
12981299

12991300
Pipe *get_pipe_object(int pipe_fd) {

src/core/log.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,9 @@ void Logger::put(int level, const char *content, size_t length) {
309309

310310
int worker_id = swoole_get_worker_id();
311311
pid_t worker_pid = swoole_get_worker_pid();
312+
if (worker_pid == 0) {
313+
worker_pid = getpid();
314+
}
312315
char worker_symbol = swoole_get_worker_symbol();
313316

314317
size_t n = sw_snprintf(log_str,

src/server/base.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ bool BaseFactory::end(SessionId session_id, int flags) {
143143
session_id,
144144
session->fd,
145145
session->reactor_id);
146+
_send.info.type = SW_SERVER_EVENT_CLOSE_FORWARD;
146147
return forward_message(session, &_send);
147148
}
148149

src/server/reactor_process.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ static int ReactorProcess_onPipeRead(Reactor *reactor, Event *event) {
127127
factory->finish(&_send);
128128
break;
129129
}
130-
case SW_SERVER_EVENT_CLOSE: {
130+
case SW_SERVER_EVENT_CLOSE:
131+
case SW_SERVER_EVENT_CLOSE_FORWARD: {
131132
factory->end(pipe_buffer->info.fd, Server::CLOSE_ACTIVELY);
132133
break;
133134
}

src/server/reactor_thread.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,8 @@ static int ReactorThread_onPipeRead(Reactor *reactor, Event *ev) {
418418
serv->onPipeMessage(serv, (EventData *) resp);
419419
} else if (resp->info.type == SW_SERVER_EVENT_CLOSE_FORCE) {
420420
thread->close_connection(reactor, resp->info.fd);
421+
} else if (resp->info.type == SW_SERVER_EVENT_CLOSE_FORWARD) {
422+
serv->factory->end(resp->info.fd, Server::CLOSE_ACTIVELY);
421423
} else {
422424
PacketPtr packet = thread->message_bus.get_packet();
423425
_send.info = resp->info;

src/server/worker.cc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,14 +341,25 @@ void Server::call_worker_error_callback(Worker *worker, const ExitStatus &status
341341
}
342342
}
343343

344-
bool Server::kill_worker(WorkerId worker_id, bool wait_reactor) {
344+
bool Server::kill_worker(int worker_id, bool wait_reactor) {
345+
auto current_worker = sw_worker();
346+
if (!current_worker && worker_id < 0) {
347+
swoole_error_log(
348+
SW_LOG_WARNING, SW_ERROR_WRONG_OPERATION, "kill worker in non worker process requires specifying an id");
349+
return false;
350+
}
351+
352+
worker_id = worker_id < 0 ? swoole_get_worker_id() : worker_id;
353+
354+
swoole_trace_log(SW_TRACE_SERVER, "kill worker#%d", worker_id);
355+
345356
if (is_thread_mode()) {
346357
DataHead event = {};
347358
event.type = SW_SERVER_EVENT_SHUTDOWN;
348359
return send_to_worker_from_worker(get_worker(worker_id), &event, sizeof(event), SW_PIPE_MASTER) != -1;
349360
}
350361

351-
if (worker_id == sw_worker()->id && !wait_reactor) {
362+
if (current_worker && worker_id == current_worker->id && !wait_reactor) {
352363
if (swoole_event_is_available()) {
353364
swoole_event_defer([](void *data) { sw_reactor()->running = false; }, nullptr);
354365
}
@@ -513,6 +524,7 @@ void Server::drain_worker_pipe() {
513524
}
514525

515526
void Server::clean_worker_connections(Worker *worker) {
527+
swoole_trace_log(SW_TRACE_WORKER, "clean connections");
516528
sw_reactor()->destroyed = true;
517529
if (sw_likely(is_base_mode())) {
518530
foreach_connection([this](Connection *conn) { close(conn->session_id, true); });

0 commit comments

Comments
 (0)