Skip to content

Commit ca0ec97

Browse files
committed
feat: update light push to match v3 spec
1 parent 49f26d8 commit ca0ec97

27 files changed

+1966
-75
lines changed

packages/core/src/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ export { FilterCore, FilterCodecs } from "./lib/filter/index.js";
1111

1212
export * as waku_light_push from "./lib/light_push/index.js";
1313
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
14+
export {
15+
LightPushCoreV2,
16+
LightPushCodecV2,
17+
LightPushCoreV3,
18+
lightPushStatusCodeToProtocolErrorV3,
19+
isSuccessStatusCodeV3
20+
} from "./lib/light_push/index.js";
1421

1522
export * as waku_store from "./lib/store/index.js";
1623
export { StoreCore, StoreCodec } from "./lib/store/index.js";

packages/core/src/lib/connection_manager/connection_manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ export class ConnectionManager
267267
* // Dial using multiaddr with specific protocols
268268
* await connectionManager.dialPeer(multiaddr, [
269269
* "/vac/waku/relay/2.0.0",
270-
* "/vac/waku/lightpush/2.0.0-beta1"
270+
* "/vac/waku/lightpush/3.0.0"
271271
* ]);
272272
* ```
273273
*
Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,23 @@
1-
export { LightPushCore, LightPushCodec, PushResponse } from "./light_push.js";
1+
export {
2+
LightPushCore,
3+
LightPushCodec,
4+
LightPushCoreV2,
5+
LightPushCodecV2,
6+
PushResponse
7+
} from "./light_push.js";
8+
9+
export { LightPushCoreV3 } from "./light_push_v3.js";
10+
export { PushRpcV3 } from "./push_rpc_v3.js";
11+
export {
12+
lightPushStatusCodeToProtocolErrorV3,
13+
lightPushStatusDescriptionsV3,
14+
getLightPushStatusDescriptionV3,
15+
isSuccessStatusCodeV3
16+
} from "./status_codes_v3.js";
17+
export {
18+
LightPushStatusCode,
19+
lightPushStatusCodeToProtocolError,
20+
lightPushStatusDescriptions,
21+
getLightPushStatusDescription,
22+
isSuccessStatusCode
23+
} from "./status_codes.js";

packages/core/src/lib/light_push/light_push.ts

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
PubsubTopic,
1010
type ThisOrThat
1111
} from "@waku/interfaces";
12-
import { PushResponse } from "@waku/proto";
12+
import { proto_lightpush_v2, PushResponse } from "@waku/proto";
1313
import { isMessageSizeUnderCap } from "@waku/utils";
1414
import { Logger } from "@waku/utils";
1515
import all from "it-all";
@@ -19,19 +19,17 @@ import { Uint8ArrayList } from "uint8arraylist";
1919

2020
import { BaseProtocol } from "../base_protocol.js";
2121

22-
import { PushRpc } from "./push_rpc.js";
22+
import { PushRpcV2 } from "./push_rpc_v2.js";
2323
import { isRLNResponseError } from "./utils.js";
2424

2525
const log = new Logger("light-push");
2626

2727
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
28-
export { PushResponse };
2928

30-
type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
29+
export const LightPushCodecV2 = LightPushCodec;
30+
31+
type PreparePushMessageResult = ThisOrThat<"query", PushRpcV2>;
3132

32-
/**
33-
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
34-
*/
3533
export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
3634
public constructor(
3735
public readonly pubsubTopics: PubsubTopic[],
@@ -64,7 +62,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
6462
};
6563
}
6664

67-
const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic);
65+
const query = PushRpcV2.createRequest(protoMessage, encoder.pubsubTopic);
6866
return { query, error: null };
6967
} catch (error) {
7068
log.error("Failed to prepare push message", error);
@@ -120,7 +118,6 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
120118
async (source) => await all(source)
121119
);
122120
} catch (err) {
123-
// can fail only because of `stream` abortion
124121
log.error("Failed to send waku light push request", err);
125122
return {
126123
success: null,
@@ -136,9 +133,9 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
136133
bytes.append(chunk);
137134
});
138135

139-
let response: PushResponse | undefined;
136+
let response: proto_lightpush_v2.PushResponse | undefined;
140137
try {
141-
response = PushRpc.decode(bytes).response;
138+
response = PushRpcV2.decode(bytes).response;
142139
} catch (err) {
143140
log.error("Failed to decode push reply", err);
144141
return {
@@ -161,19 +158,21 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
161158
};
162159
}
163160

164-
if (isRLNResponseError(response.info)) {
165-
log.error("Remote peer fault: RLN generation");
166-
return {
167-
success: null,
168-
failure: {
169-
error: ProtocolError.RLN_PROOF_GENERATION,
170-
peerId: peerId
171-
}
172-
};
173-
}
174-
175161
if (!response.isSuccess) {
176-
log.error("Remote peer rejected the message: ", response.info);
162+
const errorMessage = response.info || "Message rejected";
163+
log.error("Remote peer rejected the message: ", errorMessage);
164+
165+
if (response.info && isRLNResponseError(response.info)) {
166+
log.error("Remote peer fault: RLN generation");
167+
return {
168+
success: null,
169+
failure: {
170+
error: ProtocolError.RLN_PROOF_GENERATION,
171+
peerId: peerId
172+
}
173+
};
174+
}
175+
177176
return {
178177
success: null,
179178
failure: {
@@ -186,3 +185,6 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
186185
return { success: peerId, failure: null };
187186
}
188187
}
188+
189+
export const LightPushCoreV2 = LightPushCore;
190+
export { PushResponse };
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
import type { PeerId, Stream } from "@libp2p/interface";
2+
import {
3+
type CoreProtocolResult,
4+
type IBaseProtocolCore,
5+
type IEncoder,
6+
type IMessage,
7+
type Libp2p,
8+
LightPushCodecV3,
9+
LightPushStatusCodeV3,
10+
ProtocolError,
11+
PubsubTopic,
12+
type ThisOrThat
13+
} from "@waku/interfaces";
14+
import { proto_lightpush_v3, WakuMessage } from "@waku/proto";
15+
import { isMessageSizeUnderCap } from "@waku/utils";
16+
import { Logger } from "@waku/utils";
17+
import all from "it-all";
18+
import * as lp from "it-length-prefixed";
19+
import { pipe } from "it-pipe";
20+
import { Uint8ArrayList } from "uint8arraylist";
21+
22+
import { BaseProtocol } from "../base_protocol.js";
23+
24+
import { PushRpcV3 } from "./push_rpc_v3.js";
25+
import {
26+
getLightPushStatusDescriptionV3,
27+
isSuccessStatusCodeV3,
28+
lightPushStatusCodeToProtocolErrorV3
29+
} from "./status_codes_v3.js";
30+
import { isRLNResponseError } from "./utils.js";
31+
32+
const log = new Logger("light-push-v3");
33+
34+
type PreparePushMessageResult = ThisOrThat<"query", PushRpcV3>;
35+
36+
export class LightPushCoreV3 extends BaseProtocol implements IBaseProtocolCore {
37+
public constructor(
38+
public readonly pubsubTopics: PubsubTopic[],
39+
libp2p: Libp2p
40+
) {
41+
super(LightPushCodecV3, libp2p.components, pubsubTopics);
42+
}
43+
44+
private async preparePushMessage(
45+
encoder: IEncoder,
46+
message: IMessage
47+
): Promise<PreparePushMessageResult> {
48+
try {
49+
if (!message.payload || message.payload.length === 0) {
50+
log.error("Failed to send waku light push: payload is empty");
51+
return { query: null, error: ProtocolError.EMPTY_PAYLOAD };
52+
}
53+
54+
if (!(await isMessageSizeUnderCap(encoder, message))) {
55+
log.error("Failed to send waku light push: message is bigger than 1MB");
56+
return { query: null, error: ProtocolError.SIZE_TOO_BIG };
57+
}
58+
59+
const protoMessage = await encoder.toProtoObj(message);
60+
if (!protoMessage) {
61+
log.error("Failed to encode to protoMessage, aborting push");
62+
return {
63+
query: null,
64+
error: ProtocolError.ENCODE_FAILED
65+
};
66+
}
67+
68+
const query = PushRpcV3.createRequest(
69+
protoMessage as WakuMessage,
70+
encoder.pubsubTopic
71+
);
72+
return { query, error: null };
73+
} catch (error) {
74+
log.error("Failed to prepare push message", error);
75+
76+
return {
77+
query: null,
78+
error: ProtocolError.GENERIC_FAIL
79+
};
80+
}
81+
}
82+
83+
public async send(
84+
encoder: IEncoder,
85+
message: IMessage,
86+
peerId: PeerId
87+
): Promise<CoreProtocolResult> {
88+
const { query, error: preparationError } = await this.preparePushMessage(
89+
encoder,
90+
message
91+
);
92+
93+
if (preparationError || !query) {
94+
return {
95+
success: null,
96+
failure: {
97+
error: preparationError,
98+
peerId
99+
}
100+
};
101+
}
102+
103+
let stream: Stream;
104+
try {
105+
stream = await this.getStream(peerId);
106+
} catch (error) {
107+
log.error("Failed to get stream", error);
108+
return {
109+
success: null,
110+
failure: {
111+
error: ProtocolError.NO_STREAM_AVAILABLE,
112+
peerId: peerId
113+
}
114+
};
115+
}
116+
117+
let res: Uint8ArrayList[] | undefined;
118+
try {
119+
res = await pipe(
120+
[query.encode()],
121+
lp.encode,
122+
stream,
123+
lp.decode,
124+
async (source) => await all(source)
125+
);
126+
} catch (err) {
127+
log.error("Failed to send waku light push request", err);
128+
return {
129+
success: null,
130+
failure: {
131+
error: ProtocolError.STREAM_ABORTED,
132+
peerId: peerId
133+
}
134+
};
135+
}
136+
137+
const bytes = new Uint8ArrayList();
138+
res.forEach((chunk) => {
139+
bytes.append(chunk);
140+
});
141+
142+
let response: proto_lightpush_v3.LightpushResponse | undefined;
143+
try {
144+
response = proto_lightpush_v3.LightpushResponse.decode(bytes);
145+
} catch (err) {
146+
log.error("Failed to decode push response", err);
147+
return {
148+
success: null,
149+
failure: {
150+
error: ProtocolError.DECODE_FAILED,
151+
peerId: peerId
152+
}
153+
};
154+
}
155+
156+
if (!response) {
157+
log.error("Remote peer fault: No response received");
158+
return {
159+
success: null,
160+
failure: {
161+
error: ProtocolError.NO_RESPONSE,
162+
peerId: peerId
163+
}
164+
};
165+
}
166+
167+
// Validate request ID matches (except for rate limiting responses)
168+
if (response.requestId !== query.query?.requestId) {
169+
// nwaku sends "N/A" for rate limiting responses
170+
if (response.statusCode !== LightPushStatusCodeV3.TOO_MANY_REQUESTS) {
171+
log.error("Request ID mismatch", {
172+
sent: query.query?.requestId,
173+
received: response.requestId
174+
});
175+
return {
176+
success: null,
177+
failure: {
178+
error: ProtocolError.GENERIC_FAIL,
179+
peerId: peerId
180+
}
181+
};
182+
}
183+
}
184+
185+
const statusCode = response.statusCode;
186+
const isSuccess = isSuccessStatusCodeV3(statusCode);
187+
188+
// Special handling for nwaku rate limiting
189+
if (statusCode === LightPushStatusCodeV3.TOO_MANY_REQUESTS) {
190+
if (response.requestId === "N/A") {
191+
log.warn("Rate limited by nwaku node", {
192+
statusDesc:
193+
response.statusDesc || "Request rejected due to too many requests"
194+
});
195+
}
196+
}
197+
198+
if (response.relayPeerCount !== undefined) {
199+
log.info(`Message relayed to ${response.relayPeerCount} peers`);
200+
}
201+
202+
if (response.statusDesc && isRLNResponseError(response.statusDesc)) {
203+
log.error("Remote peer fault: RLN generation");
204+
return {
205+
success: null,
206+
failure: {
207+
error: ProtocolError.RLN_PROOF_GENERATION,
208+
peerId: peerId
209+
}
210+
};
211+
}
212+
213+
if (!isSuccess) {
214+
const errorMessage = getLightPushStatusDescriptionV3(
215+
statusCode,
216+
response.statusDesc
217+
);
218+
log.error("Remote peer rejected the message: ", errorMessage);
219+
220+
const protocolError = lightPushStatusCodeToProtocolErrorV3(statusCode);
221+
return {
222+
success: null,
223+
failure: {
224+
error: protocolError,
225+
peerId: peerId
226+
}
227+
};
228+
}
229+
230+
return { success: peerId, failure: null };
231+
}
232+
}

packages/core/src/lib/light_push/push_rpc.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export class PushRpc {
77

88
public static createRequest(
99
message: proto.WakuMessage,
10-
pubsubTopic: string
10+
pubsubTopic?: string
1111
): PushRpc {
1212
return new PushRpc({
1313
requestId: uuid(),

0 commit comments

Comments
 (0)