1
1
use std:: {
2
- net:: SocketAddr ,
2
+ net:: { IpAddr , Ipv4Addr , SocketAddr } ,
3
3
sync:: Arc ,
4
4
time:: { Duration , Instant } ,
5
5
} ;
6
6
7
7
use anyhow:: { Context , Result } ;
8
8
use bytes:: Bytes ;
9
- use quinn:: { Connection , Endpoint , RecvStream , SendStream , TokioRuntime , TransportConfig } ;
10
- use socket2:: { Domain , Protocol , Socket , Type } ;
9
+ use quinn:: {
10
+ crypto:: rustls:: QuicClientConfig , Connection , Endpoint , RecvStream , SendStream , TransportConfig ,
11
+ } ;
12
+ use rustls:: {
13
+ pki_types:: { CertificateDer , PrivateKeyDer } ,
14
+ RootCertStore ,
15
+ } ;
11
16
use tracing:: { trace, warn} ;
12
17
13
18
use crate :: {
14
19
client_handler, stats:: TransferResult , ClientStats , ConnectionSelector , EndpointSelector , Opt ,
15
20
} ;
16
21
17
- /// Derived from the iroh udp SOCKET_BUFFER_SIZE
18
- const SOCKET_BUFFER_SIZE : usize = 7 << 20 ;
19
22
pub const ALPN : & [ u8 ] = b"n0/quinn-bench/0" ;
20
23
21
24
/// Creates a server endpoint which runs on the given runtime
22
- pub fn server_endpoint ( rt : & tokio:: runtime:: Runtime , opt : & Opt ) -> ( SocketAddr , quinn:: Endpoint ) {
23
- let secret_key = iroh:: key:: SecretKey :: generate ( ) ;
24
- let crypto = iroh:: tls:: make_server_config ( & secret_key, vec ! [ ALPN . to_vec( ) ] , false ) . unwrap ( ) ;
25
-
26
- let transport = transport_config ( opt. max_streams , opt. initial_mtu ) ;
27
-
28
- let mut server_config = quinn:: ServerConfig :: with_crypto ( Arc :: new ( crypto) ) ;
29
- server_config. transport_config ( Arc :: new ( transport) ) ;
30
-
31
- let addr = SocketAddr :: new ( "127.0.0.1" . parse ( ) . unwrap ( ) , 0 ) ;
32
-
33
- let socket = bind_socket ( addr) . unwrap ( ) ;
34
-
35
- let _guard = rt. enter ( ) ;
36
- rt. block_on ( async move {
37
- let ep = quinn:: Endpoint :: new (
38
- Default :: default ( ) ,
39
- Some ( server_config) ,
40
- socket,
41
- Arc :: new ( TokioRuntime ) ,
25
+ pub fn server_endpoint (
26
+ rt : & tokio:: runtime:: Runtime ,
27
+ cert : CertificateDer < ' static > ,
28
+ key : PrivateKeyDer < ' static > ,
29
+ opt : & Opt ,
30
+ ) -> ( SocketAddr , quinn:: Endpoint ) {
31
+ let cert_chain = vec ! [ cert] ;
32
+ let mut server_config = quinn:: ServerConfig :: with_single_cert ( cert_chain, key) . unwrap ( ) ;
33
+ server_config. transport = Arc :: new ( transport_config ( opt. max_streams , opt. initial_mtu ) ) ;
34
+
35
+ let endpoint = {
36
+ let _guard = rt. enter ( ) ;
37
+ quinn:: Endpoint :: server (
38
+ server_config,
39
+ SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: LOCALHOST ) , 0 ) ,
42
40
)
43
- . unwrap ( ) ;
44
- let addr = ep . local_addr ( ) . unwrap ( ) ;
45
- ( addr , ep )
46
- } )
41
+ . unwrap ( )
42
+ } ;
43
+ let server_addr = endpoint . local_addr ( ) . unwrap ( ) ;
44
+ ( server_addr , endpoint )
47
45
}
48
46
49
47
/// Create and run a client
50
- pub async fn client ( server_addr : SocketAddr , opt : Opt ) -> Result < ClientStats > {
48
+ pub async fn client (
49
+ server_addr : SocketAddr ,
50
+ server_cert : CertificateDer < ' static > ,
51
+ opt : Opt ,
52
+ ) -> Result < ClientStats > {
51
53
let client_start = std:: time:: Instant :: now ( ) ;
52
- let ( endpoint, connection) = connect_client ( server_addr, opt) . await ?;
54
+ let ( endpoint, connection) = connect_client ( server_addr, server_cert , opt) . await ?;
53
55
let client_connect_time = client_start. elapsed ( ) ;
54
56
let mut res = client_handler (
55
57
EndpointSelector :: Quinn ( endpoint) ,
@@ -64,29 +66,34 @@ pub async fn client(server_addr: SocketAddr, opt: Opt) -> Result<ClientStats> {
64
66
/// Create a client endpoint and client connection
65
67
pub async fn connect_client (
66
68
server_addr : SocketAddr ,
69
+ server_cert : CertificateDer < ' _ > ,
67
70
opt : Opt ,
68
71
) -> Result < ( :: quinn:: Endpoint , Connection ) > {
69
- let secret_key = iroh:: key:: SecretKey :: generate ( ) ;
70
- let quic_client_config =
71
- iroh:: tls:: make_client_config ( & secret_key, None , vec ! [ ALPN . to_vec( ) ] , false ) ?;
72
- let mut config = quinn:: ClientConfig :: new ( Arc :: new ( quic_client_config) ) ;
72
+ let endpoint =
73
+ quinn:: Endpoint :: client ( SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: LOCALHOST ) , 0 ) ) . unwrap ( ) ;
73
74
74
- let transport = transport_config ( opt. max_streams , opt. initial_mtu ) ;
75
+ let mut roots = RootCertStore :: empty ( ) ;
76
+ roots. add ( server_cert) ?;
75
77
76
- // let mut config = quinn::ClientConfig::new(Arc::new(crypto));
77
- config. transport_config ( Arc :: new ( transport) ) ;
78
+ let provider = rustls:: crypto:: ring:: default_provider ( ) ;
78
79
79
- let addr = SocketAddr :: new ( "127.0.0.1" . parse ( ) . unwrap ( ) , 0 ) ;
80
+ let crypto = rustls:: ClientConfig :: builder_with_provider ( provider. into ( ) )
81
+ . with_protocol_versions ( & [ & rustls:: version:: TLS13 ] )
82
+ . unwrap ( )
83
+ . with_root_certificates ( roots)
84
+ . with_no_client_auth ( ) ;
80
85
81
- let socket = bind_socket ( addr) . unwrap ( ) ;
86
+ let mut client_config = quinn:: ClientConfig :: new ( Arc :: new ( QuicClientConfig :: try_from ( crypto) ?) ) ;
87
+ client_config. transport_config ( Arc :: new ( transport_config ( opt. max_streams , opt. initial_mtu ) ) ) ;
82
88
83
- let ep =
84
- quinn:: Endpoint :: new ( Default :: default ( ) , None , socket, Arc :: new ( TokioRuntime ) ) . unwrap ( ) ;
85
- let connection = ep
86
- . connect_with ( config, server_addr, "local" ) ?
89
+ let connection = endpoint
90
+ . connect_with ( client_config, server_addr, "localhost" )
91
+ . unwrap ( )
87
92
. await
88
- . context ( "connecting" ) ?;
89
- Ok ( ( ep, connection) )
93
+ . context ( "unable to connect" ) ?;
94
+ trace ! ( "connected" ) ;
95
+
96
+ Ok ( ( endpoint, connection) )
90
97
}
91
98
92
99
pub fn transport_config ( max_streams : usize , initial_mtu : u16 ) -> TransportConfig {
@@ -104,43 +111,6 @@ pub fn transport_config(max_streams: usize, initial_mtu: u16) -> TransportConfig
104
111
config
105
112
}
106
113
107
- fn bind_socket ( addr : SocketAddr ) -> Result < std:: net:: UdpSocket > {
108
- let socket = Socket :: new ( Domain :: for_address ( addr) , Type :: DGRAM , Some ( Protocol :: UDP ) )
109
- . context ( "create socket" ) ?;
110
-
111
- if addr. is_ipv6 ( ) {
112
- socket. set_only_v6 ( false ) . context ( "set_only_v6" ) ?;
113
- }
114
-
115
- socket
116
- . bind ( & socket2:: SockAddr :: from ( addr) )
117
- . context ( "binding endpoint" ) ?;
118
- socket
119
- . set_send_buffer_size ( SOCKET_BUFFER_SIZE )
120
- . context ( "send buffer size" ) ?;
121
- socket
122
- . set_recv_buffer_size ( SOCKET_BUFFER_SIZE )
123
- . context ( "recv buffer size" ) ?;
124
-
125
- let buf_size = socket. send_buffer_size ( ) . context ( "send buffer size" ) ?;
126
- if buf_size < SOCKET_BUFFER_SIZE {
127
- warn ! (
128
- "Unable to set desired send buffer size. Desired: {}, Actual: {}" ,
129
- SOCKET_BUFFER_SIZE , buf_size
130
- ) ;
131
- }
132
-
133
- let buf_size = socket. recv_buffer_size ( ) . context ( "recv buffer size" ) ?;
134
- if buf_size < SOCKET_BUFFER_SIZE {
135
- warn ! (
136
- "Unable to set desired recv buffer size. Desired: {}, Actual: {}" ,
137
- SOCKET_BUFFER_SIZE , buf_size
138
- ) ;
139
- }
140
-
141
- Ok ( socket. into ( ) )
142
- }
143
-
144
114
async fn drain_stream (
145
115
stream : & mut RecvStream ,
146
116
read_unordered : bool ,
0 commit comments