14
14
* See the License for the specific language governing permissions and
15
15
* limitations under the License.
16
16
*/
17
- use std:: collections:: HashMap ;
18
17
19
18
use cheetah_string:: CheetahString ;
19
+ use rocketmq_macros:: RequestHeaderCodec ;
20
20
use serde:: Deserialize ;
21
21
use serde:: Serialize ;
22
22
23
23
use crate :: code:: request_code:: RequestCode ;
24
- use crate :: protocol:: command_custom_header:: CommandCustomHeader ;
25
- use crate :: protocol:: command_custom_header:: FromMap ;
26
24
use crate :: protocol:: header:: message_operation_header:: send_message_request_header_v2:: SendMessageRequestHeaderV2 ;
27
25
use crate :: protocol:: header:: message_operation_header:: TopicRequestHeaderTrait ;
28
26
use crate :: protocol:: remoting_command:: RemotingCommand ;
29
27
use crate :: rpc:: topic_request_header:: TopicRequestHeader ;
30
28
31
- #[ derive( Debug , Clone , Serialize , Deserialize , Default ) ]
29
+ #[ derive( Debug , Clone , Serialize , Deserialize , Default , RequestHeaderCodec ) ]
32
30
#[ serde( rename_all = "camelCase" ) ]
33
31
pub struct SendMessageRequestHeader {
32
+ #[ required]
34
33
pub producer_group : CheetahString ,
34
+
35
+ #[ required]
35
36
pub topic : CheetahString ,
37
+
38
+ #[ required]
36
39
pub default_topic : CheetahString ,
40
+
41
+ #[ required]
37
42
pub default_topic_queue_nums : i32 ,
43
+
44
+ #[ required]
38
45
pub queue_id : Option < i32 > ,
46
+
47
+ #[ required]
39
48
pub sys_flag : i32 ,
49
+
50
+ #[ required]
40
51
pub born_timestamp : i64 ,
52
+
53
+ #[ required]
41
54
pub flag : i32 ,
55
+
42
56
pub properties : Option < CheetahString > ,
43
57
pub reconsume_times : Option < i32 > ,
44
58
pub unit_mode : Option < bool > ,
@@ -48,7 +62,7 @@ pub struct SendMessageRequestHeader {
48
62
pub topic_request_header : Option < TopicRequestHeader > ,
49
63
}
50
64
51
- impl SendMessageRequestHeader {
65
+ /* impl SendMessageRequestHeader {
52
66
pub const PRODUCER_GROUP: &'static str = "producerGroup";
53
67
pub const TOPIC: &'static str = "topic";
54
68
pub const DEFAULT_TOPIC: &'static str = "defaultTopic";
@@ -239,7 +253,7 @@ impl FromMap for SendMessageRequestHeader {
239
253
topic_request_header: Some(<TopicRequestHeader as FromMap>::from(map)?),
240
254
})
241
255
}
242
- }
256
+ }*/
243
257
244
258
impl TopicRequestHeaderTrait for SendMessageRequestHeader {
245
259
fn set_lo ( & mut self , lo : Option < bool > ) {
@@ -366,3 +380,273 @@ pub fn parse_request_header(
366
380
None => request. decode_command_custom_header :: < SendMessageRequestHeader > ( ) ,
367
381
}
368
382
}
383
+
384
+ #[ cfg( test) ]
385
+ mod tests {
386
+ use std:: collections:: HashMap ;
387
+
388
+ use cheetah_string:: CheetahString ;
389
+
390
+ use super :: * ;
391
+ use crate :: code:: request_code:: RequestCode ;
392
+ use crate :: protocol:: command_custom_header:: CommandCustomHeader ;
393
+ use crate :: protocol:: command_custom_header:: FromMap ;
394
+ use crate :: protocol:: remoting_command:: RemotingCommand ;
395
+
396
+ #[ test]
397
+ fn parse_request_header_handles_invalid_request_code ( ) {
398
+ let request = RemotingCommand :: create_remoting_command ( RequestCode :: SendBatchMessage ) ;
399
+ let request_code = RequestCode :: SendBatchMessage ;
400
+ let result = parse_request_header ( & request, request_code) ;
401
+ assert ! ( result. is_err( ) ) ;
402
+ }
403
+
404
+ #[ test]
405
+ fn parse_request_header_handles_missing_header ( ) {
406
+ let request = RemotingCommand :: create_remoting_command ( RequestCode :: SendMessageV2 ) ;
407
+ let request_code = RequestCode :: SendMessageV2 ;
408
+ let result = parse_request_header ( & request, request_code) ;
409
+ assert ! ( result. is_err( ) ) ;
410
+ }
411
+
412
+ #[ test]
413
+ fn send_message_request_header_serializes_correctly ( ) {
414
+ let header = SendMessageRequestHeader {
415
+ producer_group : CheetahString :: from_static_str ( "test_producer_group" ) ,
416
+ topic : CheetahString :: from_static_str ( "test_topic" ) ,
417
+ default_topic : CheetahString :: from_static_str ( "test_default_topic" ) ,
418
+ default_topic_queue_nums : 8 ,
419
+ queue_id : Some ( 1 ) ,
420
+ sys_flag : 0 ,
421
+ born_timestamp : 1622547800000 ,
422
+ flag : 0 ,
423
+ properties : Some ( CheetahString :: from_static_str ( "test_properties" ) ) ,
424
+ reconsume_times : Some ( 3 ) ,
425
+ unit_mode : Some ( true ) ,
426
+ batch : Some ( false ) ,
427
+ max_reconsume_times : Some ( 5 ) ,
428
+ topic_request_header : None ,
429
+ } ;
430
+ let map = header. to_map ( ) . unwrap ( ) ;
431
+ assert_eq ! (
432
+ map. get( & CheetahString :: from_static_str( "producerGroup" ) )
433
+ . unwrap( ) ,
434
+ "test_producer_group"
435
+ ) ;
436
+ assert_eq ! (
437
+ map. get( & CheetahString :: from_static_str( "topic" ) ) . unwrap( ) ,
438
+ "test_topic"
439
+ ) ;
440
+ assert_eq ! (
441
+ map. get( & CheetahString :: from_static_str( "defaultTopic" ) )
442
+ . unwrap( ) ,
443
+ "test_default_topic"
444
+ ) ;
445
+ assert_eq ! (
446
+ map. get( & CheetahString :: from_static_str( "defaultTopicQueueNums" ) )
447
+ . unwrap( ) ,
448
+ "8"
449
+ ) ;
450
+ assert_eq ! (
451
+ map. get( & CheetahString :: from_static_str( "queueId" ) ) . unwrap( ) ,
452
+ "1"
453
+ ) ;
454
+ assert_eq ! (
455
+ map. get( & CheetahString :: from_static_str( "sysFlag" ) ) . unwrap( ) ,
456
+ "0"
457
+ ) ;
458
+ assert_eq ! (
459
+ map. get( & CheetahString :: from_static_str( "bornTimestamp" ) )
460
+ . unwrap( ) ,
461
+ "1622547800000"
462
+ ) ;
463
+ assert_eq ! (
464
+ map. get( & CheetahString :: from_static_str( "flag" ) ) . unwrap( ) ,
465
+ "0"
466
+ ) ;
467
+ assert_eq ! (
468
+ map. get( & CheetahString :: from_static_str( "properties" ) )
469
+ . unwrap( ) ,
470
+ "test_properties"
471
+ ) ;
472
+ assert_eq ! (
473
+ map. get( & CheetahString :: from_static_str( "reconsumeTimes" ) )
474
+ . unwrap( ) ,
475
+ "3"
476
+ ) ;
477
+ assert_eq ! (
478
+ map. get( & CheetahString :: from_static_str( "unitMode" ) )
479
+ . unwrap( ) ,
480
+ "true"
481
+ ) ;
482
+ assert_eq ! (
483
+ map. get( & CheetahString :: from_static_str( "batch" ) ) . unwrap( ) ,
484
+ "false"
485
+ ) ;
486
+ assert_eq ! (
487
+ map. get( & CheetahString :: from_static_str( "maxReconsumeTimes" ) )
488
+ . unwrap( ) ,
489
+ "5"
490
+ ) ;
491
+ }
492
+
493
+ #[ test]
494
+ fn send_message_request_header_deserializes_correctly ( ) {
495
+ let mut map = HashMap :: new ( ) ;
496
+ map. insert (
497
+ CheetahString :: from_static_str ( "producerGroup" ) ,
498
+ CheetahString :: from_static_str ( "test_producer_group" ) ,
499
+ ) ;
500
+ map. insert (
501
+ CheetahString :: from_static_str ( "topic" ) ,
502
+ CheetahString :: from_static_str ( "test_topic" ) ,
503
+ ) ;
504
+ map. insert (
505
+ CheetahString :: from_static_str ( "defaultTopic" ) ,
506
+ CheetahString :: from_static_str ( "test_default_topic" ) ,
507
+ ) ;
508
+ map. insert (
509
+ CheetahString :: from_static_str ( "defaultTopicQueueNums" ) ,
510
+ CheetahString :: from_static_str ( "8" ) ,
511
+ ) ;
512
+ map. insert (
513
+ CheetahString :: from_static_str ( "queueId" ) ,
514
+ CheetahString :: from_static_str ( "1" ) ,
515
+ ) ;
516
+ map. insert (
517
+ CheetahString :: from_static_str ( "sysFlag" ) ,
518
+ CheetahString :: from_static_str ( "0" ) ,
519
+ ) ;
520
+ map. insert (
521
+ CheetahString :: from_static_str ( "bornTimestamp" ) ,
522
+ CheetahString :: from_static_str ( "1622547800000" ) ,
523
+ ) ;
524
+ map. insert (
525
+ CheetahString :: from_static_str ( "flag" ) ,
526
+ CheetahString :: from_static_str ( "0" ) ,
527
+ ) ;
528
+ map. insert (
529
+ CheetahString :: from_static_str ( "properties" ) ,
530
+ CheetahString :: from_static_str ( "test_properties" ) ,
531
+ ) ;
532
+ map. insert (
533
+ CheetahString :: from_static_str ( "reconsumeTimes" ) ,
534
+ CheetahString :: from_static_str ( "3" ) ,
535
+ ) ;
536
+ map. insert (
537
+ CheetahString :: from_static_str ( "unitMode" ) ,
538
+ CheetahString :: from_static_str ( "true" ) ,
539
+ ) ;
540
+ map. insert (
541
+ CheetahString :: from_static_str ( "batch" ) ,
542
+ CheetahString :: from_static_str ( "false" ) ,
543
+ ) ;
544
+ map. insert (
545
+ CheetahString :: from_static_str ( "maxReconsumeTimes" ) ,
546
+ CheetahString :: from_static_str ( "5" ) ,
547
+ ) ;
548
+
549
+ let header = <SendMessageRequestHeader as FromMap >:: from ( & map) . unwrap ( ) ;
550
+ assert_eq ! ( header. producer_group, "test_producer_group" ) ;
551
+ assert_eq ! ( header. topic, "test_topic" ) ;
552
+ assert_eq ! ( header. default_topic, "test_default_topic" ) ;
553
+ assert_eq ! ( header. default_topic_queue_nums, 8 ) ;
554
+ assert_eq ! ( header. queue_id. unwrap( ) , 1 ) ;
555
+ assert_eq ! ( header. sys_flag, 0 ) ;
556
+ assert_eq ! ( header. born_timestamp, 1622547800000 ) ;
557
+ assert_eq ! ( header. flag, 0 ) ;
558
+ assert_eq ! ( header. properties. unwrap( ) , "test_properties" ) ;
559
+ assert_eq ! ( header. reconsume_times. unwrap( ) , 3 ) ;
560
+ assert_eq ! ( header. unit_mode. unwrap( ) , true ) ;
561
+ assert_eq ! ( header. batch. unwrap( ) , false ) ;
562
+ assert_eq ! ( header. max_reconsume_times. unwrap( ) , 5 ) ;
563
+ }
564
+
565
+ #[ test]
566
+ fn send_message_request_header_handles_missing_optional_fields ( ) {
567
+ let mut map = HashMap :: new ( ) ;
568
+ map. insert (
569
+ CheetahString :: from_static_str ( "queueId" ) ,
570
+ CheetahString :: from_static_str ( "1" ) ,
571
+ ) ;
572
+ map. insert (
573
+ CheetahString :: from_static_str ( "producerGroup" ) ,
574
+ CheetahString :: from_static_str ( "test_producer_group" ) ,
575
+ ) ;
576
+ map. insert (
577
+ CheetahString :: from_static_str ( "topic" ) ,
578
+ CheetahString :: from_static_str ( "test_topic" ) ,
579
+ ) ;
580
+ map. insert (
581
+ CheetahString :: from_static_str ( "defaultTopic" ) ,
582
+ CheetahString :: from_static_str ( "test_default_topic" ) ,
583
+ ) ;
584
+ map. insert (
585
+ CheetahString :: from_static_str ( "defaultTopicQueueNums" ) ,
586
+ CheetahString :: from_static_str ( "8" ) ,
587
+ ) ;
588
+ map. insert (
589
+ CheetahString :: from_static_str ( "sysFlag" ) ,
590
+ CheetahString :: from_static_str ( "0" ) ,
591
+ ) ;
592
+ map. insert (
593
+ CheetahString :: from_static_str ( "bornTimestamp" ) ,
594
+ CheetahString :: from_static_str ( "1622547800000" ) ,
595
+ ) ;
596
+ map. insert (
597
+ CheetahString :: from_static_str ( "flag" ) ,
598
+ CheetahString :: from_static_str ( "0" ) ,
599
+ ) ;
600
+
601
+ let header = <SendMessageRequestHeader as FromMap >:: from ( & map) . unwrap ( ) ;
602
+ assert_eq ! ( header. producer_group, "test_producer_group" ) ;
603
+ assert_eq ! ( header. topic, "test_topic" ) ;
604
+ assert_eq ! ( header. default_topic, "test_default_topic" ) ;
605
+ assert_eq ! ( header. default_topic_queue_nums, 8 ) ;
606
+ assert ! ( header. queue_id. is_some( ) ) ;
607
+ assert_eq ! ( header. sys_flag, 0 ) ;
608
+ assert_eq ! ( header. born_timestamp, 1622547800000 ) ;
609
+ assert_eq ! ( header. flag, 0 ) ;
610
+ assert ! ( header. properties. is_none( ) ) ;
611
+ assert ! ( header. reconsume_times. is_none( ) ) ;
612
+ assert ! ( header. unit_mode. is_none( ) ) ;
613
+ assert ! ( header. batch. is_none( ) ) ;
614
+ assert ! ( header. max_reconsume_times. is_none( ) ) ;
615
+ }
616
+
617
+ #[ test]
618
+ fn send_message_request_header_handles_invalid_data ( ) {
619
+ let mut map = HashMap :: new ( ) ;
620
+ map. insert (
621
+ CheetahString :: from_static_str ( "producerGroup" ) ,
622
+ CheetahString :: from_static_str ( "test_producer_group" ) ,
623
+ ) ;
624
+ map. insert (
625
+ CheetahString :: from_static_str ( "topic" ) ,
626
+ CheetahString :: from_static_str ( "test_topic" ) ,
627
+ ) ;
628
+ map. insert (
629
+ CheetahString :: from_static_str ( "defaultTopic" ) ,
630
+ CheetahString :: from_static_str ( "test_default_topic" ) ,
631
+ ) ;
632
+ map. insert (
633
+ CheetahString :: from_static_str ( "defaultTopicQueueNums" ) ,
634
+ CheetahString :: from_static_str ( "invalid" ) ,
635
+ ) ;
636
+ map. insert (
637
+ CheetahString :: from_static_str ( "sysFlag" ) ,
638
+ CheetahString :: from_static_str ( "invalid" ) ,
639
+ ) ;
640
+ map. insert (
641
+ CheetahString :: from_static_str ( "bornTimestamp" ) ,
642
+ CheetahString :: from_static_str ( "invalid" ) ,
643
+ ) ;
644
+ map. insert (
645
+ CheetahString :: from_static_str ( "flag" ) ,
646
+ CheetahString :: from_static_str ( "invalid" ) ,
647
+ ) ;
648
+
649
+ let result = <SendMessageRequestHeader as FromMap >:: from ( & map) ;
650
+ assert ! ( result. is_err( ) ) ;
651
+ }
652
+ }
0 commit comments