17
17
use async_trait:: async_trait;
18
18
use futures:: FutureExt ;
19
19
use log:: { debug, error} ;
20
+
20
21
use pingora_error:: { ErrorType :: * , OrErr , Result } ;
22
+ use std:: io:: IoSliceMut ;
21
23
use std:: os:: unix:: io:: AsRawFd ;
22
24
use std:: pin:: Pin ;
23
25
use std:: sync:: Arc ;
24
26
use std:: task:: { Context , Poll } ;
25
27
use std:: time:: { Duration , Instant , SystemTime } ;
26
- use tokio:: io:: { self , AsyncRead , AsyncWrite , AsyncWriteExt , BufStream , ReadBuf } ;
28
+ use tokio:: io:: { self , AsyncRead , AsyncWrite , AsyncWriteExt , BufStream , Interest , ReadBuf } ;
27
29
use tokio:: net:: { TcpStream , UnixStream } ;
28
30
29
31
use crate :: protocols:: l4:: ext:: { set_tcp_keepalive, TcpKeepalive } ;
@@ -118,6 +120,162 @@ impl AsRawFd for RawStream {
118
120
}
119
121
}
120
122
123
+ #[ derive( Debug ) ]
124
+ struct RawStreamWrapper {
125
+ pub ( crate ) stream : RawStream ,
126
+ /// store the last rx timestamp of the stream.
127
+ pub ( crate ) rx_ts : Option < SystemTime > ,
128
+ #[ cfg( target_os = "linux" ) ]
129
+ /// This can be reused across multiple recvmsg calls. The cmsg buffer may
130
+ /// come from old sockets created by older version of pingora and so,
131
+ /// this vector can only grow.
132
+ reusable_cmsg_space : Vec < u8 > ,
133
+ }
134
+
135
+ impl RawStreamWrapper {
136
+ pub fn new ( stream : RawStream ) -> Self {
137
+ RawStreamWrapper {
138
+ stream,
139
+ rx_ts : None ,
140
+ #[ cfg( target_os = "linux" ) ]
141
+ reusable_cmsg_space : nix:: cmsg_space!( nix:: sys:: time:: TimeSpec ) ,
142
+ }
143
+ }
144
+ }
145
+
146
+ impl AsyncRead for RawStreamWrapper {
147
+ #[ cfg( not( target_os = "linux" ) ) ]
148
+ fn poll_read (
149
+ self : Pin < & mut Self > ,
150
+ cx : & mut Context < ' _ > ,
151
+ buf : & mut ReadBuf < ' _ > ,
152
+ ) -> Poll < io:: Result < ( ) > > {
153
+ // Safety: Basic enum pin projection
154
+ unsafe {
155
+ let rs_wrapper = Pin :: get_unchecked_mut ( self ) ;
156
+ match & mut rs_wrapper. stream {
157
+ RawStream :: Tcp ( s) => Pin :: new_unchecked ( s) . poll_read ( cx, buf) ,
158
+ RawStream :: Unix ( s) => Pin :: new_unchecked ( s) . poll_read ( cx, buf) ,
159
+ }
160
+ }
161
+ }
162
+
163
+ #[ cfg( target_os = "linux" ) ]
164
+ fn poll_read (
165
+ self : Pin < & mut Self > ,
166
+ cx : & mut Context < ' _ > ,
167
+ buf : & mut ReadBuf < ' _ > ,
168
+ ) -> Poll < io:: Result < ( ) > > {
169
+ use futures:: ready;
170
+ use nix:: sys:: socket:: { recvmsg, ControlMessageOwned , MsgFlags , SockaddrStorage } ;
171
+
172
+ // Safety: Basic pin projection to get mutable stream
173
+ let rs_wrapper = unsafe { Pin :: get_unchecked_mut ( self ) } ;
174
+ match & mut rs_wrapper. stream {
175
+ RawStream :: Tcp ( s) => {
176
+ loop {
177
+ ready ! ( s. poll_read_ready( cx) ) ?;
178
+ // Safety: maybe uninitialized bytes will only be passed to recvmsg
179
+ let b = unsafe {
180
+ & mut * ( buf. unfilled_mut ( ) as * mut [ std:: mem:: MaybeUninit < u8 > ]
181
+ as * mut [ u8 ] )
182
+ } ;
183
+ let mut iov = [ IoSliceMut :: new ( b) ] ;
184
+ rs_wrapper. reusable_cmsg_space . clear ( ) ;
185
+
186
+ match s. try_io ( Interest :: READABLE , || {
187
+ recvmsg :: < SockaddrStorage > (
188
+ s. as_raw_fd ( ) ,
189
+ & mut iov,
190
+ Some ( & mut rs_wrapper. reusable_cmsg_space ) ,
191
+ MsgFlags :: empty ( ) ,
192
+ )
193
+ . map_err ( |errno| errno. into ( ) )
194
+ } ) {
195
+ Ok ( r) => {
196
+ if let Some ( ControlMessageOwned :: ScmTimestampsns ( rtime) ) = r
197
+ . cmsgs ( )
198
+ . find ( |i| matches ! ( i, ControlMessageOwned :: ScmTimestampsns ( _) ) )
199
+ {
200
+ // The returned timestamp is a real (i.e. not monotonic) timestamp
201
+ // https://docs.kernel.org/networking/timestamping.html
202
+ rs_wrapper. rx_ts =
203
+ SystemTime :: UNIX_EPOCH . checked_add ( rtime. system . into ( ) ) ;
204
+ }
205
+ // Safety: We trust `recvmsg` to have filled up `r.bytes` bytes in the buffer.
206
+ unsafe {
207
+ buf. assume_init ( r. bytes ) ;
208
+ }
209
+ buf. advance ( r. bytes ) ;
210
+ return Poll :: Ready ( Ok ( ( ) ) ) ;
211
+ }
212
+ Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock => continue ,
213
+ Err ( e) => return Poll :: Ready ( Err ( e) ) ,
214
+ }
215
+ }
216
+ }
217
+ // Unix RX timestamp only works with datagram for now, so we do not care about it
218
+ RawStream :: Unix ( s) => unsafe { Pin :: new_unchecked ( s) . poll_read ( cx, buf) } ,
219
+ }
220
+ }
221
+ }
222
+
223
+ impl AsyncWrite for RawStreamWrapper {
224
+ fn poll_write ( self : Pin < & mut Self > , cx : & mut Context , buf : & [ u8 ] ) -> Poll < io:: Result < usize > > {
225
+ // Safety: Basic enum pin projection
226
+ unsafe {
227
+ match & mut Pin :: get_unchecked_mut ( self ) . stream {
228
+ RawStream :: Tcp ( s) => Pin :: new_unchecked ( s) . poll_write ( cx, buf) ,
229
+ RawStream :: Unix ( s) => Pin :: new_unchecked ( s) . poll_write ( cx, buf) ,
230
+ }
231
+ }
232
+ }
233
+
234
+ fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
235
+ // Safety: Basic enum pin projection
236
+ unsafe {
237
+ match & mut Pin :: get_unchecked_mut ( self ) . stream {
238
+ RawStream :: Tcp ( s) => Pin :: new_unchecked ( s) . poll_flush ( cx) ,
239
+ RawStream :: Unix ( s) => Pin :: new_unchecked ( s) . poll_flush ( cx) ,
240
+ }
241
+ }
242
+ }
243
+
244
+ fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
245
+ // Safety: Basic enum pin projection
246
+ unsafe {
247
+ match & mut Pin :: get_unchecked_mut ( self ) . stream {
248
+ RawStream :: Tcp ( s) => Pin :: new_unchecked ( s) . poll_shutdown ( cx) ,
249
+ RawStream :: Unix ( s) => Pin :: new_unchecked ( s) . poll_shutdown ( cx) ,
250
+ }
251
+ }
252
+ }
253
+
254
+ fn poll_write_vectored (
255
+ self : Pin < & mut Self > ,
256
+ cx : & mut Context < ' _ > ,
257
+ bufs : & [ std:: io:: IoSlice < ' _ > ] ,
258
+ ) -> Poll < io:: Result < usize > > {
259
+ // Safety: Basic enum pin projection
260
+ unsafe {
261
+ match & mut Pin :: get_unchecked_mut ( self ) . stream {
262
+ RawStream :: Tcp ( s) => Pin :: new_unchecked ( s) . poll_write_vectored ( cx, bufs) ,
263
+ RawStream :: Unix ( s) => Pin :: new_unchecked ( s) . poll_write_vectored ( cx, bufs) ,
264
+ }
265
+ }
266
+ }
267
+
268
+ fn is_write_vectored ( & self ) -> bool {
269
+ self . stream . is_write_vectored ( )
270
+ }
271
+ }
272
+
273
+ impl AsRawFd for RawStreamWrapper {
274
+ fn as_raw_fd ( & self ) -> std:: os:: unix:: io:: RawFd {
275
+ self . stream . as_raw_fd ( )
276
+ }
277
+ }
278
+
121
279
// Large read buffering helps reducing syscalls with little trade-off
122
280
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
123
281
const BUF_READ_SIZE : usize = 64 * 1024 ;
@@ -133,7 +291,7 @@ const BUF_WRITE_SIZE: usize = 1460;
133
291
/// A concrete type for transport layer connection + extra fields for logging
134
292
#[ derive( Debug ) ]
135
293
pub struct Stream {
136
- stream : BufStream < RawStream > ,
294
+ stream : BufStream < RawStreamWrapper > ,
137
295
buffer_write : bool ,
138
296
proxy_digest : Option < Arc < ProxyDigest > > ,
139
297
socket_digest : Option < Arc < SocketDigest > > ,
@@ -143,12 +301,14 @@ pub struct Stream {
143
301
pub tracer : Option < Tracer > ,
144
302
read_pending_time : AccumulatedDuration ,
145
303
write_pending_time : AccumulatedDuration ,
304
+ /// Last rx timestamp associated with the last recvmsg call.
305
+ pub rx_ts : Option < SystemTime > ,
146
306
}
147
307
148
308
impl Stream {
149
309
/// set TCP nodelay for this connection if `self` is TCP
150
310
pub fn set_nodelay ( & mut self ) -> Result < ( ) > {
151
- if let RawStream :: Tcp ( s) = & self . stream . get_ref ( ) {
311
+ if let RawStream :: Tcp ( s) = & self . stream . get_mut ( ) . stream {
152
312
s. set_nodelay ( true )
153
313
. or_err ( ConnectError , "failed to set_nodelay" ) ?;
154
314
}
@@ -157,40 +317,68 @@ impl Stream {
157
317
158
318
/// set TCP keepalive settings for this connection if `self` is TCP
159
319
pub fn set_keepalive ( & mut self , ka : & TcpKeepalive ) -> Result < ( ) > {
160
- if let RawStream :: Tcp ( s) = & self . stream . get_ref ( ) {
320
+ if let RawStream :: Tcp ( s) = & self . stream . get_mut ( ) . stream {
161
321
debug ! ( "Setting tcp keepalive" ) ;
162
322
set_tcp_keepalive ( s, ka) ?;
163
323
}
164
324
Ok ( ( ) )
165
325
}
326
+
327
+ #[ cfg( target_os = "linux" ) ]
328
+ pub fn set_rx_timestamp ( & mut self ) -> Result < ( ) > {
329
+ use nix:: sys:: socket:: { setsockopt, sockopt, TimestampingFlag } ;
330
+
331
+ if let RawStream :: Tcp ( s) = & self . stream . get_mut ( ) . stream {
332
+ let timestamp_options = TimestampingFlag :: SOF_TIMESTAMPING_RX_SOFTWARE
333
+ | TimestampingFlag :: SOF_TIMESTAMPING_SOFTWARE ;
334
+ return setsockopt ( s. as_raw_fd ( ) , sockopt:: Timestamping , & timestamp_options)
335
+ . or_err ( InternalError , "failed to set SOF_TIMESTAMPING_RX_SOFTWARE" ) ;
336
+ }
337
+ Ok ( ( ) )
338
+ }
339
+
340
+ #[ cfg( not( target_os = "linux" ) ) ]
341
+ pub fn set_rx_timestamp ( & mut self ) -> io:: Result < ( ) > {
342
+ Ok ( ( ) )
343
+ }
166
344
}
167
345
168
346
impl From < TcpStream > for Stream {
169
347
fn from ( s : TcpStream ) -> Self {
170
348
Stream {
171
- stream : BufStream :: with_capacity ( BUF_READ_SIZE , BUF_WRITE_SIZE , RawStream :: Tcp ( s) ) ,
349
+ stream : BufStream :: with_capacity (
350
+ BUF_READ_SIZE ,
351
+ BUF_WRITE_SIZE ,
352
+ RawStreamWrapper :: new ( RawStream :: Tcp ( s) ) ,
353
+ ) ,
172
354
buffer_write : true ,
173
355
established_ts : SystemTime :: now ( ) ,
174
356
proxy_digest : None ,
175
357
socket_digest : None ,
176
358
tracer : None ,
177
359
read_pending_time : AccumulatedDuration :: new ( ) ,
178
360
write_pending_time : AccumulatedDuration :: new ( ) ,
361
+ rx_ts : None ,
179
362
}
180
363
}
181
364
}
182
365
183
366
impl From < UnixStream > for Stream {
184
367
fn from ( s : UnixStream ) -> Self {
185
368
Stream {
186
- stream : BufStream :: with_capacity ( BUF_READ_SIZE , BUF_WRITE_SIZE , RawStream :: Unix ( s) ) ,
369
+ stream : BufStream :: with_capacity (
370
+ BUF_READ_SIZE ,
371
+ BUF_WRITE_SIZE ,
372
+ RawStreamWrapper :: new ( RawStream :: Unix ( s) ) ,
373
+ ) ,
187
374
buffer_write : true ,
188
375
established_ts : SystemTime :: now ( ) ,
189
376
proxy_digest : None ,
190
377
socket_digest : None ,
191
378
tracer : None ,
192
379
read_pending_time : AccumulatedDuration :: new ( ) ,
193
380
write_pending_time : AccumulatedDuration :: new ( ) ,
381
+ rx_ts : None ,
194
382
}
195
383
}
196
384
}
@@ -262,7 +450,7 @@ impl Drop for Stream {
262
450
t. 0 . on_disconnected ( ) ;
263
451
}
264
452
/* use nodelay/local_addr function to detect socket status */
265
- let ret = match & self . stream . get_ref ( ) {
453
+ let ret = match & self . stream . get_ref ( ) . stream {
266
454
RawStream :: Tcp ( s) => s. nodelay ( ) . err ( ) ,
267
455
RawStream :: Unix ( s) => s. local_addr ( ) . err ( ) ,
268
456
} ;
@@ -298,6 +486,7 @@ impl AsyncRead for Stream {
298
486
) -> Poll < io:: Result < ( ) > > {
299
487
let result = Pin :: new ( & mut self . stream ) . poll_read ( cx, buf) ;
300
488
self . read_pending_time . poll_time ( & result) ;
489
+ self . rx_ts = self . stream . get_ref ( ) . rx_ts ;
301
490
result
302
491
}
303
492
}
@@ -528,3 +717,42 @@ impl AccumulatedDuration {
528
717
}
529
718
}
530
719
}
720
+
721
+ #[ cfg( test) ]
722
+ mod tests {
723
+ use super :: * ;
724
+ use std:: sync:: Arc ;
725
+ use tokio:: io:: AsyncReadExt ;
726
+ use tokio:: io:: AsyncWriteExt ;
727
+ use tokio:: net:: TcpListener ;
728
+ use tokio:: sync:: Notify ;
729
+
730
+ #[ cfg( target_os = "linux" ) ]
731
+ #[ tokio:: test]
732
+ async fn test_rx_timestamp ( ) {
733
+ let message = "hello world" . as_bytes ( ) ;
734
+ let listener = TcpListener :: bind ( "127.0.0.1:0" ) . await . unwrap ( ) ;
735
+ let addr = listener. local_addr ( ) . unwrap ( ) ;
736
+ let notify = Arc :: new ( Notify :: new ( ) ) ;
737
+ let notify2 = notify. clone ( ) ;
738
+
739
+ tokio:: spawn ( async move {
740
+ let ( mut stream, _) = listener. accept ( ) . await . unwrap ( ) ;
741
+ notify2. notified ( ) . await ;
742
+ stream. write_all ( message) . await . unwrap ( ) ;
743
+ } ) ;
744
+
745
+ let mut stream: Stream = TcpStream :: connect ( addr) . await . unwrap ( ) . into ( ) ;
746
+ stream. set_rx_timestamp ( ) . unwrap ( ) ;
747
+ // Receive the message
748
+ // setsockopt for SO_TIMESTAMPING is asynchronous so sleep a little bit
749
+ // to let kernel do the work
750
+ std:: thread:: sleep ( Duration :: from_micros ( 100 ) ) ;
751
+ notify. notify_one ( ) ;
752
+
753
+ let mut buffer = vec ! [ 0u8 ; message. len( ) ] ;
754
+ let n = stream. read ( buffer. as_mut_slice ( ) ) . await . unwrap ( ) ;
755
+ assert_eq ! ( n, message. len( ) ) ;
756
+ assert ! ( stream. rx_ts. is_some( ) ) ;
757
+ }
758
+ }
0 commit comments