Skip to content

Commit 9fee1e8

Browse files
authored
fix(pubsub): retry connecting to backend (#2254)
* fix(`pubsub`): retry connecting to backend * clippy
1 parent cbdd411 commit 9fee1e8

File tree

1 file changed

+33
-4
lines changed

1 file changed

+33
-4
lines changed

crates/pubsub/src/service.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ pub(crate) struct PubSubService<T> {
3131

3232
/// The request manager.
3333
pub(crate) in_flights: RequestManager,
34+
35+
/// Number of retries. Default is 10.
36+
///
37+
/// Every retry is made at an interval of 3 seconds.
38+
pub(crate) retries: u32,
3439
}
3540

3641
impl<T: PubSubConnect> PubSubService<T> {
@@ -45,6 +50,7 @@ impl<T: PubSubConnect> PubSubService<T> {
4550
reqs,
4651
subs: SubscriptionManager::default(),
4752
in_flights: Default::default(),
53+
retries: 10,
4854
};
4955
this.spawn();
5056
Ok(PubSubFrontend::new(tx))
@@ -190,6 +196,31 @@ impl<T: PubSubConnect> PubSubService<T> {
190196
Ok(())
191197
}
192198

199+
/// Attempt to reconnect with retries
200+
async fn reconnect_with_retries(&mut self) -> TransportResult<()> {
201+
let mut retry_count = 0;
202+
loop {
203+
match self.reconnect().await {
204+
Ok(()) => break Ok(()),
205+
Err(e) => {
206+
retry_count += 1;
207+
if retry_count >= self.retries {
208+
error!(
209+
"Reconnect failed after {} attempts, shutting down: {e}",
210+
retry_count
211+
);
212+
break Err(e);
213+
}
214+
warn!(
215+
"Reconnection attempt {}/{} failed: {}. Retrying in 3 seconds...",
216+
retry_count, self.retries, e
217+
);
218+
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
219+
}
220+
}
221+
}
222+
}
223+
193224
/// Spawn the service.
194225
pub(crate) fn spawn(mut self) {
195226
let fut = async move {
@@ -205,16 +236,14 @@ impl<T: PubSubConnect> PubSubService<T> {
205236
if let Err(e) = self.handle_item(item) {
206237
break Err(e)
207238
}
208-
} else if let Err(e) = self.reconnect().await {
209-
error!("Reconnect failed, shutting down: {e}");
239+
} else if let Err(e) = self.reconnect_with_retries().await {
210240
break Err(e)
211241
}
212242
}
213243

214244
_ = &mut self.handle.error => {
215245
error!("Pubsub service backend error.");
216-
if let Err(e) = self.reconnect().await {
217-
error!("Reconnect failed, shutting down: {e}");
246+
if let Err(e) = self.reconnect_with_retries().await {
218247
break Err(e)
219248
}
220249
}

0 commit comments

Comments
 (0)