@@ -3073,3 +3073,120 @@ TEST(server, send_timeout) {
3073
3073
ASSERT_EQ (counter[0 ], 1 );
3074
3074
ASSERT_EQ (counter[3 ], 1 );
3075
3075
}
3076
+
3077
+ TEST (server, max_request) {
3078
+ Server serv (Server::MODE_PROCESS);
3079
+ serv.worker_num = 2 ;
3080
+ serv.max_request = 128 ;
3081
+
3082
+ Mutex *lock = new Mutex (Mutex::PROCESS_SHARED);
3083
+ lock->lock ();
3084
+
3085
+ ASSERT_NE (serv.add_port (SW_SOCK_TCP, TEST_HOST, 0 ), nullptr );
3086
+ ASSERT_EQ (serv.create (), SW_OK);
3087
+
3088
+ thread t1;
3089
+ serv.onStart = [&lock, &t1](Server *serv) {
3090
+ t1 = thread ([=]() {
3091
+ swoole_signal_block_all ();
3092
+ lock->lock ();
3093
+ ListenPort *port = serv->get_primary_port ();
3094
+
3095
+ network::SyncClient c (SW_SOCK_TCP);
3096
+ c.connect (TEST_HOST, port->port );
3097
+
3098
+ SW_LOOP_N (1024 ) {
3099
+ c.send (packet, strlen (packet));
3100
+ usleep (1000 );
3101
+ }
3102
+
3103
+ sleep (1 );
3104
+ c.close ();
3105
+ serv->shutdown ();
3106
+ });
3107
+ };
3108
+
3109
+ serv.onWorkerStart = [&lock](Server *serv, Worker *worker) {
3110
+ lock->unlock ();
3111
+ test::counter_incr (0 );
3112
+ };
3113
+
3114
+ serv.onReceive = [](Server *serv, RecvData *req) -> int { return SW_OK; };
3115
+
3116
+ ASSERT_EQ (serv.start (), 0 );
3117
+
3118
+ t1.join ();
3119
+ delete lock;
3120
+
3121
+ ASSERT_GE (test::counter_get (0 ), 8 );
3122
+ }
3123
+
3124
+ TEST (server, watermark) {
3125
+ Server serv (Server::MODE_PROCESS);
3126
+ serv.worker_num = 2 ;
3127
+
3128
+ Mutex *lock = new Mutex (Mutex::PROCESS_SHARED);
3129
+ lock->lock ();
3130
+
3131
+ String wbuf;
3132
+ wbuf.append_random_bytes (2 * 1024 * 1024 );
3133
+
3134
+ auto port = serv.add_port (SW_SOCK_TCP, TEST_HOST, 0 );
3135
+ ASSERT_NE (port, nullptr );
3136
+ ASSERT_EQ (serv.create (), SW_OK);
3137
+
3138
+ port->get_socket ()->set_buffer_size (65536 );
3139
+ port->buffer_high_watermark = 1024 * 1024 ;
3140
+ port->buffer_low_watermark = 65536 ;
3141
+
3142
+ thread t1;
3143
+ serv.onStart = [&lock, &t1](Server *serv) {
3144
+ t1 = thread ([=]() {
3145
+ swoole_signal_block_all ();
3146
+ lock->lock ();
3147
+ ListenPort *port = serv->get_primary_port ();
3148
+
3149
+ network::SyncClient c (SW_SOCK_TCP);
3150
+ c.connect (TEST_HOST, port->port );
3151
+ c.get_client ()->get_socket ()->set_buffer_size (65536 );
3152
+ c.send (packet, strlen (packet));
3153
+ usleep (1000 );
3154
+
3155
+ String rbuf (2 * 1024 * 1024 );
3156
+ while (rbuf.length < rbuf.size ) {
3157
+ auto rn = c.recv (rbuf.str + rbuf.length , 65536 );
3158
+ usleep (10000 );
3159
+ if (rn <= 0 ) {
3160
+ break ;
3161
+ }
3162
+ rbuf.length += rn;
3163
+ }
3164
+
3165
+ sleep (1 );
3166
+ c.close ();
3167
+ serv->shutdown ();
3168
+ });
3169
+ };
3170
+
3171
+ serv.onWorkerStart = [&lock](Server *serv, Worker *worker) {
3172
+ lock->unlock ();
3173
+ test::counter_incr (0 );
3174
+ };
3175
+
3176
+ serv.onReceive = [&wbuf](Server *serv, RecvData *req) -> int {
3177
+ EXPECT_TRUE (serv->send (req->session_id (), wbuf.str , wbuf.length ));
3178
+ return SW_OK;
3179
+ };
3180
+
3181
+ serv.onBufferEmpty = [](Server *serv, DataHead *ev) { test::counter_incr (1 ); };
3182
+
3183
+ serv.onBufferFull = [](Server *serv, DataHead *ev) { test::counter_incr (2 ); };
3184
+
3185
+ ASSERT_EQ (serv.start (), 0 );
3186
+
3187
+ t1.join ();
3188
+ delete lock;
3189
+
3190
+ ASSERT_GE (test::counter_get (1 ), 1 );
3191
+ ASSERT_GE (test::counter_get (2 ), 1 );
3192
+ }
0 commit comments