Skip to content

Commit c4ff949

Browse files
RichLogansuhasHereTimEvens
authored
Draft 11 Control Messages (#553)
* Run parser for draft11 * Rerun with location * remove duplicate auto-gen files * add back the location stream ops * Change control message payload size to 16bit Resolves #552 * Regen messages * ReasonPhrase<->ErrorReason * Use request ID * Use Location structure * Add draft 11 subgroup header + logic (#554) * Add draft 11 subgroup header + logic * Make test case parameterized * Naming * Rename * Add more location changes * More location updates * Standardize error reason/reason phrase * Update type * Implement new KVP MoQ structure (#555) * Implement new KVP MoQ structure - Add for parameters - Consolidate uint64_t enum coding - Move parameters to new structure * Remove parameter parsing from parser * Update for odd values * Fix decode and tests * Decode to encoded size * more request id updates * Use sizeof * Fix tests * more updates to support request id changes * format * Update remaining changes for draft-11 * Update tests for draft-11 changes * Fix example resolve announce * Fix merge & Request ID * Fix test cases for new output * Add stream operator for uint16_t * Add ControlMessage encoder --------- Co-authored-by: suhasHere <[email protected]> Co-authored-by: Tim Evens <[email protected]> Co-authored-by: Tim Evens <[email protected]>
1 parent ef00f72 commit c4ff949

32 files changed

+11303
-1286
lines changed

cmd/examples/server.cpp

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ namespace qserver_vars {
7272
* @example
7373
* track_alias = subscribe_alias_sub_id[conn_id][subscribe_id]
7474
*/
75-
std::map<quicr::ConnectionHandle, std::map<quicr::messages::SubscribeID, quicr::messages::TrackAlias>>
75+
std::map<quicr::ConnectionHandle, std::map<quicr::messages::RequestID, quicr::messages::TrackAlias>>
7676
subscribe_alias_sub_id;
7777

7878
/**
@@ -128,7 +128,7 @@ namespace qserver_vars {
128128
/**
129129
* @brief Map of atomic bools to mark if a fetch thread should be interrupted.
130130
*/
131-
std::map<std::pair<quicr::ConnectionHandle, quicr::messages::SubscribeID>, std::atomic_bool> stop_fetch;
131+
std::map<std::pair<quicr::ConnectionHandle, quicr::messages::RequestID>, std::atomic_bool> stop_fetch;
132132
}
133133

134134
/**
@@ -428,7 +428,7 @@ class MyServer : public quicr::Server
428428

429429
void AnnounceReceived(quicr::ConnectionHandle connection_handle,
430430
const quicr::TrackNamespace& track_namespace,
431-
const quicr::PublishAnnounceAttributes&) override
431+
const quicr::PublishAnnounceAttributes& attrs) override
432432
{
433433
auto th = quicr::TrackHash({ track_namespace, {}, std::nullopt });
434434

@@ -469,7 +469,7 @@ class MyServer : public quicr::Server
469469
}
470470
}
471471

472-
ResolveAnnounce(connection_handle, track_namespace, sub_annos_connections, announce_response);
472+
ResolveAnnounce(connection_handle, attrs.request_id, track_namespace, sub_annos_connections, announce_response);
473473

474474
// Check if there are any subscribes. If so, send subscribe to announce for all tracks matching namespace
475475
for (const auto& [ns, sub_tracks] : qserver_vars::subscribe_active) {
@@ -528,7 +528,7 @@ class MyServer : public quicr::Server
528528
}
529529

530530
// Remove active subscribes
531-
std::vector<quicr::messages::SubscribeID> subscribe_ids;
531+
std::vector<quicr::messages::RequestID> subscribe_ids;
532532
auto ta_conn_it = qserver_vars::subscribe_alias_sub_id.find(connection_handle);
533533
if (ta_conn_it != qserver_vars::subscribe_alias_sub_id.end()) {
534534
for (const auto& [sub_id, _] : ta_conn_it->second) {
@@ -662,16 +662,14 @@ class MyServer : public quicr::Server
662662
return;
663663
}
664664

665-
std::optional<uint64_t> latest_group_id = std::nullopt;
666-
std::optional<uint64_t> latest_object_id = std::nullopt;
665+
std::optional<quicr::messages::Location> largest_location = std::nullopt;
667666

668667
auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
669668
if (cache_entry_it != qserver_vars::cache.end()) {
670669
auto& [_, cache] = *cache_entry_it;
671670
if (const auto& latest_group = cache.Last(); latest_group && !latest_group->empty()) {
672671
const auto& latest_object = std::prev(latest_group->end());
673-
latest_group_id = latest_object->headers.group_id;
674-
latest_object_id = latest_object->headers.object_id;
672+
largest_location = { latest_object->headers.group_id, latest_object->headers.object_id };
675673
}
676674
}
677675

@@ -681,8 +679,7 @@ class MyServer : public quicr::Server
681679
quicr::SubscribeResponse::ReasonCode::kOk,
682680
std::nullopt,
683681
std::nullopt,
684-
latest_group_id,
685-
latest_object_id,
682+
largest_location,
686683
});
687684

688685
auto pub_track_h =
@@ -760,25 +757,22 @@ class MyServer : public quicr::Server
760757
}
761758
}
762759

763-
LargestAvailable GetLargestAvailable(const quicr::FullTrackName& track_name) override
760+
std::optional<quicr::messages::Location> GetLargestAvailable(const quicr::FullTrackName& track_name) override
764761
{
765762
// Get the largest object from the cache.
766-
std::optional<uint64_t> largest_group_id = std::nullopt;
767-
std::optional<uint64_t> largest_object_id = std::nullopt;
763+
std::optional<quicr::messages::Location> largest_location = std::nullopt;
764+
768765
const auto& th = quicr::TrackHash(track_name);
769766
const auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
770767
if (cache_entry_it != qserver_vars::cache.end()) {
771768
auto& [_, cache] = *cache_entry_it;
772769
if (const auto& latest_group = cache.Last(); latest_group && !latest_group->empty()) {
773770
const auto& latest_object = std::prev(latest_group->end());
774-
largest_group_id = latest_object->headers.group_id;
775-
largest_object_id = latest_object->headers.object_id;
771+
largest_location = { latest_object->headers.group_id, latest_object->headers.object_id };
776772
}
777773
}
778-
if (!largest_group_id.has_value() || !largest_object_id.has_value()) {
779-
return std::nullopt;
780-
}
781-
return std::make_pair(*largest_group_id, *largest_object_id);
774+
775+
return largest_location;
782776
}
783777

784778
bool OnFetchOk(quicr::ConnectionHandle connection_handle,

include/quicr/client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,11 @@ namespace quicr {
137137
* will send the protocol message based on the SubscribeResponse
138138
*
139139
* @param connection_handle source connection ID
140-
* @param subscribe_id subscribe ID
140+
* @param request_id request ID
141141
* @param subscribe_response response to for the subscribe
142142
*/
143143
virtual void ResolveSubscribe(ConnectionHandle connection_handle,
144-
uint64_t subscribe_id,
144+
uint64_t request_id,
145145
const SubscribeResponse& subscribe_response);
146146

147147
/**

include/quicr/common.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ namespace quicr {
2626
* @details Various attributes relative to the publish announce
2727
*/
2828
struct PublishAnnounceAttributes
29-
{};
29+
{
30+
uint64_t request_id{ 0 };
31+
};
3032

3133
/**
3234
* @brief Client Setup Attributes

include/quicr/detail/base_track_handler.h

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#pragma once
55

66
#include "quicr/common.h"
7+
#include "quicr/detail/ctrl_message_types.h"
78
#include "quicr/track_name.h"
89

910
#include <optional>
@@ -42,11 +43,10 @@ namespace quicr {
4243
};
4344
ReasonCode reason_code;
4445

45-
std::optional<std::string> reason_phrase = std::nullopt;
46+
std::optional<std::string> error_reason = std::nullopt;
4647
std::optional<uint64_t> track_alias = std::nullopt; ///< Set only when ResponseCode is kRetryTrackAlias
4748

48-
std::optional<uint64_t> largest_group_id = std::nullopt;
49-
std::optional<uint64_t> largest_object_id = std::nullopt;
49+
std::optional<messages::Location> largest_location = std::nullopt;
5050
};
5151

5252
/**
@@ -101,20 +101,20 @@ namespace quicr {
101101
std::optional<uint64_t> GetTrackAlias() const noexcept { return full_track_name_.track_alias; }
102102

103103
/**
104-
* @brief Sets the subscribe ID
105-
* @details MoQ instance sets the subscribe id based on subscribe track method call. Subscribe
104+
* @brief Sets the reqeust ID
105+
* @details MoQ instance sets the request id based on subscribe track method call. Request
106106
* id is specific to the connection, so it must be set by the moq instance/connection.
107107
*
108-
* @param subscribe_id 62bit subscribe ID
108+
* @param request_id 62bit request ID
109109
*/
110-
void SetSubscribeId(std::optional<uint64_t> subscribe_id) { subscribe_id_ = subscribe_id; }
110+
void SetRequestId(std::optional<uint64_t> request_id) { request_id_ = request_id; }
111111

112112
/**
113-
* @brief Get the subscribe ID
113+
* @brief Get the request ID
114114
*
115-
* @return nullopt if not subscribed, otherwise the subscribe ID
115+
* @return nullopt if not subscribed, otherwise the request ID
116116
*/
117-
std::optional<uint64_t> GetSubscribeId() const noexcept { return subscribe_id_; }
117+
std::optional<uint64_t> GetRequestId() const noexcept { return request_id_; }
118118

119119
/**
120120
* @brief Get the full track name
@@ -150,12 +150,12 @@ namespace quicr {
150150
ConnectionHandle connection_handle_; // QUIC transport connection ID
151151

152152
/**
153-
* subscribe_id_ is the primary index/key for subscribe subscribe context/delegate storage.
154-
* It is use as the subscribe_id in MoQ related subscribes. Subscribe ID will adapt
155-
* to received subscribe IDs, so the value will reflect either the received subscribe ID
153+
* request_id_ is the primary index/key for subscribe context/delegate storage.
154+
* It is use as the request_id in MoQ related subscribes. Request ID will adapt
155+
* to received reqeust IDs, so the value will reflect either the received reqeust ID
156156
* or the next one that increments from last received ID.
157157
*/
158-
std::optional<uint64_t> subscribe_id_;
158+
std::optional<uint64_t> request_id_;
159159
};
160160

161161
} // namespace moq

include/quicr/detail/ctrl_message_types.h

Lines changed: 90 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,108 @@ namespace quicr::messages {
1414
quicr::Bytes& operator<<(quicr::Bytes& buffer, std::uint8_t value);
1515
quicr::BytesSpan operator>>(quicr::BytesSpan buffer, uint8_t& value);
1616

17+
Bytes& operator<<(Bytes& buffer, std::uint16_t value);
18+
BytesSpan operator>>(BytesSpan buffer, std::uint16_t& value);
19+
1720
quicr::Bytes& operator<<(quicr::Bytes& buffer, const quicr::UintVar& value);
1821

1922
using GroupId = uint64_t;
2023
using ObjectId = uint64_t;
24+
// TODO(RichLogan): Remove when ErrorReason -> ReasonPhrase.
25+
using ReasonPhrase = Bytes;
2126

22-
enum struct ParameterType : uint64_t
27+
struct ControlMessage
2328
{
24-
kPath = 0x1,
25-
kMaxSubscribeId = 0x2, // version specific, unused
26-
kEndpointId = 0xF0, // Endpoint ID, using temp value for now
27-
kInvalid = 0xFF, // used internally.
29+
std::uint64_t type{ 0 };
30+
Bytes payload{};
2831
};
32+
Bytes& operator<<(Bytes& buffer, const ControlMessage& message);
33+
BytesSpan operator>>(BytesSpan buffer, ControlMessage& message);
2934

30-
struct Parameter
35+
struct Location
3136
{
32-
ParameterType type{ 0 };
37+
GroupId group{ 0 };
38+
ObjectId object{ 0 };
39+
};
40+
Bytes& operator<<(Bytes& buffer, const Location& location);
41+
BytesSpan operator>>(BytesSpan buffer, Location& location);
42+
43+
/// MoQ Key Value Pair.
44+
template<typename T>
45+
concept KeyType =
46+
std::same_as<T, std::uint64_t> || (std::is_enum_v<T> && std::same_as<std::underlying_type_t<T>, std::uint64_t>);
47+
template<KeyType T>
48+
struct KeyValuePair
49+
{
50+
T type;
3351
Bytes value;
3452
};
53+
template<KeyType T>
54+
Bytes& operator<<(Bytes& buffer, const KeyValuePair<T>& param)
55+
{
56+
const auto type = static_cast<std::uint64_t>(param.type);
57+
buffer << UintVar(type);
58+
if (type % 2 == 0) {
59+
// Even, single varint of value.
60+
assert(param.value.size() <= 8);
61+
std::uint64_t val = 0;
62+
std::memcpy(&val, param.value.data(), std::min(param.value.size(), sizeof(std::uint64_t)));
63+
buffer << UintVar(val);
64+
} else {
65+
// Odd, encode bytes.
66+
buffer << UintVar(param.value.size());
67+
buffer.insert(buffer.end(), param.value.begin(), param.value.end());
68+
}
69+
return buffer;
70+
}
71+
template<KeyType T>
72+
BytesSpan operator>>(BytesSpan buffer, KeyValuePair<T>& param)
73+
{
74+
std::uint64_t type;
75+
buffer = buffer >> type;
76+
param.type = static_cast<T>(type);
77+
if (type % 2 == 0) {
78+
// Even, single varint of value.
79+
UintVar uvar(buffer);
80+
buffer = buffer.subspan(uvar.size());
81+
std::uint64_t val(uvar);
82+
param.value.resize(uvar.size());
83+
std::memcpy(param.value.data(), &val, uvar.size());
84+
} else {
85+
// Odd, decode bytes.
86+
uint64_t size = 0;
87+
buffer = buffer >> size;
88+
param.value.assign(buffer.begin(), std::next(buffer.begin(), size));
89+
buffer = buffer.subspan(size);
90+
}
91+
return buffer;
92+
}
93+
94+
// Serialization for all uint64_t/enum(uint64_t to varint).
95+
template<KeyType T>
96+
Bytes& operator<<(Bytes& buffer, const T value)
97+
{
98+
buffer << UintVar(static_cast<std::uint64_t>(value));
99+
return buffer;
100+
}
101+
template<KeyType T>
102+
BytesSpan operator>>(BytesSpan buffer, T& value)
103+
{
104+
std::uint64_t uvalue;
105+
buffer = buffer >> uvalue;
106+
value = static_cast<T>(uvalue);
107+
return buffer;
108+
}
35109

36-
Bytes& operator<<(Bytes& buffer, ParameterType value);
37-
BytesSpan operator>>(BytesSpan buffer, ParameterType& value);
110+
enum struct ParameterType : uint64_t
111+
{
112+
kPath = 0x1,
113+
kMaxRequestId = 0x2, // version specific, unused
114+
kEndpointId = 0xF1, // Endpoint ID, using temp value for now
115+
kInvalid = 0xFF, // used internally.
116+
};
117+
118+
using Parameter = KeyValuePair<ParameterType>;
38119

39120
enum struct GroupOrder : uint8_t
40121
{
@@ -55,9 +136,6 @@ namespace quicr::messages {
55136
kAbsoluteRange
56137
};
57138

58-
Bytes& operator<<(Bytes& buffer, FilterType value);
59-
BytesSpan operator>>(BytesSpan buffer, FilterType& value);
60-
61139
enum class TrackStatusCode : uint64_t
62140
{
63141
kInProgress = 0x00,
@@ -67,9 +145,6 @@ namespace quicr::messages {
67145
kUnknown
68146
};
69147

70-
Bytes& operator<<(Bytes& buffer, TrackStatusCode value);
71-
BytesSpan operator>>(BytesSpan buffer, TrackStatusCode& value);
72-
73148
enum class SubscribeDoneStatusCode : uint64_t
74149
{
75150
kInternalError = 0x00,
@@ -81,9 +156,6 @@ namespace quicr::messages {
81156
kTooFarBehind,
82157
};
83158

84-
Bytes& operator<<(Bytes& buffer, SubscribeDoneStatusCode value);
85-
BytesSpan operator>>(BytesSpan buffer, SubscribeDoneStatusCode& value);
86-
87159
enum class FetchType : uint8_t
88160
{
89161
kStandalone = 0x1,
@@ -104,9 +176,6 @@ namespace quicr::messages {
104176
kGoAwayTimeout = 0x10,
105177
};
106178

107-
Bytes& operator<<(Bytes& buffer, TerminationReason value);
108-
BytesSpan operator>>(BytesSpan buffer, TerminationReason& value);
109-
110179
enum class FetchErrorCode : uint8_t
111180
{
112181
kInternalError = 0x0,
@@ -129,9 +198,6 @@ namespace quicr::messages {
129198
kUninterested
130199
};
131200

132-
Bytes& operator<<(Bytes& buffer, AnnounceErrorCode value);
133-
BytesSpan operator>>(BytesSpan buffer, AnnounceErrorCode& value);
134-
135201
// TODO (Suhas): rename it to StreamMapping
136202
enum ForwardingPreference : uint8_t
137203
{
@@ -158,9 +224,6 @@ namespace quicr::messages {
158224
kTrackNotExist = 0xF0 // Missing in draft
159225
};
160226

161-
Bytes& operator<<(Bytes& buffer, SubscribeErrorCode value);
162-
BytesSpan operator>>(BytesSpan buffer, SubscribeErrorCode& value);
163-
164227
enum class SubscribeAnnouncesErrorCode : uint64_t
165228
{
166229
kInternalError = 0x0,
@@ -169,7 +232,4 @@ namespace quicr::messages {
169232
kNotSupported,
170233
kNamespacePrefixUnknown,
171234
};
172-
173-
Bytes& operator<<(Bytes& buffer, SubscribeAnnouncesErrorCode value);
174-
BytesSpan operator>>(BytesSpan buffer, SubscribeAnnouncesErrorCode& value);
175235
} // namespace

0 commit comments

Comments
 (0)