Skip to content

Commit 3577628

Browse files
authored
feat(rumqttd): retained messages (#683)
1 parent b7c3086 commit 3577628

File tree

6 files changed

+91
-38
lines changed

6 files changed

+91
-38
lines changed

rumqttd/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1616
### Removed
1717

1818
### Fixed
19+
- Retained Messages
1920

2021
### Security
2122

rumqttd/src/protocol/v4/subscribe.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ mod filter {
6161
qos: qos(requested_qos).ok_or(Error::InvalidQoS(requested_qos))?,
6262
nolocal: false,
6363
preserve_retain: false,
64-
retain_forward_rule: RetainForwardRule::Never,
64+
retain_forward_rule: RetainForwardRule::OnEverySubscribe,
6565
});
6666
}
6767

rumqttd/src/router/iobufs.rs

+15-14
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct Outgoing {
5959
/// Handle which is given to router to allow router to communicate with this connection
6060
pub(crate) handle: Sender<()>,
6161
/// The buffer to keep track of inflight packets.
62-
inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>,
62+
inflight_buffer: VecDeque<(u16, FilterIdx, Option<Cursor>)>,
6363
/// PubRels waiting for PubComp
6464
pub(crate) unacked_pubrels: VecDeque<u16>,
6565
/// Last packet id
@@ -208,8 +208,9 @@ impl Outgoing {
208208
pub fn retransmission_map(&self) -> HashMap<FilterIdx, Cursor> {
209209
let mut o = HashMap::new();
210210
for (_, filter_idx, cursor) in self.inflight_buffer.iter() {
211-
if !o.contains_key(filter_idx) {
212-
o.insert(*filter_idx, *cursor);
211+
// if cursor in None, it means it was a retained publish
212+
if !o.contains_key(filter_idx) && cursor.is_some() {
213+
o.insert(*filter_idx, cursor.unwrap());
213214
}
214215
}
215216

@@ -232,17 +233,17 @@ mod test {
232233
result.insert(3, (1, 0));
233234

234235
let buf = vec![
235-
(1, 0, (0, 8)),
236-
(1, 0, (0, 10)),
237-
(1, 1, (0, 1)),
238-
(3, 1, (0, 4)),
239-
(2, 2, (1, 1)),
240-
(1, 2, (2, 6)),
241-
(1, 2, (2, 1)),
242-
(1, 3, (1, 0)),
243-
(1, 3, (1, 1)),
244-
(1, 3, (1, 3)),
245-
(1, 3, (1, 3)),
236+
(1, 0, Some((0, 8))),
237+
(1, 0, Some((0, 10))),
238+
(1, 1, Some((0, 1))),
239+
(3, 1, Some((0, 4))),
240+
(2, 2, Some((1, 1))),
241+
(1, 2, Some((2, 6))),
242+
(1, 2, Some((2, 1))),
243+
(1, 3, Some((1, 0))),
244+
(1, 3, Some((1, 1))),
245+
(1, 3, Some((1, 3))),
246+
(1, 3, Some((1, 3))),
246247
];
247248

248249
outgoing.inflight_buffer.extend(buf);

rumqttd/src/router/logs.rs

+32-12
Original file line numberDiff line numberDiff line change
@@ -269,22 +269,42 @@ impl DataLog {
269269
self.retained_publishes.remove(&topic);
270270
}
271271

272-
pub fn handle_retained_messages(
273-
&mut self,
274-
filter: &str,
275-
notifications: &mut VecDeque<(ConnectionId, DataRequest)>,
276-
) {
277-
trace!(info = "retain-msg", filter = &filter);
272+
pub fn read_retained_messages(&mut self, filter: &str) -> Vec<PubWithProp> {
273+
trace!(info = "reading retain msg", filter = &filter);
274+
let now = Instant::now();
275+
276+
// discard expired retained messages
277+
self.retained_publishes.retain(|_, pubdata| {
278+
// Keep data if no properties exists, which implies no message expiry!
279+
let Some(properties) = pubdata.properties.as_mut() else {
280+
return true
281+
};
282+
283+
// Keep data if there is no message_expiry_interval
284+
let Some(message_expiry_interval) = properties.message_expiry_interval.as_mut() else {
285+
return true
286+
};
278287

279-
let idx = self.filter_indexes.get(filter).unwrap();
288+
let time_spent = (now - pubdata.timestamp).as_secs() as u32;
280289

281-
let datalog = self.native.get_mut(*idx).unwrap();
290+
let is_valid = time_spent < *message_expiry_interval;
282291

283-
for (topic, publish) in self.retained_publishes.iter_mut() {
284-
if matches(topic, filter) {
285-
datalog.append(publish.clone(), notifications);
292+
// ignore expired messages
293+
if is_valid {
294+
// set message_expiry_interval to (original value - time spent waiting in server)
295+
// ref: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112
296+
*message_expiry_interval -= time_spent;
286297
}
287-
}
298+
299+
is_valid
300+
});
301+
302+
// no need to include timestamp when returning
303+
self.retained_publishes
304+
.iter()
305+
.filter(|(topic, _)| matches(topic, filter))
306+
.map(|(_, p)| (p.publish.clone(), p.properties.clone()))
307+
.collect()
288308
}
289309
}
290310

rumqttd/src/router/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl From<Notification> for MaybePacket {
110110

111111
#[derive(Debug, Clone)]
112112
pub struct Forward {
113-
pub cursor: (u64, u64),
113+
pub cursor: Option<(u64, u64)>,
114114
pub size: usize,
115115
pub publish: Publish,
116116
pub properties: Option<PublishProperties>,
@@ -192,6 +192,7 @@ pub struct DataRequest {
192192
pub read_count: usize,
193193
/// Maximum count of payload buffer per replica
194194
max_count: usize,
195+
pub(crate) forward_retained: bool,
195196
pub(crate) group: Option<String>,
196197
}
197198

rumqttd/src/router/routing.rs

+40-10
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ impl Router {
541541

542542
for packet in packets.drain(0..) {
543543
match packet {
544-
Packet::Publish(mut publish, properties) => {
544+
Packet::Publish(publish, properties) => {
545545
let span = tracing::error_span!("publish", topic = ?publish.topic, pkid = publish.pkid);
546546
let _guard = span.enter();
547547

@@ -591,11 +591,6 @@ impl Router {
591591

592592
self.router_meters.total_publishes += 1;
593593

594-
// Ignore retained messages
595-
if publish.retain {
596-
publish.retain = false;
597-
}
598-
599594
// Try to append publish to commitlog
600595
match append_to_commitlog(
601596
id,
@@ -678,8 +673,6 @@ impl Router {
678673
// and create DataRequest, while using the same datalog of "topic"
679674
// NOTE: topic & $share/group/topic will have same filteridx!
680675
self.prepare_filter(id, cursor, idx, f, group, subscription_id);
681-
self.datalog
682-
.handle_retained_messages(&filter, &mut self.notifications);
683676

684677
let code = match f.qos {
685678
QoS::AtMostOnce => SubscribeReasonCode::QoS0,
@@ -898,6 +891,7 @@ impl Router {
898891
}
899892

900893
/// Apply filter and prepare this connection to receive subscription data
894+
/// Handle retained messages as per subscription options!
901895
fn prepare_filter(
902896
&mut self,
903897
id: ConnectionId,
@@ -946,6 +940,13 @@ impl Router {
946940
.insert(filter_path.clone(), subscription_id);
947941
}
948942

943+
// check is group is None because retained messages aren't sent
944+
// for shared subscriptions
945+
// TODO: use retain forward rules
946+
let forward_retained = group.is_none();
947+
948+
// call to `insert(_)` returns `true` if it didn't contain the filter_path already
949+
// i.e. its a new subscription
949950
if connection.subscriptions.insert(filter_path.clone()) {
950951
let request = DataRequest {
951952
filter: filter_path.clone(),
@@ -954,6 +955,8 @@ impl Router {
954955
cursor,
955956
read_count: 0,
956957
max_count: 100,
958+
// set true for new subscriptions
959+
forward_retained,
957960
group,
958961
};
959962

@@ -962,6 +965,10 @@ impl Router {
962965
debug_assert!(self.scheduler.check_tracker_duplicates(id).is_none())
963966
}
964967

968+
// TODO: figure out how we can update existing DataRequest
969+
// helpful in re-subscriptions and forwarding retained messages on
970+
// every subscribe
971+
965972
let meter = &mut self.ibufs.get_mut(id).unwrap().meter;
966973
meter.register_subscription(filter_path.clone());
967974
}
@@ -1193,10 +1200,11 @@ fn append_to_commitlog(
11931200
if publish.payload.is_empty() {
11941201
datalog.remove_from_retained_publishes(topic.to_owned());
11951202
} else if publish.retain {
1196-
error!("Unexpected: retain field was not unset");
11971203
datalog.insert_to_retained_publishes(publish.clone(), properties.clone(), topic.to_owned());
11981204
}
11991205

1206+
// after recording retained message, we also send that message to existing subscribers
1207+
// as normal publish message. Therefore we are setting retain to false
12001208
publish.retain = false;
12011209
let pkid = publish.pkid;
12021210

@@ -1358,7 +1366,23 @@ fn forward_device_data(
13581366
inflight_slots = 1;
13591367
}
13601368

1361-
let (next, publishes) =
1369+
let mut publishes = Vec::new();
1370+
1371+
if request.forward_retained {
1372+
// NOTE: ideally we want to limit the number of read messages
1373+
// and skip the messages previously read while reading next time.
1374+
// but for now, we just try to read all messages and drop the excess ones
1375+
let mut retained_publishes = datalog.read_retained_messages(&request.filter);
1376+
retained_publishes.truncate(inflight_slots as usize);
1377+
1378+
publishes.extend(retained_publishes.into_iter().map(|p| (p, None)));
1379+
inflight_slots -= publishes.len() as u64;
1380+
1381+
// we only want to forward retained messages once
1382+
request.forward_retained = false;
1383+
}
1384+
1385+
let (next, publishes_from_datalog) =
13621386
match datalog.native_readv(request.filter_idx, request.cursor, inflight_slots) {
13631387
Ok(v) => v,
13641388
Err(e) => {
@@ -1367,6 +1391,12 @@ fn forward_device_data(
13671391
}
13681392
};
13691393

1394+
publishes.extend(
1395+
publishes_from_datalog
1396+
.into_iter()
1397+
.map(|(p, offset)| (p, Some(offset))),
1398+
);
1399+
13701400
let (start, next, caughtup) = match next {
13711401
Position::Next { start, end } => (start, end, false),
13721402
Position::Done { start, end } => (start, end, true),

0 commit comments

Comments
 (0)