Skip to content

Commit 59b537b

Browse files
authored
deserialize frame lazily (#1856)
* deserialize frame lazily * remove useless rewind
1 parent 0e0c5d9 commit 59b537b

File tree

6 files changed

+38
-37
lines changed

6 files changed

+38
-37
lines changed

commons/zenoh-codec/src/network/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ mod request;
1919
mod response;
2020

2121
use zenoh_buffers::{
22-
reader::{DidntRead, Reader},
22+
reader::{BacktrackableReader, DidntRead, Reader},
2323
writer::{DidntWrite, Writer},
2424
};
2525
use zenoh_protocol::{
@@ -104,6 +104,26 @@ where
104104
}
105105
}
106106

107+
pub struct NetworkMessageIter<R> {
108+
codec: Zenoh080Reliability,
109+
reader: R,
110+
}
111+
112+
impl<R> NetworkMessageIter<R> {
113+
pub fn new(reliability: Reliability, reader: R) -> Self {
114+
let codec = Zenoh080Reliability::new(reliability);
115+
Self { codec, reader }
116+
}
117+
}
118+
119+
impl<R: BacktrackableReader> Iterator for NetworkMessageIter<R> {
120+
type Item = NetworkMessage;
121+
122+
fn next(&mut self) -> Option<Self::Item> {
123+
self.codec.read(&mut self.reader).ok()
124+
}
125+
}
126+
107127
// Extensions: QoS
108128
impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
109129
where

commons/zenoh-codec/src/transport/frame.rs

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,20 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <[email protected]>
1313
//
14-
use alloc::vec::Vec;
15-
1614
use zenoh_buffers::{
1715
reader::{BacktrackableReader, DidntRead, Reader},
1816
writer::{DidntWrite, Writer},
1917
};
2018
use zenoh_protocol::{
2119
common::{iext, imsg},
2220
core::Reliability,
23-
network::NetworkMessage,
2421
transport::{
2522
frame::{ext, flag, Frame, FrameHeader},
2623
id, TransportSn,
2724
},
2825
};
2926

30-
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Reliability};
27+
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
3128

3229
// FrameHeader
3330
impl<W> WCodec<&FrameHeader, &mut W> for Zenoh080
@@ -146,9 +143,7 @@ where
146143
self.write(&mut *writer, &header)?;
147144

148145
// Body
149-
for m in payload.iter() {
150-
self.write(&mut *writer, m)?;
151-
}
146+
writer.write_zslice(payload)?;
152147

153148
Ok(())
154149
}
@@ -175,20 +170,7 @@ where
175170

176171
fn read(self, reader: &mut R) -> Result<Frame, Self::Error> {
177172
let header: FrameHeader = self.read(&mut *reader)?;
178-
179-
let rcode = Zenoh080Reliability::new(header.reliability);
180-
let mut payload = Vec::new();
181-
while reader.can_read() {
182-
let mark = reader.mark();
183-
let res: Result<NetworkMessage, DidntRead> = rcode.read(&mut *reader);
184-
match res {
185-
Ok(m) => payload.push(m),
186-
Err(_) => {
187-
reader.rewind(mark);
188-
break;
189-
}
190-
}
191-
}
173+
let payload = reader.read_zslice(reader.remaining())?;
192174

193175
Ok(Frame {
194176
reliability: header.reliability,

commons/zenoh-protocol/src/transport/frame.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <[email protected]>
1313
//
14-
use alloc::vec::Vec;
14+
use zenoh_buffers::ZSlice;
1515

16-
use crate::{core::Reliability, network::NetworkMessage, transport::TransportSn};
16+
use crate::{core::Reliability, transport::TransportSn};
1717

1818
pub mod flag {
1919
pub const R: u8 = 1 << 5; // 0x20 Reliable if R==1 then the frame is reliable
@@ -72,7 +72,7 @@ pub struct Frame {
7272
pub reliability: Reliability,
7373
pub sn: TransportSn,
7474
pub ext_qos: ext::QoSType,
75-
pub payload: Vec<NetworkMessage>,
75+
pub payload: ZSlice,
7676
}
7777

7878
// Extensions
@@ -93,12 +93,7 @@ impl Frame {
9393
let reliability = Reliability::rand();
9494
let sn: TransportSn = rng.gen();
9595
let ext_qos = ext::QoSType::rand();
96-
let mut payload = vec![];
97-
for _ in 0..rng.gen_range(1..4) {
98-
let mut m = NetworkMessage::rand();
99-
m.reliability = reliability;
100-
payload.push(m);
101-
}
96+
let payload = ZSlice::rand(rng.gen_range(8..128));
10297

10398
Frame {
10499
reliability,

io/zenoh-transport/src/common/pipeline.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -993,10 +993,10 @@ mod tests {
993993
reader::{DidntRead, HasReader},
994994
ZBuf,
995995
};
996-
use zenoh_codec::{RCodec, Zenoh080};
996+
use zenoh_codec::{network::NetworkMessageIter, RCodec, Zenoh080};
997997
use zenoh_config::{QueueAllocConf, QueueAllocMode};
998998
use zenoh_protocol::{
999-
core::{Bits, CongestionControl, Encoding, Priority},
999+
core::{Bits, CongestionControl, Encoding, Priority, Reliability},
10001000
network::{ext, Push},
10011001
transport::{BatchSize, Fragment, Frame, TransportBody, TransportSn},
10021002
zenoh::{PushBody, Put},
@@ -1100,8 +1100,10 @@ mod tests {
11001100
match res {
11011101
Ok(msg) => {
11021102
match msg.body {
1103-
TransportBody::Frame(Frame { payload, .. }) => {
1104-
msgs += payload.len()
1103+
TransportBody::Frame(Frame { mut payload, .. }) => {
1104+
msgs +=
1105+
NetworkMessageIter::new(Reliability::DEFAULT, &mut payload)
1106+
.count();
11051107
}
11061108
TransportBody::Fragment(Fragment { more, .. }) => {
11071109
fragments += 1;

io/zenoh-transport/src/multicast/rx.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//
1414
use std::sync::MutexGuard;
1515

16+
use zenoh_codec::network::NetworkMessageIter;
1617
use zenoh_core::{zlock, zread};
1718
use zenoh_protocol::{
1819
core::{Locator, Priority, Reliability},
@@ -170,7 +171,7 @@ impl TransportMulticastInner {
170171
// Drop invalid message and continue
171172
return Ok(());
172173
}
173-
for msg in payload.drain(..) {
174+
for msg in NetworkMessageIter::new(reliability, &mut payload) {
174175
self.trigger_callback(msg, peer)?;
175176
}
176177

io/zenoh-transport/src/unicast/universal/rx.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//
1414
use std::sync::MutexGuard;
1515

16+
use zenoh_codec::network::NetworkMessageIter;
1617
use zenoh_core::{zlock, zread};
1718
use zenoh_link::Link;
1819
use zenoh_protocol::{
@@ -103,7 +104,7 @@ impl TransportUnicastUniversal {
103104
}
104105
let callback = zread!(self.callback).clone();
105106
if let Some(callback) = callback.as_ref() {
106-
for msg in payload.drain(..) {
107+
for msg in NetworkMessageIter::new(reliability, &mut payload) {
107108
self.trigger_callback(callback.as_ref(), msg)?;
108109
}
109110
} else {

0 commit comments

Comments
 (0)