diff --git a/package-lock.json b/package-lock.json index d272b310eb..f7ddadd12a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11727,6 +11727,18 @@ "resolved": "packages/message-encryption", "link": true }, + "node_modules/@waku/message-hash": { + "version": "0.1.19", + "resolved": "https://registry.npmjs.org/@waku/message-hash/-/message-hash-0.1.19.tgz", + "integrity": "sha512-fl+qky3MQK8l3HTT5wq23NcdYFYNqVcUVwBblX9/IArcDlDNjEEdK68K3n8rFWxBBd2JAK0RxU7MMkLiK3vWUA==", + "dependencies": { + "@noble/hashes": "^1.3.2", + "@waku/utils": "0.0.23" + }, + "engines": { + "node": ">=20" + } + }, "node_modules/@waku/proto": { "resolved": "packages/proto", "link": true @@ -44183,6 +44195,7 @@ "@types/lodash": "^4.17.15", "@types/sinon": "^17.0.3", "@waku/build-utils": "^1.0.0", + "@waku/interfaces": "0.0.30", "@waku/message-encryption": "^0.0.33", "deep-equal-in-any-order": "^2.0.6", "fast-check": "^3.23.2", @@ -44574,6 +44587,7 @@ "@waku/core": "*", "@waku/enr": "*", "@waku/interfaces": "*", + "@waku/message-hash": "^0.1.17", "@waku/utils": "*", "app-root-path": "^3.1.0", "chai-as-promised": "^7.1.1", diff --git a/packages/core/src/lib/message_hash/message_hash.spec.ts b/packages/core/src/lib/message_hash/message_hash.spec.ts index 6988ad84f9..43fbd77c20 100644 --- a/packages/core/src/lib/message_hash/message_hash.spec.ts +++ b/packages/core/src/lib/message_hash/message_hash.spec.ts @@ -1,4 +1,4 @@ -import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces"; +import type { IProtoMessage } from "@waku/interfaces"; import { bytesToHex, hexToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -93,19 +93,20 @@ describe("Message Hash: RFC Test Vectors", () => { expect(bytesToHex(hash)).to.equal(expectedHash); }); - it("Waku message hash computation (message is IDecodedMessage)", () => { + it("Waku message hash computation (message is IProtoMessage with version)", () => { const expectedHash = "3f11bc950dce0e3ffdcf205ae6414c01130bb5d9f20644869bff80407fa52c8f"; const pubsubTopic = "/waku/2/default-waku/proto"; - const message: IDecodedMessage = { - version: 0, + const message: IProtoMessage = { payload: new Uint8Array(), - pubsubTopic, contentTopic: "/waku/2/default-content/proto", meta: hexToBytes("0x73757065722d736563726574"), - timestamp: new Date("2024-04-30T10:54:14.978Z"), + timestamp: + BigInt(new Date("2024-04-30T10:54:14.978Z").getTime()) * + BigInt(1000000), ephemeral: undefined, - rateLimitProof: undefined + rateLimitProof: undefined, + version: 0 }; const hash = messageHash(pubsubTopic, message); expect(bytesToHex(hash)).to.equal(expectedHash); @@ -144,16 +145,17 @@ describe("messageHash and messageHashStr", () => { expect(hashStr).to.equal(hashStrFromBytes); }); - it("messageHashStr works with IDecodedMessage", () => { - const decodedMessage: IDecodedMessage = { - version: 0, + it("messageHashStr works with IProtoMessage", () => { + const decodedMessage: IProtoMessage = { payload: new Uint8Array([1, 2, 3, 4]), - pubsubTopic, contentTopic: "/waku/2/default-content/proto", meta: new Uint8Array([5, 6, 7, 8]), - timestamp: new Date("2024-04-30T10:54:14.978Z"), + timestamp: + BigInt(new Date("2024-04-30T10:54:14.978Z").getTime()) * + BigInt(1000000), ephemeral: undefined, - rateLimitProof: undefined + rateLimitProof: undefined, + version: 0 }; const hashStr = messageHashStr(pubsubTopic, decodedMessage); diff --git a/packages/core/src/lib/store/rpc.spec.ts b/packages/core/src/lib/store/rpc.spec.ts new file mode 100644 index 0000000000..6e38449c2f --- /dev/null +++ b/packages/core/src/lib/store/rpc.spec.ts @@ -0,0 +1,93 @@ +import { expect } from "chai"; + +import { StoreQueryRequest } from "./rpc.js"; + +describe("StoreQueryRequest validation", () => { + it("accepts valid content-filtered query", () => { + const request = StoreQueryRequest.create({ + pubsubTopic: "/waku/2/default-waku/proto", + contentTopics: ["/test/1/content/proto"], + includeData: true, + paginationForward: true + }); + expect(request).to.exist; + }); + + it("rejects content-filtered query with only pubsubTopic", () => { + expect(() => + StoreQueryRequest.create({ + pubsubTopic: "/waku/2/default-waku/proto", + contentTopics: [], + includeData: true, + paginationForward: true + }) + ).to.throw( + "Both pubsubTopic and contentTopics must be set together for content-filtered queries" + ); + }); + + it("rejects content-filtered query with only contentTopics", () => { + expect(() => + StoreQueryRequest.create({ + pubsubTopic: "", + contentTopics: ["/test/1/content/proto"], + includeData: true, + paginationForward: true + }) + ).to.throw( + "Both pubsubTopic and contentTopics must be set together for content-filtered queries" + ); + }); + + it("accepts valid message hash query", () => { + const request = StoreQueryRequest.create({ + pubsubTopic: "", + contentTopics: [], + messageHashes: [new Uint8Array([1, 2, 3, 4])], + includeData: true, + paginationForward: true + }); + expect(request).to.exist; + }); + + it("rejects hash query with content filter parameters", () => { + expect(() => + StoreQueryRequest.create({ + messageHashes: [new Uint8Array([1, 2, 3, 4])], + pubsubTopic: "/waku/2/default-waku/proto", + contentTopics: ["/test/1/content/proto"], + includeData: true, + paginationForward: true + }) + ).to.throw( + "Message hash lookup queries cannot include content filter criteria" + ); + }); + + it("rejects hash query with time filter", () => { + expect(() => + StoreQueryRequest.create({ + pubsubTopic: "", + contentTopics: [], + messageHashes: [new Uint8Array([1, 2, 3, 4])], + timeStart: new Date(), + includeData: true, + paginationForward: true + }) + ).to.throw( + "Message hash lookup queries cannot include content filter criteria" + ); + }); + + it("accepts time-filtered query with content filter", () => { + const request = StoreQueryRequest.create({ + pubsubTopic: "/waku/2/default-waku/proto", + contentTopics: ["/test/1/content/proto"], + timeStart: new Date(Date.now() - 3600000), + timeEnd: new Date(), + includeData: true, + paginationForward: true + }); + expect(request).to.exist; + }); +}); diff --git a/packages/core/src/lib/store/rpc.ts b/packages/core/src/lib/store/rpc.ts index 2ad63361e8..800c75af61 100644 --- a/packages/core/src/lib/store/rpc.ts +++ b/packages/core/src/lib/store/rpc.ts @@ -14,6 +14,7 @@ export class StoreQueryRequest { public static create(params: QueryRequestParams): StoreQueryRequest { const request = new StoreQueryRequest({ ...params, + contentTopics: params.contentTopics || [], requestId: uuid(), timeStart: params.timeStart ? BigInt(params.timeStart.getTime() * ONE_MILLION) @@ -27,26 +28,29 @@ export class StoreQueryRequest { : undefined }); - // Validate request parameters based on RFC - if ( - (params.pubsubTopic && !params.contentTopics) || - (!params.pubsubTopic && params.contentTopics) - ) { - throw new Error( - "Both pubsubTopic and contentTopics must be set or unset" - ); - } + const isHashQuery = params.messageHashes && params.messageHashes.length > 0; + const hasContentTopics = + params.contentTopics && params.contentTopics.length > 0; + const hasTimeFilter = params.timeStart || params.timeEnd; - if ( - params.messageHashes && - (params.pubsubTopic || - params.contentTopics || - params.timeStart || - params.timeEnd) - ) { - throw new Error( - "Message hash lookup queries cannot include content filter criteria" - ); + if (isHashQuery) { + if (hasContentTopics || hasTimeFilter) { + throw new Error( + "Message hash lookup queries cannot include content filter criteria (contentTopics, timeStart, or timeEnd)" + ); + } + } else { + if ( + (params.pubsubTopic && + (!params.contentTopics || params.contentTopics.length === 0)) || + (!params.pubsubTopic && + params.contentTopics && + params.contentTopics.length > 0) + ) { + throw new Error( + "Both pubsubTopic and contentTopics must be set together for content-filtered queries" + ); + } } return request; diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index f3b5fdba37..655593cfe5 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -40,9 +40,14 @@ export class StoreCore extends BaseProtocol implements IStoreCore { decoders: Map>, peerId: PeerId ): AsyncGenerator[]> { + // Only validate decoder content topics for content-filtered queries + const isHashQuery = + queryOpts.messageHashes && queryOpts.messageHashes.length > 0; if ( + !isHashQuery && + queryOpts.contentTopics && queryOpts.contentTopics.toString() !== - Array.from(decoders.keys()).toString() + Array.from(decoders.keys()).toString() ) { throw new Error( "Internal error, the decoders should match the query's content topics" @@ -56,6 +61,13 @@ export class StoreCore extends BaseProtocol implements IStoreCore { paginationCursor: currentCursor }); + log.info("Sending store query request:", { + hasMessageHashes: !!queryOpts.messageHashes?.length, + messageHashCount: queryOpts.messageHashes?.length, + pubsubTopic: queryOpts.pubsubTopic, + contentTopics: queryOpts.contentTopics + }); + let stream; try { stream = await this.getStream(peerId); diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index d271bea336..f3c9b7b76e 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -57,10 +57,32 @@ export class Store implements IStore { decoders: IDecoder[], options?: Partial ): AsyncGenerator[]> { - const { pubsubTopic, contentTopics, decodersAsMap } = - this.validateDecodersAndPubsubTopic(decoders); + // For message hash queries, don't validate decoders but still need decodersAsMap + const isHashQuery = + options?.messageHashes && options.messageHashes.length > 0; + + let pubsubTopic: string; + let contentTopics: string[]; + let decodersAsMap: Map>; + + if (isHashQuery) { + // For hash queries, we still need decoders to decode messages + // but we don't validate pubsubTopic consistency + // Use pubsubTopic from options if provided, otherwise from first decoder + pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || ""; + contentTopics = []; + decodersAsMap = new Map(); + decoders.forEach((dec) => { + decodersAsMap.set(dec.contentTopic, dec); + }); + } else { + const validated = this.validateDecodersAndPubsubTopic(decoders); + pubsubTopic = validated.pubsubTopic; + contentTopics = validated.contentTopics; + decodersAsMap = validated.decodersAsMap; + } - const queryOpts = { + const queryOpts: QueryRequestParams = { pubsubTopic, contentTopics, includeData: true, diff --git a/packages/tests/package.json b/packages/tests/package.json index c1e3385009..3ee7dddd04 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -55,6 +55,7 @@ "@waku/core": "*", "@waku/enr": "*", "@waku/interfaces": "*", + "@waku/message-hash": "^0.1.17", "@waku/utils": "*", "app-root-path": "^3.1.0", "chai-as-promised": "^7.1.1", diff --git a/packages/tests/tests/store/message_hash.spec.ts b/packages/tests/tests/store/message_hash.spec.ts new file mode 100644 index 0000000000..bba98ae109 --- /dev/null +++ b/packages/tests/tests/store/message_hash.spec.ts @@ -0,0 +1,86 @@ +import { messageHash } from "@waku/core"; +import type { IDecodedMessage, LightNode } from "@waku/interfaces"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +import { + runStoreNodes, + sendMessages, + TestDecoder, + TestShardInfo, + totalMsgs +} from "./utils.js"; + +describe("Waku Store, message hash query", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: ServiceNode; + + beforeEachCustom(this, async () => { + [nwaku, waku] = await runStoreNodes(this.ctx, TestShardInfo); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, [waku]); + }); + + it("can query messages normally", async function () { + await sendMessages( + nwaku, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic, + true + ); + + const messages: IDecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page) { + messages.push(msg as IDecodedMessage); + } + } + + expect(messages.length).to.equal(totalMsgs); + }); + + it("can query messages by message hash", async function () { + const sentMessages = await sendMessages( + nwaku, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic, + true + ); + const messageHashes = sentMessages.map((msg) => + messageHash(TestDecoder.pubsubTopic, { + payload: Buffer.from(msg.payload, "base64"), + contentTopic: msg.contentTopic || TestDecoder.contentTopic, + timestamp: msg.timestamp || undefined, + meta: undefined, + rateLimitProof: undefined, + ephemeral: undefined, + version: undefined + }) + ); + + const messages: IDecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + messageHashes, + pubsubTopic: TestDecoder.pubsubTopic + })) { + for await (const msg of page) { + messages.push(msg as IDecodedMessage); + } + } + expect(messages.length).to.equal(messageHashes.length); + for (const msg of messages) { + expect(msg.contentTopic).to.equal(TestDecoder.contentTopic); + } + }); +}); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 01419bed50..727149f240 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -17,6 +17,7 @@ import { expect } from "chai"; import { Context } from "mocha"; import { delay, NOISE_KEY_1, runNodes, ServiceNode } from "../../src/index.js"; +import { MessageRpcQuery } from "../../src/types.js"; export const log = new Logger("test:store"); @@ -49,20 +50,20 @@ export async function sendMessages( instance: ServiceNode, numMessages: number, contentTopic: string, - pubsubTopic: string -): Promise { + pubsubTopic: string, + timestamp: boolean = false +): Promise { + const messages: MessageRpcQuery[] = new Array(numMessages); for (let i = 0; i < numMessages; i++) { - expect( - await instance.sendMessage( - ServiceNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: contentTopic - }), - pubsubTopic - ) - ).to.eq(true); + messages[i] = ServiceNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: contentTopic, + timestamp: timestamp ? new Date() : undefined + }); + expect(await instance.sendMessage(messages[i], pubsubTopic)).to.eq(true); await delay(1); // to ensure each timestamp is unique. } + return messages; } export async function sendMessagesAutosharding(