@@ -302,7 +302,9 @@ static bool
302
302
validate_event (const internal_event_t &event, runtime_id_t &rid, sequence_t &seq)
303
303
{
304
304
bool ret = false ;
305
-
305
+ if (event.empty ()) {
306
+ return ret;
307
+ }
306
308
internal_event_t ::const_iterator itc_r, itc_s, itc_e;
307
309
itc_r = event.find (EVENT_RUNTIME_ID);
308
310
itc_s = event.find (EVENT_SEQUENCE);
@@ -357,7 +359,6 @@ capture_service::do_capture()
357
359
int init_cnt;
358
360
void *cap_sub_sock = NULL ;
359
361
counters_t total_overflow = 0 ;
360
- static bool init_done = false ;
361
362
362
363
typedef enum {
363
364
/*
@@ -394,47 +395,6 @@ capture_service::do_capture()
394
395
395
396
m_cap_run = true ;
396
397
397
- if (!init_done) {
398
- zmq_msg_t msg;
399
- zmq_msg_init (&msg);
400
- int rc = zmq_msg_recv (&msg, cap_sub_sock, 0 );
401
-
402
- /*
403
- * When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1.
404
- * When capture service begins to read, the very first message that it will read is this
405
- * control character.
406
- *
407
- * We will handle by reading this message and dropping it before we begin reading for
408
- * cached events.
409
- *
410
- * This behavior will only happen once when XSUB connects to XPUB not everytime cache is started.
411
- *
412
- * There are chances that there are events already published to XSUB endpoint before XSUB is able to connect to XPUB, so we can receive events
413
- before the subscription message
414
- */
415
-
416
-
417
- if (rc == 1 ) { // Expected case to receive subscription message as very first message
418
- SWSS_LOG_INFO (" Received subscription message when XSUB connects to XPUB" );
419
- } else if (rc > 1 ) { // If there are events already published to XSUB before XSUB connects to XPUB, we can receive events before subscription message
420
- string event_source ((const char *)zmq_msg_data (&msg), zmq_msg_size (&msg));
421
- SWSS_LOG_DEBUG (" Receiving event from source: %s, will read second part of event" , event_source.c_str ());
422
- int more = 0 ;
423
- size_t more_size = sizeof (more);
424
- zmq_getsockopt (cap_sub_sock, ZMQ_RCVMORE, &more, &more_size);
425
- if (more) {
426
- zmq_msg_t msg_part;
427
- zmq_msg_init (&msg_part);
428
- zmq_msg_recv (&msg_part, cap_sub_sock, 0 );
429
- zmq_msg_close (&msg_part);
430
- }
431
- } else {
432
- SWSS_LOG_ERROR (" Error reading from ZMQ socket, rc=%d" , rc);
433
- }
434
- zmq_msg_close (&msg);
435
- init_done = true ;
436
- }
437
-
438
398
while (m_ctrl != START_CAPTURE) {
439
399
/* Wait for capture start */
440
400
this_thread::sleep_for (chrono::milliseconds (10 ));
0 commit comments