Skip to content

Commit a95e7ec

Browse files
committed
add tests 9 --filter=[core]
1 parent 35ee873 commit a95e7ec

File tree

3 files changed

+46
-13
lines changed

3 files changed

+46
-13
lines changed

core-tests/src/server/server.cpp

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3002,10 +3002,13 @@ TEST(server, send_timeout) {
30023002

30033003
port->max_idle_time = 1;
30043004

3005+
String wbuf(2 * 1024 * 1024);
3006+
wbuf.append_random_bytes(2 * 1024 * 1024, false);
3007+
30053008
ASSERT_EQ(serv.create(), SW_OK);
30063009

30073010
thread t1;
3008-
serv.onStart = [&lock, &t1](Server *serv) {
3011+
serv.onStart = [&lock, &t1, &wbuf](Server *serv) {
30093012
t1 = thread([=]() {
30103013
swoole_signal_block_all();
30113014

@@ -3016,9 +3019,24 @@ TEST(server, send_timeout) {
30163019
network::SyncClient c(SW_SOCK_TCP);
30173020
c.connect(TEST_HOST, port->port);
30183021
c.send(packet, strlen(packet));
3019-
char buf[1024];
3020-
EXPECT_EQ(c.recv(buf, sizeof(buf)), sizeof(buf));
3021-
EXPECT_EQ(c.recv(buf, sizeof(buf)), 0);
3022+
3023+
String rbuf(3 * 1024 * 1024);
3024+
3025+
auto rn = c.recv(rbuf.str, 1024);
3026+
EXPECT_EQ(rn, 1024);
3027+
rbuf.length += 1024;
3028+
3029+
sleep(2);
3030+
3031+
while (true) {
3032+
rn = c.recv(rbuf.str + rbuf.length, rbuf.size - rbuf.length);
3033+
if (rn <= 0) {
3034+
break;
3035+
}
3036+
rbuf.length += rn;
3037+
}
3038+
3039+
EXPECT_MEMEQ(rbuf.str, wbuf.str, rbuf.length);
30223040
c.close();
30233041

30243042
kill(serv->gs->master_pid, SIGTERM);
@@ -3033,12 +3051,15 @@ TEST(server, send_timeout) {
30333051
DEBUG() << "onWorkerStart: id=" << worker->id << "\n";
30343052
};
30353053

3036-
serv.onReceive = [](Server *serv, RecvData *req) -> int {
3054+
serv.onReceive = [&wbuf](Server *serv, RecvData *req) -> int {
30373055
auto sid = req->session_id();
3038-
auto buf = sw_tg_buffer();
3039-
buf->clear();
3040-
buf->append_random_bytes(1024, false);
3041-
EXPECT_TRUE(serv->send(sid, buf->str, buf->length));
3056+
auto conn = serv->get_connection_by_session_id(sid);
3057+
3058+
swoole_timer_del(conn->socket->recv_timer);
3059+
conn->socket->recv_timer = nullptr;
3060+
conn->socket->set_buffer_size(65536);
3061+
3062+
EXPECT_TRUE(serv->send(sid, wbuf.str, wbuf.length));
30423063

30433064
test::counter_incr(0);
30443065

src/server/master.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ TimerCallback Server::get_timeout_callback(ListenPort *port, Reactor *reactor, C
3535
return;
3636
}
3737
long ms = time<std::chrono::milliseconds>(true);
38+
swoole_trace_log(SW_TRACE_SERVER,
39+
"timeout_callback, last_received_time=%f, last_sent_time=%f",
40+
conn->socket->last_received_time,
41+
conn->socket->last_sent_time);
3842
if (ms - conn->socket->last_received_time < port->max_idle_time &&
3943
ms - conn->socket->last_sent_time < port->max_idle_time) {
4044
return;
@@ -1513,6 +1517,10 @@ int Server::send_to_connection(SendData *_send) {
15131517
_socket->send_timeout_ = port->max_idle_time;
15141518
_socket->last_sent_time = time<std::chrono::milliseconds>(true);
15151519
_socket->send_timer = swoole_timer_add((long) (port->max_idle_time * 1000), true, timeout_callback);
1520+
swoole_trace_log(SW_TRACE_SERVER,
1521+
"added send_timer[id=%ld], port->max_idle_time=%f",
1522+
_socket->send_timer->id,
1523+
port->max_idle_time);
15161524
}
15171525

15181526
if (!_socket->isset_writable_event()) {

src/server/reactor_thread.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -578,8 +578,9 @@ static int ReactorThread_onRead(Reactor *reactor, Event *event) {
578578
static int ReactorThread_onWrite(Reactor *reactor, Event *ev) {
579579
int ret;
580580
auto serv = static_cast<Server *>(reactor->ptr);
581-
Socket *socket = ev->socket;
581+
auto socket = ev->socket;
582582
int fd = ev->fd;
583+
auto port = serv->get_port_by_fd(fd);
583584

584585
if (serv->is_process_mode()) {
585586
assert(fd % serv->reactor_num == reactor->id);
@@ -639,16 +640,19 @@ static int ReactorThread_onWrite(Reactor *reactor, Event *ev) {
639640
}
640641

641642
if (serv->onBufferEmpty && conn->high_watermark) {
642-
ListenPort *port = serv->get_port_by_fd(fd);
643643
if (socket->get_out_buffer_length() <= port->buffer_low_watermark) {
644644
conn->high_watermark = 0;
645645
serv->notify(conn, SW_SERVER_EVENT_BUFFER_EMPTY);
646646
}
647647
}
648648

649649
if (socket->send_timer) {
650-
swoole_timer_del(socket->send_timer);
651-
socket->send_timer = nullptr;
650+
if (Buffer::empty(socket->out_buffer)) {
651+
swoole_timer_del(socket->send_timer);
652+
socket->send_timer = nullptr;
653+
} else {
654+
swoole_timer_delay(socket->send_timer, port->max_idle_time);
655+
}
652656
}
653657

654658
// remove EPOLLOUT event

0 commit comments

Comments
 (0)