Skip to content

Commit 09e04c6

Browse files
committed
Progress?
1 parent aa523b9 commit 09e04c6

File tree

3 files changed

+38
-10
lines changed

3 files changed

+38
-10
lines changed

neqo-bin/src/client/mod.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ use neqo_udp::RecvBuf;
3434
use tokio::time::Sleep;
3535
use url::{Host, Origin, Url};
3636

37-
use crate::{udp::process_send, SharedArgs};
37+
use crate::{
38+
udp::{process_send, SocketLookup},
39+
SharedArgs,
40+
};
3841

3942
mod http09;
4043
mod http3;
@@ -406,6 +409,11 @@ struct Runner<'a, H: Handler> {
406409
args: &'a Args,
407410
recv_buf: RecvBuf,
408411
}
412+
impl<'a> SocketLookup for &'a mut crate::udp::Socket {
413+
fn get(&mut self, _addr: SocketAddr) -> &mut crate::udp::Socket {
414+
self
415+
}
416+
}
409417

410418
impl<'a, H: Handler> Runner<'a, H> {
411419
fn new(
@@ -462,12 +470,15 @@ impl<'a, H: Handler> Runner<'a, H> {
462470
}
463471

464472
async fn process_output(&mut self) -> Result<(), io::Error> {
465-
process_send(
466-
|_| self.socket,
473+
let mut timeout = None;
474+
let res = process_send(
475+
self.socket,
467476
|| self.client.process_output(Instant::now()),
468-
&mut self.timeout,
477+
&mut timeout,
469478
)
470-
.await
479+
.await;
480+
self.timeout = timeout;
481+
res
471482
}
472483

473484
async fn process_multiple_input(&mut self) -> Res<()> {

neqo-bin/src/server/mod.rs

+15-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ use neqo_transport::{Output, RandomConnectionIdGenerator, Version};
3939
use neqo_udp::RecvBuf;
4040
use tokio::time::Sleep;
4141

42-
use crate::{udp::process_send, SharedArgs};
42+
use crate::{
43+
udp::{process_send, SocketLookup},
44+
SharedArgs,
45+
};
4346

4447
const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10);
4548

@@ -211,6 +214,16 @@ pub struct ServerRunner {
211214
recv_buf: RecvBuf,
212215
}
213216

217+
impl SocketLookup for &mut [(SocketAddr, crate::udp::Socket)] {
218+
fn get(&mut self, addr: SocketAddr) -> &mut crate::udp::Socket {
219+
let ((_host, first_socket), rest) = self.split_first_mut().unwrap();
220+
rest.iter_mut()
221+
.map(|(_host, socket)| socket)
222+
.find(|socket| socket.local_addr().is_ok_and(|a| a == addr))
223+
.unwrap_or(first_socket)
224+
}
225+
}
226+
214227
impl ServerRunner {
215228
#[must_use]
216229
pub fn new(
@@ -250,7 +263,7 @@ impl ServerRunner {
250263
mut input_dgram: Option<Datagram<&mut [u8]>>,
251264
) -> Result<(), io::Error> {
252265
process_send(
253-
|addr| Self::find_socket(sockets, addr),
266+
sockets,
254267
|| server.process(input_dgram.take(), now()),
255268
timeout,
256269
)

neqo-bin/src/udp.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ impl Socket {
8585
}
8686
}
8787

88+
pub trait SocketLookup {
89+
fn get(&mut self, addr: SocketAddr) -> &mut Socket;
90+
}
91+
8892
/// Loops around `process` while it returns `Output::Datagram`. While looping,
8993
/// batch together returned datagrams with the same length, destination and TOS
9094
/// byte into GSO-sized chunks and send them when the batch is full. If we
@@ -95,8 +99,8 @@ impl Socket {
9599
/// introduces a lot of unnecessary memory copies. It would be better to extend
96100
/// this with something like https://github.com/mozilla/neqo/pull/2138, where
97101
/// the stack directly writes into the GSO buffer.
98-
pub async fn process_send<'a>(
99-
mut socket: impl FnMut(SocketAddr) -> &'a mut Socket,
102+
pub async fn process_send(
103+
mut socket_lookup: impl SocketLookup,
100104
mut process: impl FnMut() -> Output,
101105
timeout: &mut Option<Pin<Box<Sleep>>>,
102106
) -> Result<(), io::Error> {
@@ -111,7 +115,7 @@ pub async fn process_send<'a>(
111115
if (send || exit) && !batch_data.is_empty() {
112116
let meta = batch_meta.clone().unwrap();
113117
// Send all collected datagrams as GSO-sized chunks.
114-
let socket = socket(meta.source());
118+
let socket = socket_lookup.get(meta.source());
115119
for gso_chunk in batch_data.chunks(socket.max_gso_segments() * meta.len()) {
116120
// Optimistically attempt sending datagram. In case the OS
117121
// buffer is full, wait till socket is writable then try again.

0 commit comments

Comments
 (0)