Skip to content

Commit 38d9f13

Browse files
krussDmitryAstafyev
authored andcommitted
Added robustness on reading DLT messages
1 parent 4e04ba6 commit 38d9f13

File tree

9 files changed

+312
-102
lines changed

9 files changed

+312
-102
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.20.1] - 2025-05-09
11+
12+
### Changed
13+
14+
- Added robustness for DLT message reader and stream
15+
1016
## [0.20.0] - 2025-02-25
1117

1218
### Added

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "dlt-core"
3-
version = "0.20.0"
3+
version = "0.20.1"
44
authors = ["esrlabs.com"]
55
edition = "2021"
66
description = """

README.md

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ The following example can be run with `cargo run --example file_parser --release
6969

7070
<!-- example start -->
7171
```rust
72-
use dlt_core::read::{read_message, DltMessageReader};
72+
use dlt_core::{
73+
parse::DltParseError,
74+
read::{read_message, DltMessageReader},
75+
};
7376
use std::{env, fs, fs::File, path::PathBuf, time::Instant};
7477

7578
fn main() {
@@ -81,8 +84,21 @@ fn main() {
8184
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
8285
let mut message_count = 0usize;
8386
let start = Instant::now();
84-
while let Some(_message) = read_message(&mut dlt_reader, None).expect("read dlt message") {
85-
message_count += 1;
87+
loop {
88+
match read_message(&mut dlt_reader, None) {
89+
Ok(Some(_)) => {
90+
message_count += 1;
91+
}
92+
Ok(None) => {
93+
break;
94+
}
95+
Err(error) => match error {
96+
DltParseError::ParsingHickup(_) => {
97+
continue;
98+
}
99+
_ => panic!("{}", error),
100+
},
101+
}
86102
}
87103
// print some stats
88104
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;

examples/file_parser.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use dlt_core::read::{read_message, DltMessageReader};
1+
use dlt_core::{
2+
parse::DltParseError,
3+
read::{read_message, DltMessageReader},
4+
};
25
use std::{env, fs, fs::File, path::PathBuf, time::Instant};
36

47
fn main() {
@@ -10,8 +13,21 @@ fn main() {
1013
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
1114
let mut message_count = 0usize;
1215
let start = Instant::now();
13-
while let Some(_message) = read_message(&mut dlt_reader, None).expect("read dlt message") {
14-
message_count += 1;
16+
loop {
17+
match read_message(&mut dlt_reader, None) {
18+
Ok(Some(_)) => {
19+
message_count += 1;
20+
}
21+
Ok(None) => {
22+
break;
23+
}
24+
Err(error) => match error {
25+
DltParseError::ParsingHickup(_) => {
26+
continue;
27+
}
28+
_ => panic!("{}", error),
29+
},
30+
}
1531
}
1632
// print some stats
1733
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;

examples/file_streamer.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use dlt_core::stream::{read_message, DltStreamReader};
1+
use dlt_core::{
2+
parse::DltParseError,
3+
stream::{read_message, DltStreamReader},
4+
};
25
use std::{env, fs, path::PathBuf, time::Instant};
36
use tokio::fs::File;
47
use tokio_util::compat::TokioAsyncReadCompatExt;
@@ -13,11 +16,21 @@ async fn main() {
1316
let mut dlt_reader = DltStreamReader::new(dlt_file.compat(), true);
1417
let mut message_count = 0usize;
1518
let start = Instant::now();
16-
while let Some(_message) = read_message(&mut dlt_reader, None)
17-
.await
18-
.expect("read dlt message")
19-
{
20-
message_count += 1;
19+
loop {
20+
match read_message(&mut dlt_reader, None).await {
21+
Ok(Some(_)) => {
22+
message_count += 1;
23+
}
24+
Ok(None) => {
25+
break;
26+
}
27+
Err(error) => match error {
28+
DltParseError::ParsingHickup(_) => {
29+
continue;
30+
}
31+
_ => panic!("{}", error),
32+
},
33+
}
2134
}
2235
// print some stats
2336
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;

src/dlt.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,8 @@ impl TryFrom<u32> for TypeInfo {
901901
///
902902
/// * phy_v is what we received in the dlt message
903903
/// * log_v is the real value
904-
/// example: the degree celcius is transmitted,
904+
///
905+
/// Example: the degree celcius is transmitted,
905906
/// quantization = 0.01, offset = -50
906907
/// now the transmitted value phy_v = (log_v - offset)/quantization = 7785
907908
///

src/read.rs

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use crate::{
1717
dlt::{HEADER_MIN_LENGTH, STORAGE_HEADER_LENGTH},
1818
filtering::ProcessedDltFilterConfig,
19-
parse::{dlt_message, parse_length, DltParseError, ParsedMessage},
19+
parse::{dlt_message, parse_length, DltParseError, ParsedMessage, DLT_PATTERN},
2020
};
2121
use std::io::{BufReader, Read};
2222

@@ -69,7 +69,7 @@ impl<S: Read> DltMessageReader<S> {
6969
source: S,
7070
with_storage_header: bool,
7171
) -> Self {
72-
debug_assert!(buffer_capacity >= message_max_len);
72+
assert!(buffer_capacity >= message_max_len);
7373

7474
DltMessageReader {
7575
source: BufReader::with_capacity(buffer_capacity, source),
@@ -81,30 +81,51 @@ impl<S: Read> DltMessageReader<S> {
8181
/// Read the next message slice from the source,
8282
/// or return an empty slice if no more message could be read.
8383
pub fn next_message_slice(&mut self) -> Result<&[u8], DltParseError> {
84-
let storage_len = if self.with_storage_header {
85-
STORAGE_HEADER_LENGTH as usize
86-
} else {
87-
0
88-
};
89-
let header_len = storage_len + HEADER_MIN_LENGTH as usize;
90-
debug_assert!(header_len <= self.buffer.len());
91-
92-
if self
93-
.source
94-
.read_exact(&mut self.buffer[..header_len])
95-
.is_err()
96-
{
97-
return Ok(&[]);
98-
}
84+
loop {
85+
let storage_len = if self.with_storage_header {
86+
let storage_len = STORAGE_HEADER_LENGTH as usize;
87+
88+
loop {
89+
if self
90+
.source
91+
.read_exact(&mut self.buffer[..storage_len])
92+
.is_err()
93+
{
94+
return Ok(&[]);
95+
}
96+
97+
if &self.buffer[..DLT_PATTERN.len()] == DLT_PATTERN {
98+
break;
99+
}
100+
}
101+
102+
storage_len
103+
} else {
104+
0
105+
};
106+
107+
let header_len = storage_len + HEADER_MIN_LENGTH as usize;
108+
109+
if self
110+
.source
111+
.read_exact(&mut self.buffer[storage_len..header_len])
112+
.is_err()
113+
{
114+
return Ok(&[]);
115+
}
116+
117+
let (_, message_len) = parse_length(&self.buffer[storage_len..header_len])?;
99118

100-
let (_, message_len) = parse_length(&self.buffer[storage_len..header_len])?;
101-
let total_len = storage_len + message_len as usize;
102-
debug_assert!(total_len <= self.buffer.len());
119+
let total_len = storage_len + message_len as usize;
120+
if total_len < header_len {
121+
continue;
122+
}
103123

104-
self.source
105-
.read_exact(&mut self.buffer[header_len..total_len])?;
124+
self.source
125+
.read_exact(&mut self.buffer[header_len..total_len])?;
106126

107-
Ok(&self.buffer[..total_len])
127+
return Ok(&self.buffer[..total_len]);
128+
}
108129
}
109130

110131
/// Answer if message slices contain a `StorageHeader´.
@@ -163,15 +184,44 @@ mod tests {
163184
assert_eq!(bytes, message.as_bytes());
164185
}
165186

166-
assert_eq!(None, read_message(&mut reader, None).expect("message"))
187+
assert_eq!(None, read_message(&mut reader, None).expect("message"));
167188
}
168189
}
169190

191+
#[test]
192+
fn test_read_message_robustness() {
193+
#[rustfmt::skip]
194+
let bytes = [
195+
[
196+
// --------------- storage header with invalid dlt-pattern
197+
0xFF, 0x4C, 0x54, 0x01, 0x2B, 0x2C, 0xC9, 0x4D,
198+
0x7A, 0xE8, 0x01, 0x00, 0x45, 0x43, 0x55, 0x00,
199+
]
200+
.to_vec(),
201+
[
202+
// --------------- storage header
203+
0x44, 0x4C, 0x54, 0x01, 0x2B, 0x2C, 0xC9, 0x4D,
204+
0x7A, 0xE8, 0x01, 0x00, 0x45, 0x43, 0x55, 0x00,
205+
// --------------- standard header with invalid length
206+
0x21, 0x0A, 0x00, 0x00,
207+
]
208+
.to_vec(),
209+
DLT_MESSAGE_WITH_STORAGE_HEADER.to_vec(),
210+
]
211+
.concat();
212+
213+
let mut reader = DltMessageReader::new(bytes.as_slice(), true);
214+
215+
assert!(read_message(&mut reader, None).expect("message").is_some());
216+
assert!(read_message(&mut reader, None).expect("message").is_none());
217+
}
218+
170219
proptest! {
171220
#[test]
172221
fn test_read_messages_proptest(messages in messages_strat(10)) {
173222
test_read_messages(messages, false);
174223
}
224+
175225
#[test]
176226
fn test_read_messages_with_storage_header_proptest(messages in messages_with_storage_header_strat(10)) {
177227
test_read_messages(messages, true);

0 commit comments

Comments
 (0)