Skip to content

Commit 0df12c4

Browse files
committed
feat(sds)_: add command queue architecture and improve message handling
- Introduce command queue system for sequential task processing - Add comprehensive event system for message lifecycle tracking - Restructure codebase with separate bloom_filter directory - Export encode/decode helpers for SDS proto messages - Use Set for deduplication in missing message detection - Fix sync message handling for empty content messages - Always emit MissedMessages event even with empty array - Improve duplicate message detection logic
1 parent 49f26d8 commit 0df12c4

File tree

8 files changed

+406
-155
lines changed

8 files changed

+406
-155
lines changed

packages/sds/src/bloom.ts renamed to packages/sds/src/bloom_filter/bloom.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { hashN } from "./nim_hashn/nim_hashn.mjs";
2-
import { getMOverNBitsForK } from "./probabilities.js";
1+
import { hashN } from "../nim_hashn/nim_hashn.mjs";
2+
import { getMOverNBitsForK } from "../probabilities.js";
33

44
export interface BloomFilterOptions {
55
// The expected maximum number of elements for which this BloomFilter is sized.

packages/sds/src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1-
import { BloomFilter } from "./bloom.js";
1+
import { BloomFilter } from "./bloom_filter/bloom.js";
2+
3+
export * from "./message_channel/index.js";
24

35
export { BloomFilter };
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { Message } from "./events.js";
2+
3+
export enum Command {
4+
Send = "send",
5+
Receive = "receive",
6+
SendEphemeral = "sendEphemeral"
7+
}
8+
9+
export interface ParamsByAction {
10+
[Command.Send]: {
11+
payload: Uint8Array;
12+
callback?: (message: Message) => Promise<{
13+
success: boolean;
14+
retrievalHint?: Uint8Array;
15+
}>;
16+
};
17+
[Command.Receive]: {
18+
message: Message;
19+
};
20+
[Command.SendEphemeral]: {
21+
payload: Uint8Array;
22+
callback?: (message: Message) => Promise<boolean>;
23+
};
24+
}
25+
26+
export type Task<A extends Command = Command> = {
27+
command: A;
28+
params: ParamsByAction[A];
29+
};
30+
31+
// Define a mapping for handlers based on action type
32+
export type Handlers = {
33+
[A in Command]: (params: ParamsByAction[A]) => Promise<void>;
34+
};
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { proto_sds_message } from "@waku/proto";
2+
3+
export enum MessageChannelEvent {
4+
MessageSent = "messageSent",
5+
MessageDelivered = "messageDelivered",
6+
MessageReceived = "messageReceived",
7+
MessageAcknowledged = "messageAcknowledged",
8+
PartialAcknowledgement = "partialAcknowledgement",
9+
MissedMessages = "missedMessages",
10+
SyncSent = "syncSent",
11+
SyncReceived = "syncReceived"
12+
}
13+
14+
export type Message = proto_sds_message.SdsMessage;
15+
export type HistoryEntry = proto_sds_message.HistoryEntry;
16+
export type ChannelId = string;
17+
18+
export function encodeMessage(message: Message): Uint8Array {
19+
return proto_sds_message.SdsMessage.encode(message);
20+
}
21+
22+
export function decodeMessage(data: Uint8Array): Message {
23+
return proto_sds_message.SdsMessage.decode(data);
24+
}
25+
26+
export type MessageChannelEvents = {
27+
[MessageChannelEvent.MessageSent]: CustomEvent<Message>;
28+
[MessageChannelEvent.MessageDelivered]: CustomEvent<{
29+
messageId: string;
30+
sentOrReceived: "sent" | "received";
31+
}>;
32+
[MessageChannelEvent.MessageReceived]: CustomEvent<Message>;
33+
[MessageChannelEvent.MessageAcknowledged]: CustomEvent<string>;
34+
[MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{
35+
messageId: string;
36+
count: number;
37+
}>;
38+
[MessageChannelEvent.MissedMessages]: CustomEvent<HistoryEntry[]>;
39+
[MessageChannelEvent.SyncSent]: CustomEvent<Message>;
40+
[MessageChannelEvent.SyncReceived]: CustomEvent<Message>;
41+
};
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export * from "./command_queue.js";
2+
export * from "./events.js";
3+
export * from "./message_channel.js";

0 commit comments

Comments
 (0)