Skip to content

feat(filter): reliability monitor as a separate class to handle reliability logic #2116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 106 additions & 108 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,18 @@ const DEFAULT_KEEP_ALIVE = 30 * 1000;
// Options used by subscribe() when the caller passes none: only the
// keep-alive interval is preset, taken from DEFAULT_KEEP_ALIVE
// (presumably milliseconds — confirm against the timer that consumes it).
const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private readonly receivedMessagesHashes: ReceivedMessageHashes;

class ReliabilityMonitor {
private receivedMessagesHashes: ReceivedMessageHashes;
private peerFailures: Map<string, number> = new Map();
private missedMessagesByPeer: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;

public constructor(
private readonly pubsubTopic: PubsubTopic,
private protocol: FilterCore,
private getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
Expand All @@ -89,28 +79,118 @@ export class SubscriptionManager implements ISubscriptionSDK {
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}

/**
 * Returns a fresh array holding every message hash recorded in the global
 * `all` set. Mutating the returned array does not affect internal state.
 */
public get messageHashes(): string[] {
  return Array.from(this.receivedMessagesHashes.all);
}

private addHash(hash: string, peerIdStr?: string): void {
/**
 * Records that a message hash was observed.
 *
 * The hash is always added to the global `all` set. When `peerIdStr` is
 * provided, the hash is also added to that peer's own set, but only if the
 * peer currently has one.
 *
 * @param hash - Hash string identifying the received message.
 * @param peerIdStr - String form of the delivering peer's id, if known.
 */
public addHash(hash: string, peerIdStr?: string): void {
  this.receivedMessagesHashes.all.add(hash);

  if (!peerIdStr) {
    return;
  }

  // Per-peer sets are created when a replacement peer is registered and
  // removed again when a peer is renewed away. A message arriving from a
  // peer without a set (e.g. a late delivery from a peer that was just
  // replaced) must not throw here: that would abort processing of an
  // otherwise valid message. Such deliveries still count in `all`.
  this.receivedMessagesHashes.nodes[peerIdStr]?.add(hash);
}

/**
 * Cross-checks every known message hash against each tracked peer's set.
 *
 * For each (hash, peer) pair where the peer has not delivered the hash, the
 * peer's missed-message counter is incremented. Once that counter exceeds
 * the configured threshold, the peer is looked up among the currently
 * connected peers and renewed. Renewal failures are logged, never thrown.
 *
 * NOTE(review): every call walks the full `all` set again, so a hash a peer
 * has not delivered is counted as missed on each invocation, not once.
 */
public async validateMessage(): Promise<void> {
  for (const hash of this.receivedMessagesHashes.all) {
    const perPeerHashes = Object.entries(this.receivedMessagesHashes.nodes);

    for (const [peerIdStr, hashes] of perPeerHashes) {
      // Peer already delivered this message: nothing to record.
      if (hashes.has(hash)) {
        continue;
      }

      this.incrementMissedMessageCount(peerIdStr);
      if (!this.shouldRenewPeer(peerIdStr)) {
        continue;
      }

      log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`);

      // Renewal needs the PeerId object, so resolve it from the live peer list.
      const connected = this.getPeers().find(
        (p) => p.id.toString() === peerIdStr
      );
      const peerId = connected?.id;
      if (!peerId) {
        log.error(
          `Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
        );
        continue;
      }

      try {
        await this.renewAndSubscribePeer(peerId);
      } catch (error) {
        log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
      }
    }
  }
}

/**
 * Registers one more failure for `peerId` and renews the peer once its
 * failure count exceeds `maxPingFailures`.
 *
 * On a renewal attempt that does not throw, the peer's failure counter is
 * cleared. Errors thrown by the renewal are logged and not propagated.
 *
 * @param peerId - Id of the peer that failed.
 */
public async handlePeerFailure(peerId: PeerId): Promise<void> {
  const peerKey = peerId.toString();
  const failures = (this.peerFailures.get(peerKey) || 0) + 1;
  this.peerFailures.set(peerKey, failures);

  // Still within the tolerated number of failures: keep the peer.
  if (!(failures > this.maxPingFailures)) {
    return;
  }

  try {
    await this.renewAndSubscribePeer(peerId);
    this.peerFailures.delete(peerKey);
  } catch (error) {
    log.error(`Failed to renew peer ${peerKey}: ${error}.`);
  }
}

/**
 * Replaces `peerId` with a peer obtained from `renewPeer` and resets the
 * reliability bookkeeping accordingly.
 *
 * All state kept for the old peer (failure count, missed-message count,
 * per-peer hash set) is dropped whether or not the renewal succeeds. On
 * success the replacement peer starts with an empty hash set and a
 * missed-message count of zero.
 *
 * NOTE(review): this method only calls `renewPeer` and updates bookkeeping;
 * it issues no subscribe call itself — confirm the replacement peer is
 * subscribed elsewhere.
 *
 * @param peerId - Id of the peer to replace.
 * @returns The replacement peer, or `undefined` if renewal failed.
 */
private async renewAndSubscribePeer(
  peerId: PeerId
): Promise<Peer | undefined> {
  const oldKey = peerId.toString();

  let renewed: Peer | undefined;
  try {
    renewed = await this.renewPeer(peerId);
  } catch (error) {
    log.warn(`Failed to renew peer ${oldKey}: ${error}.`);
  }

  // Drop the old peer's state BEFORE registering the replacement. Doing the
  // cleanup afterwards (as a `finally` would) wipes the freshly created
  // entries whenever `renewPeer` hands back a peer with the same id.
  this.peerFailures.delete(oldKey);
  this.missedMessagesByPeer.delete(oldKey);
  delete this.receivedMessagesHashes.nodes[oldKey];

  if (renewed) {
    const newKey = renewed.id.toString();
    this.receivedMessagesHashes.nodes[newKey] = new Set();
    this.missedMessagesByPeer.set(newKey, 0);
  }

  return renewed;
}

/**
 * Adds one to the missed-message counter of `peerIdStr`, treating a peer
 * with no counter yet as having missed zero messages so far.
 */
private incrementMissedMessageCount(peerIdStr: string): void {
  this.missedMessagesByPeer.set(
    peerIdStr,
    (this.missedMessagesByPeer.get(peerIdStr) || 0) + 1
  );
}

/**
 * Tells whether `peerIdStr` has missed strictly more messages than
 * `maxMissedMessagesThreshold`. A peer with no counter counts as zero.
 */
private shouldRenewPeer(peerIdStr: string): boolean {
  return (
    (this.missedMessagesByPeer.get(peerIdStr) || 0) >
    this.maxMissedMessagesThreshold
  );
}
}

export class SubscriptionManager implements ISubscriptionSDK {
private readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
private reliabilityMonitor: ReliabilityMonitor;

/**
 * Creates a subscription manager with an empty callback map and a
 * ReliabilityMonitor built from the two peer callbacks.
 *
 * @param pubsubTopic - Stored on the instance as a readonly parameter property.
 * @param protocol - Stored on the instance as a parameter property.
 * @param getPeers - Callback returning the current peers; stored on the
 *   instance and also handed to the reliability monitor.
 * @param renewPeer - Callback that replaces a given peer; only forwarded to
 *   the reliability monitor, not stored on this instance.
 */
public constructor(
private readonly pubsubTopic: PubsubTopic,
private protocol: FilterCore,
private getPeers: () => Peer[],
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
// The `private readonly pubsubTopic` parameter property already performs
// this assignment; the explicit line repeats it.
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
// NOTE(review): `.bind(this)` sets `this` to this SubscriptionManager inside
// the callbacks. It has no effect on arrow functions or already-bound
// functions — confirm what callers actually pass.
this.reliabilityMonitor = new ReliabilityMonitor(
getPeers.bind(this),
renewPeer.bind(this)
);
}

public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
this.maxMissedMessagesThreshold =
options.maxMissedMessagesThreshold ||
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

Expand Down Expand Up @@ -218,37 +298,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}

/**
 * Cross-checks every known message hash against each tracked peer's set.
 *
 * For each (hash, peer) pair where the peer has not delivered the hash, the
 * peer's missed-message counter is incremented. Once that counter exceeds
 * the configured threshold, the peer is looked up among the currently
 * connected peers and renewed. Renewal failures are logged, never thrown.
 *
 * NOTE(review): every call walks the full `all` set again, so a hash a peer
 * has not delivered is counted as missed on each invocation, not once.
 */
private async validateMessage(): Promise<void> {
  for (const hash of this.receivedMessagesHashes.all) {
    const perPeerHashes = Object.entries(this.receivedMessagesHashes.nodes);

    for (const [peerIdStr, hashes] of perPeerHashes) {
      // Peer already delivered this message: nothing to record.
      if (hashes.has(hash)) {
        continue;
      }

      this.incrementMissedMessageCount(peerIdStr);
      if (!this.shouldRenewPeer(peerIdStr)) {
        continue;
      }

      log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`);

      // Renewal needs the PeerId object, so resolve it from the live peer list.
      const connected = this.getPeers().find(
        (p) => p.id.toString() === peerIdStr
      );
      const peerId = connected?.id;
      if (!peerId) {
        log.error(
          `Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
        );
        continue;
      }

      try {
        await this.renewAndSubscribePeer(peerId);
      } catch (error) {
        log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
      }
    }
  }
}

public async processIncomingMessage(
message: WakuMessage,
peerIdStr: PeerIdStr
Expand All @@ -258,8 +307,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
message as IProtoMessage
);

this.addHash(hashedMessageStr, peerIdStr);
void this.validateMessage();
this.reliabilityMonitor.addHash(hashedMessageStr, peerIdStr);
void this.reliabilityMonitor.validateMessage();

if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
log.info("Message already received, skipping");
Expand Down Expand Up @@ -322,13 +371,11 @@ export class SubscriptionManager implements ISubscriptionSDK {
try {
const result = await this.protocol.ping(peer);
if (result.failure) {
await this.handlePeerFailure(peerId);
} else {
this.peerFailures.delete(peerId.toString());
await this.reliabilityMonitor.handlePeerFailure(peerId);
}
return result;
} catch (error) {
await this.handlePeerFailure(peerId);
await this.reliabilityMonitor.handlePeerFailure(peerId);
return {
success: null,
failure: {
Expand All @@ -339,45 +386,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}

/**
 * Registers one more failure for `peerId` and renews the peer once its
 * failure count exceeds `maxPingFailures`.
 *
 * On a renewal attempt that does not throw, the peer's failure counter is
 * cleared. Errors thrown by the renewal are logged and not propagated.
 *
 * @param peerId - Id of the peer that failed.
 */
private async handlePeerFailure(peerId: PeerId): Promise<void> {
  const peerKey = peerId.toString();
  const failures = (this.peerFailures.get(peerKey) || 0) + 1;
  this.peerFailures.set(peerKey, failures);

  // Still within the tolerated number of failures: keep the peer.
  if (!(failures > this.maxPingFailures)) {
    return;
  }

  try {
    await this.renewAndSubscribePeer(peerId);
    this.peerFailures.delete(peerKey);
  } catch (error) {
    log.error(`Failed to renew peer ${peerKey}: ${error}.`);
  }
}

/**
 * Replaces `peerId` with a peer obtained from `renewPeer`, subscribes the
 * replacement to all currently registered content topics on this pubsub
 * topic, and resets the reliability bookkeeping accordingly.
 *
 * All state kept for the old peer (failure count, missed-message count,
 * per-peer hash set) is dropped whether or not the renewal succeeds. The
 * replacement is registered (empty hash set, zero missed messages) only if
 * both the renewal and the subscribe call completed without throwing.
 *
 * @param peerId - Id of the peer to replace.
 * @returns The replacement peer, or `undefined` if renewal or subscription
 *   failed.
 */
private async renewAndSubscribePeer(
  peerId: PeerId
): Promise<Peer | undefined> {
  const oldKey = peerId.toString();

  let renewed: Peer | undefined;
  try {
    const candidate = await this.renewPeer(peerId);
    await this.protocol.subscribe(
      this.pubsubTopic,
      candidate,
      Array.from(this.subscriptionCallbacks.keys())
    );
    // Only treat the peer as renewed once the subscribe call has gone through.
    renewed = candidate;
  } catch (error) {
    log.warn(`Failed to renew peer ${oldKey}: ${error}.`);
  }

  // Drop the old peer's state BEFORE registering the replacement. Doing the
  // cleanup afterwards (as a `finally` would) wipes the freshly created
  // entries whenever `renewPeer` hands back a peer with the same id.
  this.peerFailures.delete(oldKey);
  this.missedMessagesByPeer.delete(oldKey);
  delete this.receivedMessagesHashes.nodes[oldKey];

  if (renewed) {
    const newKey = renewed.id.toString();
    this.receivedMessagesHashes.nodes[newKey] = new Set();
    this.missedMessagesByPeer.set(newKey, 0);
  }

  return renewed;
}

private startKeepAlivePings(options: SubscribeOptions): void {
const { keepAlive } = options;
if (this.keepAliveTimer) {
Expand All @@ -402,16 +410,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
}

/**
 * Adds one to the missed-message counter of `peerIdStr`, treating a peer
 * with no counter yet as having missed zero messages so far.
 */
private incrementMissedMessageCount(peerIdStr: string): void {
  this.missedMessagesByPeer.set(
    peerIdStr,
    (this.missedMessagesByPeer.get(peerIdStr) || 0) + 1
  );
}

/**
 * Tells whether `peerIdStr` has missed strictly more messages than
 * `maxMissedMessagesThreshold`. A peer with no counter counts as zero.
 */
private shouldRenewPeer(peerIdStr: string): boolean {
  return (
    (this.missedMessagesByPeer.get(peerIdStr) || 0) >
    this.maxMissedMessagesThreshold
  );
}
}

class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
Expand Down
24 changes: 12 additions & 12 deletions packages/tests/.mocharc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ const config = {
retries: 4
};

// When the CI environment variable is set, run mocha in parallel with 6 jobs
// and switch to mocha-multi-reporters (configured via .mocha.reporters.json).
// Otherwise leave the config untouched and just announce serial execution.
if (!process.env.CI) {
  console.log("Running tests serially. To enable parallel execution update mocha config");
} else {
  console.log("Running tests in parallel");
  config.parallel = true;
  config.jobs = 6;

  console.log("Activating allure reporting");
  config.reporter = 'mocha-multi-reporters';
  config.reporterOptions = { configFile: '.mocha.reporters.json' };
}
// if (process.env.CI) {
// console.log("Running tests in parallel");
// config.parallel = true;
// config.jobs = 6;
// console.log("Activating allure reporting");
// config.reporter = 'mocha-multi-reporters';
// config.reporterOptions = {
// configFile: '.mocha.reporters.json'
// };
// } else {
// console.log("Running tests serially. To enable parallel execution update mocha config");
// }

module.exports = config;
2 changes: 1 addition & 1 deletion packages/tests/tests/filter/peer_management.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
teardownNodesWithRedundancy
} from "../../src/index.js";

describe("Waku Filter: Peer Management: E2E", function () {
describe.only("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
Expand Down
Loading