Skip to content

Commit c029adb

Browse files
authored
Merge pull request #689 from paolobarbolini/sleep-without-thread
Use `tokio` sleep instead of blocking thread when using `reqwest` client
2 parents 5dd4375 + 6ef36ce commit c029adb

File tree

5 files changed

+99
-29
lines changed

5 files changed

+99
-29
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ futures-util = { version = "0.3.31", default-features = false, features = ["io"]
3434

3535
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
3636
jsonwebtoken = { version = "9.3.1", default-features = false }
37+
tokio = { version = "1.38", optional = true, features = ["time"] }
3738

3839
[target.'cfg(target_arch = "wasm32")'.dependencies]
3940
uuid = { version = "1.17.0", default-features = false, features = ["v4", "js"] }
@@ -42,7 +43,7 @@ wasm-bindgen-futures = "0.4"
4243

4344
[features]
4445
default = ["reqwest"]
45-
reqwest = ["dep:reqwest", "pin-project-lite", "bytes"]
46+
reqwest = ["dep:reqwest", "dep:tokio", "pin-project-lite", "bytes"]
4647
futures-unsend = []
4748

4849
[dev-dependencies]

src/client.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
search::*,
1313
task_info::TaskInfo,
1414
tasks::{Task, TasksCancelQuery, TasksDeleteQuery, TasksResults, TasksSearchQuery},
15-
utils::async_sleep,
15+
utils::SleepBackend,
1616
DefaultHttpClient,
1717
};
1818

@@ -933,7 +933,7 @@ impl<Http: HttpClient> Client<Http> {
933933
}
934934
Task::Enqueued { .. } | Task::Processing { .. } => {
935935
elapsed_time += interval;
936-
async_sleep(interval).await;
936+
self.sleep_backend().sleep(interval).await;
937937
}
938938
},
939939
Err(error) => return Err(error),
@@ -1144,6 +1144,10 @@ impl<Http: HttpClient> Client<Http> {
11441144

11451145
crate::tenant_tokens::generate_tenant_token(api_key_uid, search_rules, api_key, expires_at)
11461146
}
1147+
1148+
fn sleep_backend(&self) -> SleepBackend {
1149+
SleepBackend::infer(self.http_client.is_tokio())
1150+
}
11471151
}
11481152

11491153
#[derive(Debug, Clone, Deserialize)]

src/request.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ pub trait HttpClient: Clone + Send + Sync {
101101
content_type: &str,
102102
expected_status_code: u16,
103103
) -> Result<Output, Error>;
104+
105+
fn is_tokio(&self) -> bool {
106+
false
107+
}
104108
}
105109

106110
pub fn parse_response<Output: DeserializeOwned>(

src/reqwest.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ impl HttpClient for ReqwestClient {
112112

113113
parse_response(status, expected_status_code, &body, url.to_string())
114114
}
115+
116+
fn is_tokio(&self) -> bool {
117+
true
118+
}
115119
}
116120

117121
fn verb<Q, B>(method: &Method<Q, B>) -> reqwest::Method {

src/utils.rs

Lines changed: 83 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,101 @@
11
use std::time::Duration;
22

3-
#[cfg(not(target_arch = "wasm32"))]
4-
pub(crate) async fn async_sleep(interval: Duration) {
5-
let (sender, receiver) = futures_channel::oneshot::channel::<()>();
6-
std::thread::spawn(move || {
7-
std::thread::sleep(interval);
8-
let _ = sender.send(());
9-
});
10-
let _ = receiver.await;
3+
#[derive(Debug, Copy, Clone)]
4+
pub(crate) enum SleepBackend {
5+
#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
6+
Tokio,
7+
#[cfg(not(target_arch = "wasm32"))]
8+
Thread,
9+
#[cfg(target_arch = "wasm32")]
10+
Javascript,
1111
}
1212

13-
#[cfg(target_arch = "wasm32")]
14-
pub(crate) async fn async_sleep(interval: Duration) {
15-
use std::convert::TryInto;
16-
use wasm_bindgen_futures::JsFuture;
17-
18-
JsFuture::from(web_sys::js_sys::Promise::new(&mut |yes, _| {
19-
web_sys::window()
20-
.unwrap()
21-
.set_timeout_with_callback_and_timeout_and_arguments_0(
22-
&yes,
23-
interval.as_millis().try_into().unwrap(),
24-
)
25-
.unwrap();
26-
}))
27-
.await
28-
.unwrap();
13+
impl SleepBackend {
14+
pub(crate) fn infer(is_tokio: bool) -> Self {
15+
#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
16+
if is_tokio {
17+
return Self::Tokio;
18+
}
19+
#[cfg(any(target_arch = "wasm32", not(feature = "reqwest")))]
20+
let _ = is_tokio;
21+
22+
#[cfg(not(target_arch = "wasm32"))]
23+
return Self::Thread;
24+
25+
#[cfg(target_arch = "wasm32")]
26+
return Self::Javascript;
27+
}
28+
29+
pub(crate) async fn sleep(self, interval: Duration) {
30+
match self {
31+
#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
32+
Self::Tokio => {
33+
tokio::time::sleep(interval).await;
34+
}
35+
#[cfg(not(target_arch = "wasm32"))]
36+
Self::Thread => {
37+
let (sender, receiver) = futures_channel::oneshot::channel::<()>();
38+
std::thread::spawn(move || {
39+
std::thread::sleep(interval);
40+
let _ = sender.send(());
41+
});
42+
let _ = receiver.await;
43+
}
44+
#[cfg(target_arch = "wasm32")]
45+
Self::Javascript => {
46+
use std::convert::TryInto;
47+
use wasm_bindgen_futures::JsFuture;
48+
49+
JsFuture::from(web_sys::js_sys::Promise::new(&mut |yes, _| {
50+
web_sys::window()
51+
.unwrap()
52+
.set_timeout_with_callback_and_timeout_and_arguments_0(
53+
&yes,
54+
interval.as_millis().try_into().unwrap(),
55+
)
56+
.unwrap();
57+
}))
58+
.await
59+
.unwrap();
60+
}
61+
}
62+
}
2963
}
3064

3165
#[cfg(test)]
3266
mod test {
3367
use super::*;
3468
use meilisearch_test_macro::meilisearch_test;
3569

70+
#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
71+
#[meilisearch_test]
72+
async fn sleep_tokio() {
73+
let sleep_duration = Duration::from_millis(10);
74+
let now = std::time::Instant::now();
75+
76+
SleepBackend::Tokio.sleep(sleep_duration).await;
77+
78+
assert!(now.elapsed() >= sleep_duration);
79+
}
80+
81+
#[cfg(not(target_arch = "wasm32"))]
82+
#[meilisearch_test]
83+
async fn sleep_thread() {
84+
let sleep_duration = Duration::from_millis(10);
85+
let now = std::time::Instant::now();
86+
87+
SleepBackend::Thread.sleep(sleep_duration).await;
88+
89+
assert!(now.elapsed() >= sleep_duration);
90+
}
91+
92+
#[cfg(target_arch = "wasm32")]
3693
#[meilisearch_test]
37-
async fn test_async_sleep() {
94+
async fn sleep_javascript() {
3895
let sleep_duration = Duration::from_millis(10);
3996
let now = std::time::Instant::now();
4097

41-
async_sleep(sleep_duration).await;
98+
SleepBackend::Javascript.sleep(sleep_duration).await;
4299

43100
assert!(now.elapsed() >= sleep_duration);
44101
}

0 commit comments

Comments
 (0)