1
1
use crate :: common;
2
2
3
- use std:: net:: { SocketAddr , TcpListener } ;
4
3
use std:: sync:: Arc ;
5
4
6
5
use axum:: body:: Body ;
@@ -9,6 +8,7 @@ use axum::http::Request;
9
8
use axum:: http:: StatusCode ;
10
9
use axum:: { routing:: post, Router } ;
11
10
use soldr:: db:: RequestState ;
11
+ use tokio:: net:: TcpListener ;
12
12
use tokio:: sync:: Mutex ;
13
13
use tokio:: time:: { sleep, Duration } ;
14
14
use tower:: util:: ServiceExt ;
@@ -42,18 +42,14 @@ async fn timeout_handler() -> impl axum::response::IntoResponse {
42
42
#[ tokio:: test]
43
43
async fn ingest_save_and_proxy ( ) {
44
44
// set up origin server
45
- let listener = TcpListener :: bind ( "0.0.0.0:0" . parse :: < SocketAddr > ( ) . unwrap ( ) ) . unwrap ( ) ;
45
+ let listener = TcpListener :: bind ( "0.0.0.0:0" ) . await . unwrap ( ) ;
46
46
let port = listener. local_addr ( ) . unwrap ( ) . port ( ) ;
47
47
let sentinel: Sentinel = Arc :: new ( Mutex :: new ( None ) ) ;
48
48
let s2 = sentinel. clone ( ) ;
49
49
let client_app = Router :: new ( ) . route ( "/" , post ( success_handler) . with_state ( s2) ) ;
50
50
51
51
tokio:: spawn ( async move {
52
- axum:: Server :: from_tcp ( listener)
53
- . unwrap ( )
54
- . serve ( client_app. into_make_service ( ) )
55
- . await
56
- . unwrap ( ) ;
52
+ axum:: serve ( listener, client_app) . await . unwrap ( ) ;
57
53
} ) ;
58
54
59
55
let ( ingest, mgmt, _) = app ( & common:: config ( ) ) . await . unwrap ( ) ;
@@ -74,7 +70,7 @@ async fn ingest_save_and_proxy() {
74
70
. method ( "POST" )
75
71
. uri ( "/origins" )
76
72
. header ( "Content-Type" , "application/json" )
77
- . body ( body. into ( ) )
73
+ . body ( body)
78
74
. unwrap ( ) ,
79
75
)
80
76
. await
@@ -117,7 +113,9 @@ async fn ingest_save_and_proxy() {
117
113
118
114
assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
119
115
120
- let body = hyper:: body:: to_bytes ( response. into_body ( ) ) . await . unwrap ( ) ;
116
+ let body = axum:: body:: to_bytes ( response. into_body ( ) , 1_000_000 )
117
+ . await
118
+ . unwrap ( ) ;
121
119
122
120
let reqs: Vec < db:: Request > = serde_json:: from_slice ( & body) . unwrap ( ) ;
123
121
assert_eq ! ( reqs[ 0 ] . state, RequestState :: Completed ) ;
@@ -137,7 +135,9 @@ async fn ingest_save_and_proxy() {
137
135
138
136
assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
139
137
140
- let body = hyper:: body:: to_bytes ( response. into_body ( ) ) . await . unwrap ( ) ;
138
+ let body = axum:: body:: to_bytes ( response. into_body ( ) , 1_000_000 )
139
+ . await
140
+ . unwrap ( ) ;
141
141
142
142
let attempts: Vec < db:: Attempt > = serde_json:: from_slice ( & body) . unwrap ( ) ;
143
143
assert_eq ! ( attempts[ 0 ] . id, 1 ) ;
@@ -154,16 +154,12 @@ async fn ingest_proxy_failure() {
154
154
common:: enable_tracing ( ) ;
155
155
156
156
// set up origin server
157
- let listener = TcpListener :: bind ( "0.0.0.0:0" . parse :: < SocketAddr > ( ) . unwrap ( ) ) . unwrap ( ) ;
157
+ let listener = TcpListener :: bind ( "0.0.0.0:0" ) . await . unwrap ( ) ;
158
158
let port = listener. local_addr ( ) . unwrap ( ) . port ( ) ;
159
159
let client_app = Router :: new ( ) . route ( "/failure" , post ( failure_handler) ) ;
160
160
161
161
tokio:: spawn ( async move {
162
- axum:: Server :: from_tcp ( listener)
163
- . unwrap ( )
164
- . serve ( client_app. into_make_service ( ) )
165
- . await
166
- . unwrap ( ) ;
162
+ axum:: serve ( listener, client_app) . await . unwrap ( ) ;
167
163
} ) ;
168
164
169
165
let ( ingest, mgmt, _) = app ( & common:: config ( ) ) . await . unwrap ( ) ;
@@ -190,7 +186,7 @@ async fn ingest_proxy_failure() {
190
186
. method ( "POST" )
191
187
. uri ( "/origins" )
192
188
. header ( "Content-Type" , "application/json" )
193
- . body ( body. into ( ) )
189
+ . body ( body)
194
190
. unwrap ( ) ,
195
191
)
196
192
. await
@@ -231,7 +227,9 @@ async fn ingest_proxy_failure() {
231
227
232
228
assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
233
229
234
- let body = hyper:: body:: to_bytes ( response. into_body ( ) ) . await . unwrap ( ) ;
230
+ let body = axum:: body:: to_bytes ( response. into_body ( ) , 1_000_000 )
231
+ . await
232
+ . unwrap ( ) ;
235
233
236
234
let reqs: Vec < db:: Request > = serde_json:: from_slice ( & body) . unwrap ( ) ;
237
235
assert_eq ! ( reqs[ 0 ] . state, RequestState :: Failed ) ;
@@ -251,7 +249,9 @@ async fn ingest_proxy_failure() {
251
249
252
250
assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
253
251
254
- let body = hyper:: body:: to_bytes ( response. into_body ( ) ) . await . unwrap ( ) ;
252
+ let body = axum:: body:: to_bytes ( response. into_body ( ) , 1_000_000 )
253
+ . await
254
+ . unwrap ( ) ;
255
255
256
256
let attempts: Vec < db:: Attempt > = serde_json:: from_slice ( & body) . unwrap ( ) ;
257
257
assert_eq ! ( attempts[ 0 ] . id, 1 ) ;
@@ -265,16 +265,12 @@ async fn ingest_proxy_timeout() {
265
265
common:: enable_tracing ( ) ;
266
266
267
267
// set up origin server
268
- let listener = TcpListener :: bind ( "0.0.0.0:0" . parse :: < SocketAddr > ( ) . unwrap ( ) ) . unwrap ( ) ;
268
+ let listener = TcpListener :: bind ( "0.0.0.0:0" ) . await . unwrap ( ) ;
269
269
let port = listener. local_addr ( ) . unwrap ( ) . port ( ) ;
270
270
let client_app = Router :: new ( ) . route ( "/timeout" , post ( timeout_handler) ) ;
271
271
272
272
tokio:: spawn ( async move {
273
- axum:: Server :: from_tcp ( listener)
274
- . unwrap ( )
275
- . serve ( client_app. into_make_service ( ) )
276
- . await
277
- . unwrap ( ) ;
273
+ axum:: serve ( listener, client_app) . await . unwrap ( ) ;
278
274
} ) ;
279
275
280
276
let ( ingest, mgmt, _) = app ( & common:: config ( ) ) . await . unwrap ( ) ;
@@ -295,7 +291,7 @@ async fn ingest_proxy_timeout() {
295
291
. method ( "POST" )
296
292
. uri ( "/origins" )
297
293
. header ( "Content-Type" , "application/json" )
298
- . body ( body. into ( ) )
294
+ . body ( body)
299
295
. unwrap ( ) ,
300
296
)
301
297
. await
@@ -336,7 +332,9 @@ async fn ingest_proxy_timeout() {
336
332
337
333
assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
338
334
339
- let body = hyper:: body:: to_bytes ( response. into_body ( ) ) . await . unwrap ( ) ;
335
+ let body = axum:: body:: to_bytes ( response. into_body ( ) , 1_000_000 )
336
+ . await
337
+ . unwrap ( ) ;
340
338
341
339
let reqs: Vec < db:: Request > = serde_json:: from_slice ( & body) . unwrap ( ) ;
342
340
assert_eq ! ( reqs[ 0 ] . state, RequestState :: Timeout ) ;
@@ -356,7 +354,9 @@ async fn ingest_proxy_timeout() {
356
354
357
355
assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
358
356
359
- let body = hyper:: body:: to_bytes ( response. into_body ( ) ) . await . unwrap ( ) ;
357
+ let body = axum:: body:: to_bytes ( response. into_body ( ) , 1_000_000 )
358
+ . await
359
+ . unwrap ( ) ;
360
360
361
361
let attempts: Vec < db:: Attempt > = serde_json:: from_slice ( & body) . unwrap ( ) ;
362
362
assert_eq ! ( attempts[ 0 ] . id, 1 ) ;
0 commit comments