1
1
use std:: ffi:: OsString ;
2
2
use std:: sync:: { Arc , Mutex } ;
3
3
4
- use anyhow:: Context ;
4
+ use thiserror:: Error ;
5
+ use tokio:: sync:: broadcast:: error:: RecvError ;
5
6
6
7
pub mod types;
7
8
pub mod state;
@@ -35,7 +36,7 @@ impl RosMonitor {
35
36
let channel_arc_ = channel_arc. clone ( ) ;
36
37
37
38
let task = AbortJoinHandle ( tokio:: spawn ( async move {
38
- let error: Result < ( ) , anyhow :: Error > = async {
39
+ let error: Result < ( ) , RosMonitorError > = async {
39
40
let channel_ = {
40
41
let channel = channel_arc_. lock ( ) . unwrap ( ) ;
41
42
channel. as_ref ( ) . unwrap ( ) . clone ( )
@@ -53,7 +54,7 @@ impl RosMonitor {
53
54
. stderr ( Stdio :: piped ( ) )
54
55
. spawn ( ) ?;
55
56
56
- let stdout = child. stdout . take ( ) . context ( "unable to capture stdout" ) ?;
57
+ let stdout = child. stdout . take ( ) . ok_or ( RosMonitorError :: PipeError ) ?;
57
58
let mut reader = tokio:: io:: BufReader :: new ( stdout) ;
58
59
59
60
loop {
@@ -86,11 +87,14 @@ impl RosMonitor {
86
87
87
88
let elapsed = started_at. elapsed ( ) ;
88
89
if elapsed. as_millis ( ) < 2000 {
89
- let mut stderr = child. stderr . take ( ) . context ( "unable to capture stderr" ) ?;
90
+ let mut stderr = child. stderr . take ( ) . ok_or ( RosMonitorError :: PipeError ) ?;
90
91
let mut result = String :: new ( ) ;
91
92
stderr. read_to_string ( & mut result) . await ?;
92
93
let status = child. try_wait ( ) ?;
93
- Err ( anyhow:: anyhow!( "process exited: {}\n {}" , status. unwrap_or_default( ) , result) ) ?;
94
+ Err ( RosMonitorError :: ProcessExited {
95
+ status : status. unwrap_or_default ( ) ,
96
+ stderr : result,
97
+ } ) ?;
94
98
}
95
99
}
96
100
} . await ;
@@ -113,9 +117,9 @@ impl RosMonitor {
113
117
}
114
118
}
115
119
116
- pub fn subscribe ( & self ) -> anyhow :: Result < impl futures:: TryStream < Item = anyhow :: Result < types:: DiscoveryEvent > > > {
120
+ pub fn subscribe ( & self ) -> Result < impl futures:: TryStream < Item = Result < types:: DiscoveryEvent , RecvError > > , RecvError > {
117
121
if self . task . 0 . is_finished ( ) {
118
- return Err ( anyhow :: anyhow! ( "monitor is not running" ) ) ;
122
+ return Err ( RecvError :: Closed ) ;
119
123
}
120
124
121
125
let ( initial, receiver) = {
@@ -128,7 +132,7 @@ impl RosMonitor {
128
132
} ;
129
133
130
134
Ok ( async_stream:: try_stream! {
131
- let mut receiver = receiver. context ( "channel closed" ) ?;
135
+ let mut receiver = receiver. ok_or ( RecvError :: Closed ) ?;
132
136
for event in initial {
133
137
yield event;
134
138
}
@@ -139,3 +143,16 @@ impl RosMonitor {
139
143
} )
140
144
}
141
145
}
146
+
147
+ #[ derive( Debug , Error ) ]
148
+ enum RosMonitorError {
149
+ #[ error( "unable to spawn process: {0}" ) ]
150
+ SpawnError ( #[ from] tokio:: io:: Error ) ,
151
+ #[ error( "unable to read stdout/stderr" ) ]
152
+ PipeError ,
153
+ #[ error( "process exited: {status}\n {stderr}" ) ]
154
+ ProcessExited {
155
+ status : std:: process:: ExitStatus ,
156
+ stderr : String ,
157
+ } ,
158
+ }
0 commit comments