@@ -144,7 +144,7 @@ impl TransportUnicastLowlatency {
144
144
let token = self . token . child_token ( ) ;
145
145
146
146
let c_transport = self . clone ( ) ;
147
- let task = async move {
147
+ let rx_task = async move {
148
148
let guard = zasyncread ! ( c_transport. link) ;
149
149
let link_rx = guard. as_ref ( ) . unwrap ( ) . rx ( ) ;
150
150
drop ( guard) ;
@@ -165,7 +165,7 @@ impl TransportUnicastLowlatency {
165
165
zenoh_sync:: RecyclingObjectPool :: new ( n, move || vec ! [ 0_u8 ; mtu] . into_boxed_slice ( ) )
166
166
} ;
167
167
168
- let res = loop {
168
+ loop {
169
169
// Retrieve one buffer
170
170
let mut buffer = pool. try_take ( ) . unwrap_or_else ( || pool. alloc ( ) ) ;
171
171
@@ -188,25 +188,29 @@ impl TransportUnicastLowlatency {
188
188
break ZResult :: Ok ( ( ) ) ;
189
189
}
190
190
}
191
- } ;
191
+ }
192
+ } ;
192
193
193
- tracing:: debug!(
194
- "[{}] Rx task finished with result {:?}" ,
195
- c_transport. manager. config. zid,
196
- res
197
- ) ;
198
- if res. is_err ( ) {
194
+ let c_transport = self . clone ( ) ;
195
+ self . tracker . spawn_on (
196
+ async move {
197
+ let res = rx_task. await ;
199
198
tracing:: debug!(
200
- "[{}] <on rx exit> finalizing transport with peer: { }" ,
199
+ "[{}] Rx task finished with result {:? }" ,
201
200
c_transport. manager. config. zid,
202
- c_transport . config . zid
201
+ res
203
202
) ;
204
- let _ = c_transport. finalize ( 0 ) . await ;
205
- }
206
- ZResult :: Ok ( ( ) )
207
- } ;
208
-
209
- self . tracker . spawn_on ( task, & ZRuntime :: TX ) ;
203
+ if res. is_err ( ) {
204
+ tracing:: debug!(
205
+ "[{}] <on rx exit> finalizing transport with peer: {}" ,
206
+ c_transport. manager. config. zid,
207
+ c_transport. config. zid
208
+ ) ;
209
+ let _ = c_transport. finalize ( 0 ) . await ;
210
+ }
211
+ } ,
212
+ & ZRuntime :: RX ,
213
+ ) ;
210
214
}
211
215
}
212
216
0 commit comments