forked from bloomberg/blazingmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbmqp_protocol.h
5203 lines (4272 loc) · 184 KB
/
bmqp_protocol.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// bmqp_protocol.h -*-C++-*-
#ifndef INCLUDED_BMQP_PROTOCOL
#define INCLUDED_BMQP_PROTOCOL
//@PURPOSE: Provide definitions for BlazingMQ protocol structures and
// constants.
//
//@CLASSES
// bmqp::RdaInfo : VST representing a counter and status flags for the
// number of redelivery attempts remaining.
// bmqp::SubQueueInfo : VST representing a sub-queue receiving a message.
// bmqp::Protocol : Namespace for protocol generic values and routines.
// bmqp::EventType : Enum for types of the packets sent.
// bmqp::EncodingType : Enum for types of encoding used for control message.
// bmqp::EncodingFeature: Field name of the encoding features and the list of
// supported encoding features.
// bmqp::OptionType : Enum for types of options for PUT or PUSH messages.
// bmqp::EventHeader : Header for a BlazingMQ event packet sent on the wire
// bmqp::EventHeaderUtil: Utility methods for 'bmqp::EventHeader'.
// bmqp::OptionHeader : Header for an option in an event packet.
// bmqp::MessagePropertiesHeader
// : Header for message properties area in PUT/PUSH msg.
// bmqp::MessagePropertyHeader
// : Header for a message property in a PUT/PUSH message.
// bmqp::PutHeader : Header for mesages in PUT event packet.
// bmqp::PutHeaderFlags : Meanings of each bit in flags field of 'PutHeader'.
// bmqp::PutHeaderFlagUtil
// : Utility methods for 'bmqp::PutHeaderFlags'.
// bmqp::AckHeader : Header for messages in ACK event packet.
// bmqp::AckHeaderFlags : Meanings of each bit in flags field of 'AckHeader'.
// bmqp::AckMessage : Structure of an ack msg (AckHeader payload).
// bmqp::PushHeader : Header for messages in PUSH event packet.
// bmqp::PushHeaderFlags: Meanings of each bit in flags field of 'PushHeader'.
// bmqp::PushHeaderFlagUtil
// : Utility methods for 'bmqp::PushHeaderFlags'.
// bmqp::ConfirmHeader : Header for messages in CONFIRM event packet.
// bmqp::ConfirmMessage : Structure of a confirm msg (ConfirmHeader payload).
// bmqp::RejectHeader : Header for messages in REJECT event packet.
// bmqp::RejectMessage : Structure of a reject msg (RejectHeader payload).
// bmqp::StorageMessageType
// : Enum for type of cluster storage messages exchanged.
// bmqp::StorageHeader : Header for a 'STORAGE' message.
// bmqp::StorageHeaderFlags
// : Meanings of bits in flags field of 'StorageHeader'.
// bmqp::StorageHeaderFlagUtil
// : Utility methods for 'bmqp::StorageHeaderFlags'.
// bmqp::RecoveryFileChunkType
// : Enum for types that a file chunk can belong to
// during recovery phase.
// bmqp::RecoveryHeader : Header for a 'RECOVERY' message.
//
//@SEE ALSO: bmqp_ctrlmsg.xsd
//
//@DESCRIPTION: This component provide definitions for a set of structures
// ('bmqp::AckHeader', 'bmqp::AckMessage', 'bmqp::ConfirmHeader',
// 'bmqp::ConfirmMessage', 'bmqp::RejectHeader', 'bmqp::RejectMessage',
// 'bmqp::EventHeader', 'bmqp::OptionHeader', 'bmqp::PushHeader' and
// 'bmqp::PutHeader') as well as an enum ('bmqp::EventType') defining the
// binary layout of the protocol messages used by BlazingMQ to communicate
// between client and broker, as well as between brokers.
//
//
/// Terminology
///===========
//
//: o A !message! is an application-level unit of data containing control
//: information and possibly application payload data.
//
//: o An !event! is an application-level unit of data containing one or
//: multiple !messages!. The purpose of !events! is to batch !messages! for
//: efficiency. There are two types of events: !protocol! !events!
//: representing for example open queue requests or acknowledgements, and
//: !data! !events! carrying published data associated with a queue.
//
//: o A !packet! (a.k.a. network message) is a network packet, e.g. a formatted
//: unit of data carried by the network. A network interface object such as
//: a Client Session sends and receives !packets! to peer processes.
//
//: o An !event! is sent over the network using one or multiple !packets!. If
//: the !event! is too big to fit in a single !packet!, it is sent using
//: multiple !fragments!, each of them carrying a segment of the whole
//: !event! data. Therefore a !packet! contains either an !event! or a
//: !fragment!.
//
//
/// Protocol Limitations
///====================
//..
// =======================================================================
// + Packet:
// Max packet size.....................: 16 MB
// -----------------------------------------------------------------------
// + Fragment Header:
// Max fragment header size.............: 28 bytes
// Unique publisher Id..................: 8 bytes
// -----------------------------------------------------------------------
// + Event Header:
// Max event size......................: 2 GB
// Max event header....................: 1 020 bytes
// Max concurrent protocol version.....: 4
// Total different event type..........: 64
// -----------------------------------------------------------------------
// + Message Header:
// Max message payload size............: ~1 GB (-messageHeader
// -options
// -messageProperties)
// Max message header size.............: 124 bytes
// Max options area length.............: 64 MB
// Max message properties area length..: 64 MB
// Max number of properties............: 255
// Max types of property data types....: 31
// Max length of a property name.......: 4 KB
// Max length of a property value......: 64 MB
// Unique message id...................: 16 bytes (GUID)
// -----------------------------------------------------------------------
// + Options:
// Max option size.....................: 8 MB
// Total different options type........: 64
// -----------------------------------------------------------------------
// + Tag List:
// Max number of tags per tag list.....: 65 536
// Total different tag id..............: 65 536
// Max tag value length................: 64 MB
// Max total TagList data length.......: ~64MB
// ========================================================================
//..
//
// BMQ
#include <bmqp_queueid.h>
#include <bmqt_compressionalgorithmtype.h>
#include <bmqt_messageguid.h>
// MWC
#include <mwcc_array.h>
// BDE
#include <bdlb_bigendian.h>
#include <bsl_cstring.h> // for bsl::memset & bsl::memcpy
#include <bsl_ostream.h>
#include <bsl_limits.h>
#include <bsl_string.h> // for bslstl::StringRef
#include <bsls_assert.h>
#include <bsls_types.h>
namespace BloombergLP {
namespace bmqp {
// ==============
// struct RdaInfo
// ==============
/// VST representing the RDA counter and flags, with the MSB being a flag
/// for unlimited retransmission and the second bit being a flag for a
/// potentially poisonous message.
struct RdaInfo {
// FRIENDS
friend bool operator==(const RdaInfo& lhs, const RdaInfo& rhs);
private:
// TYPES
enum Enum { e_UNLIMITED = (1 << 7), e_POISONOUS = (1 << 6) };
public:
// CONSTANTS
static const unsigned char k_MAX_COUNTER_VALUE;
static const unsigned char k_MAX_INTERNAL_COUNTER_VALUE;
private:
// DATA
unsigned char d_counter;
public:
// CREATORS
/// Create an `RdaInfo` with unlimited set to true.
RdaInfo();
/// Create an `RdaInfo` with the counter and flags set to
/// `internalRepresentation`. The behaviour is undefined when
/// `internalRepresentation > 255`.
RdaInfo(unsigned int internalRepresentation);
/// Create an `RdaInfo` that has the same value as the specified
/// `original` object.
RdaInfo(const RdaInfo& original);
// MANIPULATORS
/// Set the unlimited delivery flag to true.
RdaInfo& setUnlimited();
/// Set the poisonous message flag to true.
RdaInfo& setPotentiallyPoisonous(bool flag);
/// Set the RDA counter to the specified `counter` value. The behaviour
/// is undefined unless `0 <= counter <= 63`.
RdaInfo& setCounter(unsigned int counter);
// ACCESSORS
/// Return whether the RDA counter is set to unlimited.
bool isUnlimited() const;
/// Return whether or not this message has been marked as potentially
/// poisonous.
bool isPotentiallyPoisonous() const;
/// Return the value of the counter, or in the case the RDA counter is
/// set to unlimited, a value higher than k_MAX_COUNTER_VALUE.
unsigned int counter() const;
/// Return the internal representation of the counter with its flags.
unsigned int internalRepresentation() const;
/// Format this object to the specified output `stream` at the (absolute
/// value of) the optionally specified indentation `level` and return a
/// reference to `stream`. If `level` is specified, optionally specify
/// `spacesPerLevel`, the number of spaces per indentation level for
/// this and all of its nested objects. If `level` is negative,
/// suppress indentation of the first line. If `spacesPerLevel` is
/// negative format the entire output on one line, suppressing all but
/// the initial indentation (as governed by `level`). If `stream` is
/// not valid on entry, this operation has no effect.
bsl::ostream&
print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
};
// FREE OPERATORS
/// Format the specified `rhs` to the specified output `stream` and return a
/// reference to the modifiable `stream`.
bsl::ostream& operator<<(bsl::ostream& stream, const RdaInfo& rhs);
/// Return `true` if `rhs` object contains the value of the same type as
/// contained in `lhs` object and the value itself is the same in both
/// objects, return false otherwise.
bool operator==(const RdaInfo& lhs, const RdaInfo& rhs);
// ===================
// struct SubQueueInfo
// ===================
/// VST representing a sub-queue which will be receiving a message.
struct SubQueueInfo {
// FRIENDS
friend bool operator==(const SubQueueInfo& lhs, const SubQueueInfo& rhs);
private:
// DATA
bdlb::BigEndianUint32 d_subQueueId;
// Id of the sub-queue
RdaInfo d_rdaInfo;
// RDA info for the sub-queue
unsigned char d_reserved[3];
// Reserved
public:
// CREATORS
/// Create a `SubQueueInfo` with the default sub-queue id and an
/// unlimited RDA counter.
SubQueueInfo();
/// Create a `SubQueueInfo` with the specified `id`.
SubQueueInfo(unsigned int id);
/// Create a `SubQueueInfo` with the specified `id` and
/// `rdaInfo`.
SubQueueInfo(unsigned int id, const RdaInfo& rdaInfo);
// MANIPULATORS
/// Set the sub-queue id to the specified `value` and return a reference
/// offering modifiable access to this object.
SubQueueInfo& setId(unsigned int value);
// ACCESSORS
/// Return the sub-queue id.
unsigned int id() const;
RdaInfo& rdaInfo();
// Return a reference to the modifiable "RdaInfo" attribute of this object.
const RdaInfo& rdaInfo() const;
// Return a reference to the non-modifiable "RdaInfo" attribute of this
// object.
/// Format this object to the specified output `stream` at the (absolute
/// value of) the optionally specified indentation `level` and return a
/// reference to `stream`. If `level` is specified, optionally specify
/// `spacesPerLevel`, the number of spaces per indentation level for
/// this and all of its nested objects. If `level` is negative,
/// suppress indentation of the first line. If `spacesPerLevel` is
/// negative format the entire output on one line, suppressing all but
/// the initial indentation (as governed by `level`). If `stream` is
/// not valid on entry, this operation has no effect.
bsl::ostream&
print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
};
// FREE OPERATORS
/// Format the specified `rhs` to the specified output `stream` and return a
/// reference to the modifiable `stream`.
bsl::ostream& operator<<(bsl::ostream& stream, const SubQueueInfo& rhs);
/// Return `true` if `rhs` object contains the value of the same type as
/// contained in `lhs` object and the value itself is the same in both
/// objects, return false otherwise.
bool operator==(const SubQueueInfo& lhs, const SubQueueInfo& rhs);
// ===============
// struct Protocol
// ===============
/// Namespace for protocol generic values and routines
struct Protocol {
// TYPES
/// A constant used to declare the length of static part of the array of
/// subQueueIds (or AppKeys).
static const size_t k_SUBID_ARRAY_STATIC_LEN = 16;
/// An array of subQueueInfos with statically reserved space for a
/// number of subQueueInfos (as indicated by the second template
/// parameter).
typedef mwcc::Array<SubQueueInfo, k_SUBID_ARRAY_STATIC_LEN>
SubQueueInfosArray;
/// An array of subQueueIds with statically reserved space for a number
/// of subQueueIds (as indicated by the second template parameter). It
/// is deprecated by the new SubQueueInfosArray above inside the
/// brokers, but the SDK still receives and process this older flavor
/// due to backward compatibility issues.
typedef mwcc::Array<unsigned int, k_SUBID_ARRAY_STATIC_LEN>
SubQueueIdsArrayOld;
/// Holds the client-provided Group Id.
typedef bsl::string MsgGroupId;
// CONSTANTS
static const int k_VERSION = 1;
// Version of the protocol
static const int k_DEV_VERSION = 999999;
// Version assigned to all dev builds.
static const int k_WORD_SIZE = 4;
// Number of bytes per word
static const int k_DWORD_SIZE = 8;
// Number of bytes per dword
static const int k_PACKET_MIN_SIZE = 4;
// Minimum size (bytes) of a packet, needed to know what
// its real size is. Must NEVER be changed; used when
// receiving a packet on the socket.
static const int k_COMPRESSION_MIN_APPDATA_SIZE = 1024;
// Threshold below which PUT's message application data
// will not be compressed regardless of the compression
// algorithm type set to the PutEventBuilder.
static const int k_CONSUMER_PRIORITY_INVALID;
// Constant representing the invalid consumer priority
// (e.g. of a non-consumer client).
static const int k_CONSUMER_PRIORITY_MIN;
// Constant representing the minimum valid consumer
// priority. Must be > 'k_CONSUMER_PRIORITY_INVALID'.
static const int k_CONSUMER_PRIORITY_MAX;
// Constant representing the maximum valid consumer
// priority. Must be >= 'k_CONSUMER_PRIORITY_MIN'.
static const int k_MSG_GROUP_ID_MAX_LENGTH = 31;
// Constant representing the maximum valid Group Id size.
static const int k_MAX_OPTIONS_SIZE;
// Constant representing the maximum size of options area
// for any type of event (PUT, PUSH, etc). Note that
// each header (PutHeader, PushHeader, etc) may define
// their own such constant, but we ensure that all such
// constants have the same value.
static const unsigned int k_RDA_VALUE_MAX;
// Constant representing the maximum valid
// RemainingDeliveryAttempts counter value.
static const unsigned int k_DEFAULT_SUBSCRIPTION_ID = 0;
// Internal unique id in Configure request
// CLASS METHODS
/// Combine the specified `upper` and `lower` 32 bits to return an
/// unsigned 64-bit value.
static bsls::Types::Uint64 combine(unsigned int upper, unsigned int lower);
/// Return the upper 32 bits of the specified unsigned 64-bit `value`.
static unsigned int getUpper(bsls::Types::Uint64 value);
/// Return the lower 32 bits of the specified unsigned 64-bit `value`.
static unsigned int getLower(bsls::Types::Uint64 value);
/// Return 48-32 bits of the specified unsigned 64-bit `value`.
static unsigned short get48to32Bits(bsls::Types::Uint64 value);
/// Populate the specified `upper` and `lower` buffers with the upper
/// and lower 32 bits respectively of the specified unsigned 64-bit
/// `value`.
static void
split(unsigned int* upper, unsigned int* lower, bsls::Types::Uint64 value);
/// Populate the specified `upper` and `lower` buffers with the upper
/// and lower 32 bits respectively of the specified unsigned 64-bit
/// `value`.
static void split(bdlb::BigEndianUint32* upper,
bdlb::BigEndianUint32* lower,
bsls::Types::Uint64 value);
/// Populate the specified `upper` and `lower` buffers with the upper 16
/// and lower 32 bits respectively of the specified `value`. Behavior
/// is undefined unless all bits higher than 48 in `value` are zero.
static void split(bdlb::BigEndianUint16* upper,
bdlb::BigEndianUint32* lower,
bsls::Types::Uint64 value);
};
// ================
// struct EventType
// ================
/// This struct defines the type of events exchanged between BlazingMQ SDK
/// client and BlazingMQ broker, as well as between BlazingMQ brokers.
struct EventType {
// TYPES
enum Enum {
e_UNDEFINED = 0,
e_CONTROL = 1 // Protocol event (schema-based choice)
,
e_PUT = 2,
e_CONFIRM = 3,
e_PUSH = 4,
e_ACK = 5,
e_CLUSTER_STATE = 6,
e_ELECTOR = 7,
e_STORAGE = 8,
e_RECOVERY = 9,
e_PARTITION_SYNC = 10,
e_HEARTBEAT_REQ = 11,
e_HEARTBEAT_RSP = 12,
e_REJECT = 13,
e_REPLICATION_RECEIPT = 14
};
// CONSTANTS
/// NOTE: This value must always be equal to the lowest type in the
/// enum because it is being used as a lower bound to verify that an
/// Event's `type` field is a supported type.
static const int k_LOWEST_SUPPORTED_EVENT_TYPE = e_CONTROL;
/// NOTE: This value must always be equal to the highest type in the
/// enum because it is being used as an upper bound to verify an
/// Event's `type` field is a supported type.
static const int k_HIGHEST_SUPPORTED_EVENT_TYPE = e_REPLICATION_RECEIPT;
// CLASS METHODS
/// Write the string representation of the specified enumeration `value`
/// to the specified output `stream`, and return a reference to
/// `stream`. Optionally specify an initial indentation `level`, whose
/// absolute value is incremented recursively for nested objects. If
/// `level` is specified, optionally specify `spacesPerLevel`, whose
/// absolute value indicates the number of spaces per indentation level
/// for this and all of its nested objects. If `level` is negative,
/// suppress indentation of the first line. If `spacesPerLevel` is
/// negative, format the entire output on one line, suppressing all but
/// the initial indentation (as governed by `level`). See `toAscii` for
/// what constitutes the string representation of a `EventType::Enum`
/// value.
static bsl::ostream& print(bsl::ostream& stream,
EventType::Enum value,
int level = 0,
int spacesPerLevel = 4);
/// Return the non-modifiable string representation corresponding to the
/// specified enumeration `value`, if it exists, and a unique (error)
/// string otherwise. The string representation of `value` matches its
/// corresponding enumerator name with the "e_" prefix eluded. For
/// example:
/// ```
/// bsl::cout << EventType::toAscii(EventType::e_CONFIRM);
/// ```
/// will print the following on standard output:
/// ```
/// CONFIRM
/// ```
/// Note that specifying a `value` that does not match any of the
/// enumerators will result in a string representation that is distinct
/// from any of those corresponding to the enumerators, but is otherwise
/// unspecified.
static const char* toAscii(EventType::Enum value);
};
// FREE OPERATORS
/// Format the specified `value` to the specified output `stream` and return
/// a reference to the modifiable `stream`.
bsl::ostream& operator<<(bsl::ostream& stream, EventType::Enum value);
// ===================
// struct EncodingType
// ===================
/// This struct defines the types of encoding that can be used by a client
/// or a broker for control messages
struct EncodingType {
// TYPES
enum Enum {
e_UNKNOWN = -1,
e_BER = 0 // For backward compatibility, 'BER' needs to be
// assigned a value of zero.
,
e_JSON = 1
};
// CONSTANTS
/// NOTE: This value must always be equal to the lowest type in the
/// enum because it is being used as a lower bound to verify that an
/// Event's `encoding type` field is a supported type.
static const int k_LOWEST_SUPPORTED_ENCODING_TYPE = e_BER;
/// NOTE: This value must always be equal to the highest type in the
/// enum because it is being used as an upper bound to verify that an
/// Event's `encoding type` field is a supported type.
static const int k_HIGHEST_SUPPORTED_ENCODING_TYPE = e_JSON;
// CLASS METHODS
/// Write the string representation of the specified enumeration `value`
/// to the specified output `stream`, and return a reference to
/// `stream`. Optionally specify an initial indentation `level`, whose
/// absolute value is incremented recursively for nested objects. If
/// `level` is specified, optionally specify `spacesPerLevel`, whose
/// absolute value indicates the number of spaces per indentation level
/// for this and all of its nested objects. If `level` is negative,
/// suppress indentation of the first line. If `spacesPerLevel` is
/// negative, format the entire output on one line, suppressing all but
/// the initial indentation (as governed by `level`). See `toAscii` for
/// what constitutes the string representation of a `EncodingType::Enum`
/// value.
static bsl::ostream& print(bsl::ostream& stream,
EncodingType::Enum value,
int level = 0,
int spacesPerLevel = 4);
/// Return the non-modifiable string representation corresponding to the
/// specified enumeration `value`, if it exists, and a unique (error)
/// string otherwise. The string representation of `value` matches its
/// corresponding enumerator name with the "e_" prefix eluded. For
/// example:
/// ```
/// bsl::cout << EncodingType::toAscii(EncodingType::e_BER);
/// ```
/// will print the following on standard output:
/// ```
/// BER
/// ```
/// Note that specifying a `value` that does not match any of the
/// enumerators will result in a string representation that is distinct
/// from any of those corresponding to the enumerators, but is otherwise
/// unspecified.
static const char* toAscii(EncodingType::Enum value);
};
// FREE OPERATORS
/// Format the specified `value` to the specified output `stream` and return
/// a reference to the modifiable `stream`.
bsl::ostream& operator<<(bsl::ostream& stream, EncodingType::Enum value);
// ======================
// struct EncodingFeature
// ======================
/// This struct defines the field name of the encoding features and the
/// list of supported encoding features exchanged in a Negotiation Message
/// between a broker and its peer (a broker or a client).
struct EncodingFeature {
// CONSTANTS
/// Field name of the encoding features
static const char k_FIELD_NAME[];
/// BER encoding feature
static const char k_ENCODING_BER[];
/// JSON encoding feature
static const char k_ENCODING_JSON[];
};
/// This struct defines feature names related to High-Availability
struct HighAvailabilityFeatures {
/// Field name of the encoding features
static const char k_FIELD_NAME[];
// CONSTANTS
static const char k_BROADCAST_TO_PROXIES[];
static const char k_GRACEFUL_SHUTDOWN[];
};
/// This struct defines feature names related to MessageProperties
struct MessagePropertiesFeatures {
/// Field name of the encoding features
static const char k_FIELD_NAME[];
// CONSTANTS
static const char k_MESSAGE_PROPERTIES_EX[];
};
// =================
// struct OptionType
// =================
/// This struct defines the type of options that can appear in `PUT` or
/// `PUSH` messages.
struct OptionType {
// TYPES
enum Enum {
// Values for the OptionHeader 'type' field. Valid values are in the
// range [0, OptionHeader::k_MAX_TYPE]. They must begin at 0 and
// increment by one.
e_UNDEFINED = 0,
e_SUB_QUEUE_IDS_OLD = 1,
e_MSG_GROUP_ID = 2,
e_SUB_QUEUE_INFOS = 3
};
// CONSTANTS
/// NOTE: This value must always be equal to the lowest type in the
/// enum because it is being used as a lower bound to verify that an
/// OptionHeader's `type` field is a supported type.
static const int k_LOWEST_SUPPORTED_TYPE = e_SUB_QUEUE_IDS_OLD;
/// NOTE: This value must always be equal to the highest type in the
/// enum because it is being used as an upper bound to verify an
/// OptionHeader's `type` field is a supported type.
static const int k_HIGHEST_SUPPORTED_TYPE = e_SUB_QUEUE_INFOS;
// CLASS METHODS
/// Write the string representation of the specified enumeration `value`
/// to the specified output `stream`, and return a reference to
/// `stream`. Optionally specify an initial indentation `level`, whose
/// absolute value is incremented recursively for nested objects. If
/// `level` is specified, optionally specify `spacesPerLevel`, whose
/// absolute value indicates the number of spaces per indentation level
/// for this and all of its nested objects. If `level` is negative,
/// suppress indentation of the first line. If `spacesPerLevel` is
/// negative, format the entire output on one line, suppressing all but
/// the initial indentation (as governed by `level`). See `toAscii` for
/// what constitutes the string representation of a `OptionType::Enum`
/// value.
static bsl::ostream& print(bsl::ostream& stream,
OptionType::Enum value,
int level = 0,
int spacesPerLevel = 4);
/// Return the non-modifiable string representation corresponding to the
/// specified enumeration `value`, if it exists, and a unique (error)
/// string otherwise. The string representation of `value` matches its
/// corresponding enumerator name with the "e_" prefix eluded. For
/// example:
/// ```
/// bsl::cout << OptionType::toAscii(OptionType::e_UNDEFINED);
/// ```
/// will print the following on standard output:
/// ```
/// UNDEFINED
/// ```
/// Note that specifying a `value` that does not match any of the
/// enumerators will result in a string representation that is distinct
/// from any of those corresponding to the enumerators, but is otherwise
/// unspecified.
static const char* toAscii(OptionType::Enum value);
};
// FREE OPERATORS
/// Format the specified `value` to the specified output `stream` and return
/// a reference to the modifiable `stream`.
bsl::ostream& operator<<(bsl::ostream& stream, OptionType::Enum value);
// ==================
// struct EventHeader
// ==================
/// This struct represents the header for all the events received by the
/// broker from local clients or from peer brokers. A well-behaved event
/// header will always have its fragment bit set to zero.
struct EventHeader {
// EventHeader structure datagram [8 bytes]:
//..
// +---------------+---------------+---------------+---------------+
// |0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|
// +---------------+---------------+---------------+---------------+
// |F| Length |
// +---------------+---------------+---------------+---------------+
// |PV | Type | HeaderWords | TypeSpecific | Reserved |
// +---------------+---------------+---------------+---------------+
// F..: Fragment
// PV.: Protocol Version
//
// Fragment (F)..........: Always set to 0
// Length................: Total size (bytes) of this event
// Protocol Version (PV).: Protocol Version (up to 4 concurrent versions)
// Type..................: Type of the event (from EventType::Enum)
// HeaderWords...........: Number of words of this event header
// TypeSpecific..........: Content specific to the event's type, see below
// Reserved..............: For alignment and extension ~ must be 0
//..
//
// TypeSpecific content:
//: o ControlMessage: represent the encoding used for that control message
// |0|1|2|3|4|5|6|7|
// +---------------+
// |CODEC| Reserved|
//
// NOTE: The HeaderWords allows to eventually put event level options
// (either by extending the EventHeader struct, or putting new struct
// after the EventHeader). For now, this is left up for future
// enhancement as there are no use-case for an event level option.
private:
// PRIVATE CONSTANTS
static const int k_FRAGMENT_NUM_BITS = 1;
static const int k_LENGTH_NUM_BITS = 31;
static const int k_PROTOCOL_VERSION_NUM_BITS = 2;
static const int k_TYPE_NUM_BITS = 6;
static const int k_FRAGMENT_START_IDX = 31;
static const int k_LENGTH_START_IDX = 0;
static const int k_PROTOCOL_VERSION_START_IDX = 6;
static const int k_TYPE_START_IDX = 0;
static const int k_FRAGMENT_MASK;
static const int k_LENGTH_MASK;
static const int k_PROTOCOL_VERSION_MASK;
static const int k_TYPE_MASK;
private:
// DATA
bdlb::BigEndianUint32 d_fragmentBitAndLength;
// Total size (in bytes) of the event (header
// included), and fragment bit (most significant,
// always set to zero)
unsigned char d_protocolVersionAndType;
// Event type (Control, Push, Pull, etc) and
// protocol version.
unsigned char d_headerWords;
// Number of words in this header struct.
unsigned char d_typeSpecific;
// Options and flags specific to this event's type
unsigned char d_reserved;
// Reserved.
public:
// PUBLIC CLASS DATA
/// Maximum size (bytes) of a full event, per protocol limitations.
static const int k_MAX_SIZE_HARD = (1U << k_LENGTH_NUM_BITS) - 1U;
/// Maximum size (bytes) of a full event, enforced.
///
/// TBD: The constant `EventHeader::k_MAX_SIZE_SOFT` is used by various
/// by bmqp builders (including bmqp::StorageEventBuilder::packMessage)
/// to ensure that the size of the outgoing bmqp event is not bigger
/// than this constant. A PUT message is permitted to have a maximum
/// size of 64MB (see `PutHeader::k_MAX_PAYLOAD_SIZE_SOFT`), and the
/// replication layer, which uses `bmqp::StorageEventBuilder`, must be
/// able to build and send that message. Replication layer adds its own
/// header and journal record to the PUT message, such that the size of
/// an outgoing storage message can exceed
/// `PutHeader::k_MAX_PAYLOAD_SIZE_SOFT`. So, we assign a value of 65MB
/// to `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT`, and assign a value of
/// 66MB to `'EventHeader::k_MAX_SIZE_SOFT` such that a PUT message
/// having the maximum allowable value is processed through the entire
/// BlazingMQ pipeline w/o any issues. Also see notes in
/// `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT` constant.
static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024;
/// Highest possible value for the type of an event.
static const int k_MAX_TYPE = (1 << k_TYPE_NUM_BITS) - 1;
/// Maximum size (bytes) of an `EventHeader`.
static const int k_MAX_HEADER_SIZE = ((1 << 8) - 1) *
Protocol::k_WORD_SIZE;
/// Minimum size (bytes) of an `EventHeader` (that is sufficient to
/// capture header words). This value should *never* change.
static const int k_MIN_HEADER_SIZE = 6;
public:
// CREATORS
/// Create this object with its type set to `EventType::e_UNDEFINED`,
/// `headerNumWords` and `length` fields set to appropriate value
/// derived from sizeof() operator and the protocol version set to the
/// current one. All other fields are set to zero.
explicit EventHeader();
/// Create this object with its type set to the specified `type`, and
/// `headerNumWords` and `length` fields set to appropriate value
/// derived from sizeof() operator and the protocol version set to the
/// current one. All other fields are set to zero.
explicit EventHeader(EventType::Enum type);
// MANIPULATORS
/// Set the length to the specified `value` (in bytes) and return a
/// reference offering modifiable access to this object.
EventHeader& setLength(int value);
/// Set the protocol version to the specified `value` and return a
/// reference offering modifiable access to this object.
EventHeader& setProtocolVersion(unsigned char value);
/// Set the type to the specified `value` and return a reference
/// offering modifiable access to this object.
EventHeader& setType(EventType::Enum value);
/// Set the number of words composing this header to the specified
/// `value` and return a reference offering modifiable access to this
/// object.
EventHeader& setHeaderWords(unsigned char value);
/// Set the options and flags specific to this event's type to the
/// specified `value` and return a reference offering modifiable access
/// to this object.
EventHeader& setTypeSpecific(unsigned char value);
// ACCESSORS
/// Return the value of the fragment bit of this event.
int fragmentBit() const;
/// Return the length of this event (in bytes).
int length() const;
/// Return the protocol version set in this event.
unsigned char protocolVersion() const;
/// Return the type of this event.
EventType::Enum type() const;
/// Return the number of words composing this header.
unsigned char headerWords() const;
/// Return the options and flags specific to this event's type.
unsigned char typeSpecific() const;
};
// ======================
// struct EventHeaderUtil
// ======================
/// This component provides utility methods for `bmqp::EventHeader`.
struct EventHeaderUtil {
private:
static const int k_CONTROL_EVENT_ENCODING_NUM_BITS = 3;
static const int k_CONTROL_EVENT_ENCODING_START_IDX = 5;
static const int k_CONTROL_EVENT_ENCODING_MASK;
public:
// CLASS METHODS
/// Set the appropriate bits in the specified `eventHeader` to represent
/// the specified encoding `type` for a control event.
static void setControlEventEncodingType(EventHeader* eventHeader,
EncodingType::Enum type);
/// Return the encoding type for a control event represented by the
/// appropriate bits in the specified `eventHeader`.
static EncodingType::Enum
controlEventEncodingType(const EventHeader& eventHeader);
};
// ===================
// struct OptionHeader
// ===================
/// This struct represents the header for an option. In a typical
/// implementation usage, every Option struct will start by an
/// `OptionHeader` member.
struct OptionHeader {
// OptionHeader structure datagram [4 bytes]:
//..
// +---------------+---------------+---------------+---------------+
// |0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|
// +---------------+---------------+---------------+---------------+
// | Type |P| TS | Words |
// +---------------+---------------+---------------+---------------+
//
// Type...............: Type of this option
// Packed (P).........: Flag to indicate *packed* options. If set,
// 'words' field will be reinterpreted as extra
// type-specific content of this option in addition
// to the 'TS' field, and there will be no option
// content following this header
// Type-Specific (TS).: Content specific to this option's type, see below
// Words..............: Length (words) of this option, including this
// header. If *packed*, this field will be
// reinterpreted as additional type-specific content
//..
//
// TypeSpecific content:
// o e_SUB_QUEUE_INFOS: If *packed*, 'words' field will be reinterpreted as
// the RDA counter for the default SubQueueId. Note
// that the options is only *packed* for non-fanout
// mode. Else, 'TS' field will represent the size of
// each item in the SubQueueInfosArray encoded in the
// option content.
//
// NOTE:
// o In order to preserve alignment, this struct can *NOT* be changed
// o Every 'Option' must start by this header, and be 4 bytes aligned with
// optional padding at the end.
// o For efficiency, since option lookup will be a linear search, options
// should be added by decreasing order of usage (or could be sorted by
// option id and use optimize the linear search to cut off earlier
// eventually).
// o Options that are followed by one or multiple 'variable length'
// fields may include a byte representing the size (words) of the
// optionHeader.
//
/// Example of a 'real' option struct:
/// ---------------------------------
//..
// struct MyStruct {
// OptionHeader d_optionHeader;
// bdlb::BigEndianUint32 d_timesamp;
// bdlb::BigEndianUint32 d_msgId;
// };
//..
private:
// PRIVATE CONSTANTS
static const int k_TYPE_NUM_BITS = 6;
static const int k_PACKED_NUM_BITS = 1;
static const int k_TYPE_SPECIFIC_NUM_BITS = 4;
static const int k_WORDS_NUM_BITS = 21;
static const int k_TYPE_START_IDX = 26;
static const int k_PACKED_START_IDX = 25;
static const int k_TYPE_SPECIFIC_START_IDX = 21;
static const int k_WORDS_START_IDX = 0;
static const int k_TYPE_MASK;
static const int k_PACKED_MASK;
static const int k_TYPE_SPECIFIC_MASK;
static const int k_WORDS_MASK;
private:
// DATA
bdlb::BigEndianUint32 d_content;
// Content of this OptionHeader, including option
// type, packed flag, type-specific content and
// number of words in this option (including this
// OptionHeader).