Skip to content

Commit 9b0f1e8

Browse files
feat: validate messages for individual filter nodes & perform renewals (#2057)
* feat: validate messages for individual filter nodes & perform renewals * chore: fix spell check * chore: use a max threshold before peer renewal * chore: switch from a validation cycle timer to adhoc validation * chore: add test * fix: test * chore: address comments * fix: renewal without a new peer available * chore: validating messages should be non-blocking * chore: minor improvements * chore: rm only * chore: fix test
1 parent 00635b7 commit 9b0f1e8

File tree

5 files changed

+191
-24
lines changed

5 files changed

+191
-24
lines changed

packages/core/src/lib/filter/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
3535
constructor(
3636
private handleIncomingMessage: (
3737
pubsubTopic: PubsubTopic,
38-
wakuMessage: WakuMessage
38+
wakuMessage: WakuMessage,
39+
peerIdStr: string
3940
) => Promise<void>,
4041
libp2p: Libp2p,
4142
options?: ProtocolCreateOptions
@@ -78,7 +79,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
7879
return;
7980
}
8081

81-
await this.handleIncomingMessage(pubsubTopic, wakuMessage);
82+
await this.handleIncomingMessage(
83+
pubsubTopic,
84+
wakuMessage,
85+
connection.remotePeer.toString()
86+
);
8287
}
8388
}).then(
8489
() => {

packages/interfaces/src/filter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import type { IReceiver } from "./receiver.js";
1616
export type SubscribeOptions = {
1717
keepAlive?: number;
1818
pingsBeforePeerRenewed?: number;
19+
maxMissedMessagesThreshold?: number;
1920
};
2021

2122
export type IFilter = IReceiver & IBaseProtocolCore;

packages/sdk/src/protocols/base_protocol.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,15 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
5151
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
5252
this.log.info(`Renewing peer ${peerToDisconnect}`);
5353

54+
await this.connectionManager.dropConnection(peerToDisconnect);
55+
5456
const peer = (await this.findAndAddPeers(1))[0];
5557
if (!peer) {
56-
throw new Error(
57-
"Failed to find a new peer to replace the disconnected one"
58+
this.log.error(
59+
"Failed to find a new peer to replace the disconnected one."
5860
);
5961
}
6062

61-
await this.connectionManager.dropConnection(peerToDisconnect);
6263
this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect));
6364
this.log.info(
6465
`Peer ${peerToDisconnect} disconnected and removed from the peer list`

packages/sdk/src/protocols/filter.ts

Lines changed: 118 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,23 @@ import { ConnectionManager, FilterCore } from "@waku/core";
44
import {
55
type Callback,
66
type ContentTopic,
7-
CoreProtocolResult,
8-
CreateSubscriptionResult,
7+
type CoreProtocolResult,
8+
type CreateSubscriptionResult,
99
type IAsyncIterator,
1010
type IDecodedMessage,
1111
type IDecoder,
1212
type IFilterSDK,
1313
type IProtoMessage,
1414
type ISubscriptionSDK,
1515
type Libp2p,
16+
type PeerIdStr,
1617
type ProtocolCreateOptions,
1718
ProtocolError,
18-
ProtocolUseOptions,
19+
type ProtocolUseOptions,
1920
type PubsubTopic,
20-
SDKProtocolResult,
21+
type SDKProtocolResult,
2122
type ShardingParams,
22-
SubscribeOptions,
23+
type SubscribeOptions,
2324
type Unsubscribe
2425
} from "@waku/interfaces";
2526
import { messageHashStr } from "@waku/message-hash";
@@ -39,9 +40,17 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
3940
callback: Callback<T>;
4041
};
4142

43+
type ReceivedMessageHashes = {
44+
all: Set<string>;
45+
nodes: {
46+
[peerId: PeerIdStr]: Set<string>;
47+
};
48+
};
49+
4250
const log = new Logger("sdk:filter");
4351

4452
const DEFAULT_MAX_PINGS = 3;
53+
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
4554
const DEFAULT_KEEP_ALIVE = 30 * 1000;
4655

4756
const DEFAULT_SUBSCRIBE_OPTIONS = {
@@ -51,8 +60,11 @@ export class SubscriptionManager implements ISubscriptionSDK {
5160
private readonly pubsubTopic: PubsubTopic;
5261
readonly receivedMessagesHashStr: string[] = [];
5362
private keepAliveTimer: number | null = null;
63+
private readonly receivedMessagesHashes: ReceivedMessageHashes;
5464
private peerFailures: Map<string, number> = new Map();
65+
private missedMessagesByPeer: Map<string, number> = new Map();
5566
private maxPingFailures: number = DEFAULT_MAX_PINGS;
67+
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
5668

5769
private subscriptionCallbacks: Map<
5870
ContentTopic,
@@ -67,14 +79,38 @@ export class SubscriptionManager implements ISubscriptionSDK {
6779
) {
6880
this.pubsubTopic = pubsubTopic;
6981
this.subscriptionCallbacks = new Map();
82+
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
83+
this.receivedMessagesHashes = {
84+
all: new Set(),
85+
nodes: {
86+
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
87+
}
88+
};
89+
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
90+
}
91+
92+
get messageHashes(): string[] {
93+
return [...this.receivedMessagesHashes.all];
94+
}
95+
96+
private addHash(hash: string, peerIdStr?: string): void {
97+
this.receivedMessagesHashes.all.add(hash);
98+
99+
if (peerIdStr) {
100+
this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
101+
}
70102
}
71103

72104
public async subscribe<T extends IDecodedMessage>(
73105
decoders: IDecoder<T> | IDecoder<T>[],
74106
callback: Callback<T>,
75107
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
76108
): Promise<SDKProtocolResult> {
109+
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
77110
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
111+
this.maxMissedMessagesThreshold =
112+
options.maxMissedMessagesThreshold ||
113+
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
78114

79115
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
80116

@@ -146,8 +182,10 @@ export class SubscriptionManager implements ISubscriptionSDK {
146182
const results = await Promise.allSettled(promises);
147183
const finalResult = this.handleResult(results, "unsubscribe");
148184

149-
if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) {
150-
this.stopKeepAlivePings();
185+
if (this.subscriptionCallbacks.size === 0) {
186+
if (this.keepAliveTimer) {
187+
this.stopKeepAlivePings();
188+
}
151189
}
152190

153191
return finalResult;
@@ -180,11 +218,49 @@ export class SubscriptionManager implements ISubscriptionSDK {
180218
return finalResult;
181219
}
182220

183-
async processIncomingMessage(message: WakuMessage): Promise<void> {
221+
private async validateMessage(): Promise<void> {
222+
for (const hash of this.receivedMessagesHashes.all) {
223+
for (const [peerIdStr, hashes] of Object.entries(
224+
this.receivedMessagesHashes.nodes
225+
)) {
226+
if (!hashes.has(hash)) {
227+
this.incrementMissedMessageCount(peerIdStr);
228+
if (this.shouldRenewPeer(peerIdStr)) {
229+
log.info(
230+
`Peer ${peerIdStr} has missed too many messages, renewing.`
231+
);
232+
const peerId = this.getPeers().find(
233+
(p) => p.id.toString() === peerIdStr
234+
)?.id;
235+
if (!peerId) {
236+
log.error(
237+
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
238+
);
239+
continue;
240+
}
241+
try {
242+
await this.renewAndSubscribePeer(peerId);
243+
} catch (error) {
244+
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
245+
}
246+
}
247+
}
248+
}
249+
}
250+
}
251+
252+
async processIncomingMessage(
253+
message: WakuMessage,
254+
peerIdStr: string
255+
): Promise<void> {
184256
const hashedMessageStr = messageHashStr(
185257
this.pubsubTopic,
186258
message as IProtoMessage
187259
);
260+
261+
this.addHash(hashedMessageStr, peerIdStr);
262+
void this.validateMessage();
263+
188264
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
189265
log.info("Message already received, skipping");
190266
return;
@@ -277,15 +353,29 @@ export class SubscriptionManager implements ISubscriptionSDK {
277353
}
278354
}
279355

280-
private async renewAndSubscribePeer(peerId: PeerId): Promise<Peer> {
281-
const newPeer = await this.renewPeer(peerId);
282-
await this.protocol.subscribe(
283-
this.pubsubTopic,
284-
newPeer,
285-
Array.from(this.subscriptionCallbacks.keys())
286-
);
356+
private async renewAndSubscribePeer(
357+
peerId: PeerId
358+
): Promise<Peer | undefined> {
359+
try {
360+
const newPeer = await this.renewPeer(peerId);
361+
await this.protocol.subscribe(
362+
this.pubsubTopic,
363+
newPeer,
364+
Array.from(this.subscriptionCallbacks.keys())
365+
);
366+
367+
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
368+
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
287369

288-
return newPeer;
370+
return newPeer;
371+
} catch (error) {
372+
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
373+
return;
374+
} finally {
375+
this.peerFailures.delete(peerId.toString());
376+
this.missedMessagesByPeer.delete(peerId.toString());
377+
delete this.receivedMessagesHashes.nodes[peerId.toString()];
378+
}
289379
}
290380

291381
private startKeepAlivePings(options: SubscribeOptions): void {
@@ -312,6 +402,16 @@ export class SubscriptionManager implements ISubscriptionSDK {
312402
clearInterval(this.keepAliveTimer);
313403
this.keepAliveTimer = null;
314404
}
405+
406+
private incrementMissedMessageCount(peerIdStr: string): void {
407+
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
408+
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
409+
}
410+
411+
private shouldRenewPeer(peerIdStr: string): boolean {
412+
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
413+
return missedMessages > this.maxMissedMessagesThreshold;
414+
}
315415
}
316416

317417
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
@@ -326,7 +426,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
326426
) {
327427
super(
328428
new FilterCore(
329-
async (pubsubTopic: PubsubTopic, wakuMessage: WakuMessage) => {
429+
async (pubsubTopic, wakuMessage, peerIdStr) => {
330430
const subscription = this.getActiveSubscription(pubsubTopic);
331431
if (!subscription) {
332432
log.error(
@@ -335,7 +435,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
335435
return;
336436
}
337437

338-
await subscription.processIncomingMessage(wakuMessage);
438+
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
339439
},
340440
libp2p,
341441
options

packages/tests/tests/filter/peer_management.spec.ts

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import {
22
DefaultPubsubTopic,
33
ISubscriptionSDK,
4-
LightNode
4+
LightNode,
5+
SDKProtocolResult
56
} from "@waku/interfaces";
67
import {
78
createDecoder,
@@ -16,6 +17,7 @@ import { describe } from "mocha";
1617
import {
1718
afterEachCustom,
1819
beforeEachCustom,
20+
ServiceNode,
1921
ServiceNodesFleet
2022
} from "../../src/index.js";
2123
import {
@@ -177,4 +179,62 @@ describe("Waku Filter: Peer Management: E2E", function () {
177179
waku.filter.numPeersToUse
178180
);
179181
});
182+
183+
it("Renews peer on consistent missed messages", async function () {
184+
const [serviceNodes, waku] = await runMultipleNodes(
185+
this.ctx,
186+
undefined,
187+
undefined,
188+
2
189+
);
190+
const serviceNodesPeerIdStr = await Promise.all(
191+
serviceNodes.nodes.map(async (node) =>
192+
(await node.getPeerId()).toString()
193+
)
194+
);
195+
const nodeWithoutDiscovery = new ServiceNode("WithoutDiscovery");
196+
await nodeWithoutDiscovery.start({ lightpush: true, filter: true });
197+
const nodeWithouDiscoveryPeerIdStr = (
198+
await nodeWithoutDiscovery.getPeerId()
199+
).toString();
200+
await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId());
201+
202+
const { error, subscription: sub } =
203+
await waku.filter.createSubscription(pubsubTopic);
204+
if (!sub || error) {
205+
throw new Error("Could not create subscription");
206+
}
207+
208+
const messages: DecodedMessage[] = [];
209+
const { successes } = await sub.subscribe([decoder], (msg) => {
210+
messages.push(msg);
211+
});
212+
213+
expect(successes.length).to.be.greaterThan(0);
214+
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);
215+
216+
const sendMessage: () => Promise<SDKProtocolResult> = async () =>
217+
waku.lightPush.send(encoder, {
218+
payload: utf8ToBytes("Hello_World")
219+
});
220+
221+
await sendMessage();
222+
223+
successes
224+
.map((peerId) =>
225+
[nodeWithouDiscoveryPeerIdStr, ...serviceNodesPeerIdStr].includes(
226+
peerId.toString()
227+
)
228+
)
229+
.forEach((isConnected) => expect(isConnected).to.eq(true));
230+
231+
// send 2 more messages
232+
await sendMessage();
233+
await sendMessage();
234+
235+
expect(waku.filter.connectedPeers.length).to.equal(2);
236+
expect(
237+
waku.filter.connectedPeers.map((p) => p.id.toString())
238+
).to.not.include(nodeWithouDiscoveryPeerIdStr);
239+
});
180240
});

0 commit comments

Comments
 (0)