Skip to content

Commit 73e350a

Browse files
feat: add retries to lightpush
1 parent c274e88 commit 73e350a

File tree

2 files changed

+46
-18
lines changed

2 files changed

+46
-18
lines changed

packages/sdk/src/protocols/sender/lightpush/index.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,17 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
9797
}
9898
if (failure) {
9999
failures.push(failure);
100-
void this.reliabilityMonitor.handleSendFailure(failure);
100+
101+
if (failure.peerId) {
102+
const peer = this.connectedPeers.find((connectedPeer) =>
103+
connectedPeer.id.equals(failure.peerId)
104+
);
105+
if (peer) {
106+
void this.reliabilityMonitor.attemptRetries(failure.peerId, () =>
107+
this.protocol.send(encoder, message, peer)
108+
);
109+
}
110+
}
101111
}
102112
} else {
103113
log.error("Failed unexpectedly while sending:", result.reason);
Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Peer, PeerId } from "@libp2p/interface";
2-
import { Failure, PeerIdStr } from "@waku/interfaces";
2+
import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces";
33
import { Logger } from "@waku/utils";
44

55
const log = new Logger("sdk:sender:reliability_monitor");
@@ -13,27 +13,45 @@ export class SenderReliabilityMonitor {
1313

1414
public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}
1515

16-
public handleSendFailure = async (failure: Failure): Promise<void> => {
17-
if (failure.peerId) {
18-
const peerIdStr = failure.peerId.toString();
19-
const currentAttempts = this.attempts.get(peerIdStr) || 0;
20-
this.attempts.set(peerIdStr, currentAttempts + 1);
21-
22-
if (currentAttempts + 1 >= this.maxAttemptsBeforeRenewal) {
23-
try {
24-
const newPeer = await this.renewPeer(failure.peerId);
25-
log.info(
26-
`Renewed peer ${failure.peerId.toString()} to ${newPeer.id.toString()}`
27-
);
16+
public async attemptRetries(
17+
peerId: PeerId,
18+
protocolSend: () => Promise<CoreProtocolResult>
19+
): Promise<void> {
20+
const peerIdStr = peerId.toString();
21+
const currentAttempts = this.attempts.get(peerIdStr) || 0;
22+
this.attempts.set(peerIdStr, currentAttempts + 1);
2823

24+
if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) {
25+
try {
26+
const result = await protocolSend();
27+
if (result.success) {
28+
log.info(`Successfully sent message after retry to ${peerIdStr}`);
2929
this.attempts.delete(peerIdStr);
30-
this.attempts.set(newPeer.id.toString(), 0);
31-
} catch (error) {
30+
} else {
3231
log.error(
33-
`Failed to renew peer ${failure.peerId.toString()}: ${error}`
32+
`Failed to send message after retry to ${peerIdStr}: ${result.failure}`
3433
);
34+
await this.attemptRetries(peerId, protocolSend);
3535
}
36+
} catch (error) {
37+
log.error(
38+
`Failed to send message after retry to ${peerIdStr}: ${error}`
39+
);
40+
await this.attemptRetries(peerId, protocolSend);
41+
}
42+
} else {
43+
try {
44+
const newPeer = await this.renewPeer(peerId);
45+
log.info(
46+
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
47+
);
48+
49+
this.attempts.delete(peerIdStr);
50+
this.attempts.set(newPeer.id.toString(), 0);
51+
await protocolSend();
52+
} catch (error) {
53+
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`);
3654
}
3755
}
38-
};
56+
}
3957
}

0 commit comments

Comments
 (0)