Skip to content

Commit 8790b6e

Browse files
committed
refactor: update log handling to support UTF-8 text and enhance related tests
1 parent 866fc93 commit 8790b6e

File tree

8 files changed

+270
-64
lines changed

8 files changed

+270
-64
lines changed

core/src/ten_manager/src/designer/log_watcher/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub struct SetAppBaseDir {
3232

3333
#[derive(Message, Debug, Serialize, Deserialize)]
3434
#[rtype(result = "()")]
35-
pub struct FileContent(pub Vec<u8>);
35+
pub struct FileContent(pub String);
3636

3737
#[derive(Message, Debug, Serialize, Deserialize)]
3838
#[rtype(result = "()")]
@@ -160,8 +160,8 @@ impl Handler<FileContent> for WsLogWatcher {
160160
msg: FileContent,
161161
ctx: &mut Self::Context,
162162
) -> Self::Result {
163-
// Send the file content to the WebSocket client.
164-
ctx.binary(msg.0);
163+
// Send the file content as text to the WebSocket client.
164+
ctx.text(msg.0);
165165
}
166166
}
167167

core/src/ten_manager/src/fs/file_watcher.rs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); // 1 minute timeout.
2323
const DEFAULT_BUFFER_SIZE: usize = 4096; // Default read buffer size.
2424
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(100);
2525

26-
/// Stream of file content changes.
26+
/// Stream of UTF-8 text file content changes.
2727
pub struct FileContentStream {
28-
// Channel for receiving file content.
29-
content_rx: Receiver<Result<Vec<u8>>>,
28+
// Channel for receiving file content as UTF-8 text.
29+
content_rx: Receiver<Result<String>>,
3030

3131
// Sender to signal stop request.
3232
stop_tx: Option<oneshot::Sender<()>>,
@@ -35,14 +35,14 @@ pub struct FileContentStream {
3535
impl FileContentStream {
3636
/// Create a new FileContentStream.
3737
fn new(
38-
content_rx: Receiver<Result<Vec<u8>>>,
38+
content_rx: Receiver<Result<String>>,
3939
stop_tx: oneshot::Sender<()>,
4040
) -> Self {
4141
Self { content_rx, stop_tx: Some(stop_tx) }
4242
}
4343

44-
/// Get the next chunk of data from the file.
45-
pub async fn next(&mut self) -> Option<Result<Vec<u8>>> {
44+
/// Get the next chunk of text from the file.
45+
pub async fn next(&mut self) -> Option<Result<String>> {
4646
self.content_rx.recv().await
4747
}
4848

@@ -97,7 +97,7 @@ fn is_same_file(a: &Metadata, b: &Metadata) -> bool {
9797
}
9898
}
9999

100-
/// Watch a file for changes and stream its content.
100+
/// Watch a UTF-8 text file for changes and stream its content.
101101
///
102102
/// Returns a FileContentStream that can be used to read the content of the file
103103
/// as it changes. The stream will end when either:
@@ -132,7 +132,7 @@ pub async fn watch_file<P: AsRef<Path>>(
132132
/// Actual file watch task running in the background.
133133
async fn watch_file_task(
134134
path: PathBuf,
135-
content_tx: Sender<Result<Vec<u8>>>,
135+
content_tx: Sender<Result<String>>,
136136
mut stop_rx: oneshot::Receiver<()>,
137137
options: FileWatchOptions,
138138
) {
@@ -157,15 +157,18 @@ async fn watch_file_task(
157157
let _ = content_tx.send(Err(anyhow!(e))).await;
158158
return;
159159
}
160-
let mut init_buf = Vec::new();
161-
match file.read_to_end(&mut init_buf) {
160+
161+
let mut init_content = String::new();
162+
match file.read_to_string(&mut init_content) {
162163
Ok(n) => {
163164
if n > 0 {
164-
let _ = content_tx.send(Ok(init_buf)).await;
165+
let _ = content_tx.send(Ok(init_content)).await;
165166
}
166167
}
167168
Err(e) => {
168-
let _ = content_tx.send(Err(anyhow!(e))).await;
169+
let _ = content_tx
170+
.send(Err(anyhow!("Failed to read file as UTF-8 text: {}", e)))
171+
.await;
169172
return;
170173
}
171174
}
@@ -226,17 +229,25 @@ async fn watch_file_task(
226229
}
227230

228231
let mut reader = BufReader::with_capacity(options.buffer_size, &file);
229-
let mut buf = Vec::with_capacity(options.buffer_size);
230-
match reader.read_until(0, &mut buf) {
232+
let mut line = String::with_capacity(options.buffer_size);
233+
match reader.read_line(&mut line) {
231234
Ok(n) if n > 0 => {
232235
last_pos += n as u64;
233-
let _ = content_tx.send(Ok(buf)).await;
236+
let _ = content_tx.send(Ok(line)).await;
234237
last_activity = Instant::now();
235238
}
236239
Ok(_) => {}
237240
Err(e) => {
238-
eprintln!("Error reading from file: {}", e);
239-
let _ = content_tx.send(Err(anyhow::anyhow!(e))).await;
241+
// Specific error for UTF-8 decoding failures.
242+
if e.kind() == std::io::ErrorKind::InvalidData {
243+
eprintln!("Error: Invalid UTF-8 data in file");
244+
let _ = content_tx
245+
.send(Err(anyhow!("Invalid UTF-8 data in file")))
246+
.await;
247+
} else {
248+
eprintln!("Error reading from file: {}", e);
249+
let _ = content_tx.send(Err(anyhow::anyhow!(e))).await;
250+
}
240251
break;
241252
}
242253
}

core/src/ten_manager/src/log/mod.rs

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,100 @@
44
// Licensed under the Apache License, Version 2.0, with certain conditions.
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
7-
use anyhow::Result;
7+
use anyhow::{anyhow, Result};
8+
use serde_json::Value;
89
use std::collections::HashMap;
910

10-
pub struct ExtensionInfo {
11-
pub thread_id: u64,
11+
pub struct ExtensionThreadInfo {
12+
pub extensions: Vec<String>,
13+
}
14+
15+
pub struct AppInfo {
16+
pub extension_threads: HashMap<String, ExtensionThreadInfo>,
1217
}
1318

1419
pub struct GraphResourcesLog {
15-
pub app_uri: Option<String>,
1620
pub graph_id: String,
1721
pub graph_name: String,
18-
pub extensions: HashMap<String, ExtensionInfo>,
22+
pub apps: HashMap<Option<String>, AppInfo>,
1923
}
2024

2125
pub fn parse_graph_resources_log(
2226
log_message: &str,
2327
graph_resources_log: &mut GraphResourcesLog,
2428
) -> Result<()> {
29+
// Check if the log level is 'M'.
30+
let parts: Vec<&str> = log_message.split_whitespace().collect();
31+
if parts.len() < 4 {
32+
return Ok(());
33+
}
34+
35+
// Check for log level 'M' - it should be in the fourth position after the
36+
// timestamp and process/thread IDs.
37+
let log_level_pos = 3;
38+
if parts.len() <= log_level_pos || parts[log_level_pos] != "M" {
39+
return Ok(());
40+
}
41+
42+
// Find the "[graph resources]" marker.
43+
if !log_message.contains("[graph resources]") {
44+
return Ok(());
45+
}
46+
47+
// Extract the JSON content after "[graph resources]".
48+
let json_content = log_message
49+
.split("[graph resources]")
50+
.nth(1)
51+
.ok_or_else(|| anyhow!("Failed to extract JSON content"))?
52+
.trim();
53+
54+
// Parse the JSON content.
55+
let json_value: Value = serde_json::from_str(json_content)?;
56+
57+
// Extract data from the JSON.
58+
let app_uri = json_value.get("app_uri").and_then(|v| v.as_str());
59+
let graph_id = json_value["graph id"]
60+
.as_str()
61+
.ok_or_else(|| anyhow!("Missing graph id"))?;
62+
let graph_name = json_value["graph name"]
63+
.as_str()
64+
.ok_or_else(|| anyhow!("Missing graph name"))?;
65+
66+
// Update graph_resources_log with graph ID and name.
67+
graph_resources_log.graph_id = graph_id.to_string();
68+
graph_resources_log.graph_name = graph_name.to_string();
69+
70+
// Create or get the AppInfo for this app_uri.
71+
let app_key = app_uri.map(|uri| uri.to_string());
72+
let app_info = graph_resources_log
73+
.apps
74+
.entry(app_key)
75+
.or_insert_with(|| AppInfo { extension_threads: HashMap::new() });
76+
77+
// Process extension_threads if present.
78+
if let Some(extension_threads) = json_value.get("extension_threads") {
79+
if let Some(extension_threads_obj) = extension_threads.as_object() {
80+
for (thread_id, thread_info) in extension_threads_obj {
81+
if let Some(extensions_array) = thread_info.get("extensions") {
82+
if let Some(extensions) = extensions_array.as_array() {
83+
let mut extension_names = Vec::new();
84+
for ext in extensions {
85+
if let Some(ext_name) = ext.as_str() {
86+
extension_names.push(ext_name.to_string());
87+
}
88+
}
89+
90+
let thread_info =
91+
ExtensionThreadInfo { extensions: extension_names };
92+
93+
app_info
94+
.extension_threads
95+
.insert(thread_id.to_string(), thread_info);
96+
}
97+
}
98+
}
99+
}
100+
}
101+
25102
Ok(())
26103
}

core/src/ten_manager/tests/test_case/designer/log_watcher.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,10 @@ async fn test_ws_log_watcher_endpoint() {
9393
let mut received_content = false;
9494
while let Some(msg) = read.next().await {
9595
let msg = msg.unwrap();
96-
if msg.is_binary() {
97-
let binary = msg.into_data();
98-
let content = String::from_utf8_lossy(&binary);
99-
println!("Received binary: {}", content);
100-
if content.contains(test_content) {
96+
if msg.is_text() {
97+
let text = msg.to_text().unwrap();
98+
println!("Received text: {}", text);
99+
if text.contains(test_content) {
101100
received_content = true;
102101
break;
103102
}

core/src/ten_manager/tests/test_case/fs/mod.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ mod tests {
2323
rt.block_on(async {
2424
// Create a temporary file for testing
2525
let mut temp_file = NamedTempFile::new()?;
26-
let test_content = b"Hello, World!";
27-
temp_file.write_all(test_content)?;
26+
let test_content = "Hello, World!";
27+
temp_file.write_all(test_content.as_bytes())?;
2828
temp_file.flush()?;
2929

3030
// Create options with shorter timeout for testing.
@@ -43,8 +43,8 @@ mod tests {
4343
assert_eq!(chunk, test_content);
4444

4545
// Write more content to the file.
46-
let more_content = b"More content!";
47-
temp_file.write_all(more_content)?;
46+
let more_content = "More content!";
47+
temp_file.write_all(more_content.as_bytes())?;
4848
temp_file.flush()?;
4949

5050
// Get the second chunk.
@@ -82,7 +82,7 @@ mod tests {
8282
.create(true)
8383
.truncate(true)
8484
.open(&path_str)?;
85-
file.write_all(b"Initial content")?;
85+
file.write_all("Initial content".as_bytes())?;
8686
file.flush()?;
8787
}
8888

@@ -98,7 +98,7 @@ mod tests {
9898

9999
// Get the first chunk
100100
let chunk = stream.next().await.expect("Should receive data")?;
101-
assert_eq!(chunk, b"Initial content");
101+
assert_eq!(chunk, "Initial content");
102102

103103
// Simulate log rotation - delete and recreate file
104104
std::fs::remove_file(&path_str)?;
@@ -112,14 +112,14 @@ mod tests {
112112
.create(true)
113113
.truncate(true)
114114
.open(&path_str)?;
115-
file.write_all(b"Rotated content")?;
115+
file.write_all("Rotated content".as_bytes())?;
116116
file.flush()?;
117117
}
118118

119119
// Get the content after rotation
120120
let chunk =
121121
stream.next().await.expect("Should receive rotated data")?;
122-
assert_eq!(chunk, b"Rotated content");
122+
assert_eq!(chunk, "Rotated content");
123123

124124
// Stop watching
125125
stream.stop();
@@ -134,7 +134,7 @@ mod tests {
134134
rt.block_on(async {
135135
// Create a temporary file for testing.
136136
let mut temp_file = NamedTempFile::new()?;
137-
temp_file.write_all(b"Test content")?;
137+
temp_file.write_all("Test content".as_bytes())?;
138138
temp_file.flush()?;
139139

140140
// Create options with very short timeout for testing.
@@ -150,7 +150,7 @@ mod tests {
150150

151151
// Get the first chunk.
152152
let chunk = stream.next().await.expect("Should receive data")?;
153-
assert_eq!(chunk, b"Test content");
153+
assert_eq!(chunk, "Test content");
154154

155155
// Wait for the timeout to occur (no new content being written).
156156
let next_result = stream.next().await;

0 commit comments

Comments
 (0)