Skip to content

Revert "vsock: Fix TCP connection bug" and implement a different approach #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/devices/src/virtio/vsock/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,9 @@ impl VsockMuxer {
let mut proxy_map = self.proxy_map.write().unwrap();

if let Some(proxy) = proxy_map.get(&id) {
proxy.lock().unwrap().confirm_connect(pkt)
if let Some(update) = proxy.lock().unwrap().confirm_connect(pkt) {
self.process_proxy_update(id, update);
}
} else if let Some(ref mut ipc_map) = &mut self.unix_ipc_port_map {
if let Some((path, listen)) = ipc_map.get(&pkt.dst_port()) {
let mem = self.mem.as_ref().unwrap();
Expand Down
5 changes: 3 additions & 2 deletions src/devices/src/virtio/vsock/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub enum ProxyError {
pub enum ProxyStatus {
Idle,
Connecting,
ConnectedUnconfirmed,
Connected,
Listening,
Closed,
Expand Down Expand Up @@ -70,7 +69,9 @@ pub trait Proxy: Send + AsRawFd {
#[allow(dead_code)]
fn status(&self) -> ProxyStatus;
fn connect(&mut self, pkt: &VsockPacket, req: TsiConnectReq) -> ProxyUpdate;
fn confirm_connect(&mut self, _pkt: &VsockPacket) {}
fn confirm_connect(&mut self, _pkt: &VsockPacket) -> Option<ProxyUpdate> {
None
}
fn getpeername(&mut self, pkt: &VsockPacket);
fn sendmsg(&mut self, pkt: &VsockPacket) -> ProxyUpdate;
fn sendto_addr(&mut self, req: TsiSendtoAddr) -> ProxyUpdate;
Expand Down
40 changes: 20 additions & 20 deletions src/devices/src/virtio/vsock/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ impl TcpProxy {
push_packet(self.cid, rx, &self.rxq, &self.queue, &self.mem);
}

fn switch_to_blocking(&mut self) {
fn switch_to_connected(&mut self) {
self.status = ProxyStatus::Connected;
match fcntl(self.fd, FcntlArg::F_GETFL) {
Ok(flags) => match OFlag::from_bits(flags) {
Some(flags) => {
Expand Down Expand Up @@ -373,8 +374,7 @@ impl Proxy for TcpProxy {
) {
Ok(()) => {
debug!("vsock: connect: Connected");
self.switch_to_blocking();
self.status = ProxyStatus::ConnectedUnconfirmed;
self.switch_to_connected();
0
}
Err(nix::errno::Errno::EINPROGRESS) => {
Expand All @@ -393,9 +393,9 @@ impl Proxy for TcpProxy {
};

if self.status == ProxyStatus::Connecting {
update.polling = Some((self.id, self.fd, EventSet::IN | EventSet::OUT));
update.polling = Some((self.id, self.fd, EventSet::OUT));
} else {
if self.status == ProxyStatus::ConnectedUnconfirmed {
if self.status == ProxyStatus::Connected {
update.polling = Some((self.id, self.fd, EventSet::IN));
}
self.push_connect_rsp(result);
Expand All @@ -404,7 +404,7 @@ impl Proxy for TcpProxy {
update
}

fn confirm_connect(&mut self, pkt: &VsockPacket) {
fn confirm_connect(&mut self, pkt: &VsockPacket) -> Option<ProxyUpdate> {
debug!(
"tcp: confirm_connect: local_port={} peer_port={}, src_port={}, dst_port={}",
pkt.dst_port(),
Expand All @@ -413,15 +413,6 @@ impl Proxy for TcpProxy {
self.peer_port,
);

if self.status != ProxyStatus::ConnectedUnconfirmed {
warn!(
"tcp: confirm_connect: Expected state to be {:?}, but it is {:?}",
ProxyStatus::ConnectedUnconfirmed,
self.status
);
}
self.status = ProxyStatus::Connected;

self.peer_buf_alloc = pkt.buf_alloc();
self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());

Expand All @@ -434,6 +425,13 @@ impl Proxy for TcpProxy {
peer_port: pkt.src_port(),
};
push_packet(self.cid, rx, &self.rxq, &self.queue, &self.mem);

// Now that the vsock transport is fully established, start listening
// for events in the TCP socket again.
Some(ProxyUpdate {
polling: Some((self.id, self.fd, EventSet::IN)),
..Default::default()
})
}

fn getpeername(&mut self, pkt: &VsockPacket) {
Expand Down Expand Up @@ -583,6 +581,8 @@ impl Proxy for TcpProxy {
self.peer_buf_alloc = pkt.buf_alloc();
self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());

self.status = ProxyStatus::Connected;

ProxyUpdate {
polling: Some((self.id, self.fd, EventSet::IN)),
..Default::default()
Expand Down Expand Up @@ -614,8 +614,7 @@ impl Proxy for TcpProxy {
self.peer_buf_alloc = pkt.buf_alloc();
self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());

self.switch_to_blocking();
self.status = ProxyStatus::Connected;
self.switch_to_connected();

ProxyUpdate {
polling: Some((self.id, self.fd, EventSet::IN)),
Expand Down Expand Up @@ -756,11 +755,12 @@ impl Proxy for TcpProxy {
if evset.contains(EventSet::OUT) {
debug!("process_event: OUT");
if self.status == ProxyStatus::Connecting {
self.switch_to_blocking();
self.status = ProxyStatus::ConnectedUnconfirmed;
self.switch_to_connected();
self.push_connect_rsp(0);
update.signal_queue = true;
update.polling = Some((self.id(), self.fd, EventSet::IN));
// Stop listening for events in the TCP socket until we receive
// OP_REQUEST and the vsock transport is fully established.
update.polling = Some((self.id(), self.fd, EventSet::empty()));
} else {
error!("vsock::tcp: EventSet::OUT while not connecting");
}
Expand Down
4 changes: 3 additions & 1 deletion src/devices/src/virtio/vsock/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl Proxy for UnixProxy {
update
}

fn confirm_connect(&mut self, pkt: &VsockPacket) {
fn confirm_connect(&mut self, pkt: &VsockPacket) -> Option<ProxyUpdate> {
debug!(
"tcp: confirm_connect: local_port={} peer_port={}, src_port={}, dst_port={}",
pkt.dst_port(),
Expand All @@ -377,6 +377,8 @@ impl Proxy for UnixProxy {
peer_port: pkt.src_port(),
};
push_packet(self.cid, rx, &self.rxq, &self.queue, &self.mem);

None
}

fn getpeername(&mut self, _pkt: &VsockPacket) {
Expand Down
21 changes: 19 additions & 2 deletions tests/test_cases/src/tcp_tester.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io::{ErrorKind, Read, Write};
use std::mem;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

fn expect_msg(stream: &mut TcpStream, expected: &[u8]) {
Expand All @@ -25,6 +26,23 @@ fn set_timeouts(stream: &mut TcpStream) {
.unwrap();
}

fn connect(port: u16) -> TcpStream {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
let mut tries = 0;
loop {
match TcpStream::connect(&addr) {
Ok(stream) => return stream,
Err(err) => {
if tries == 5 {
panic!("Couldn't connect to server after 5 attempts: {err}");
}
tries += 1;
thread::sleep(Duration::from_secs(1));
}
}
}
}

#[derive(Debug, Copy, Clone)]
pub struct TcpTester {
port: u16,
Expand All @@ -51,8 +69,7 @@ impl TcpTester {
}

pub fn run_client(&self) {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.port);
let mut stream = TcpStream::connect(&addr).unwrap();
let mut stream = connect(self.port);
set_timeouts(&mut stream);
expect_msg(&mut stream, b"ping!");
expect_wouldblock(&mut stream);
Expand Down
Loading