Skip to content

Commit db3e0b5

Browse files
committed
fix: simplify callback listener and fix async bug
1 parent 21383ee commit db3e0b5

File tree

2 files changed

+115
-26
lines changed

2 files changed

+115
-26
lines changed

src/puter-wisp/src/exports.js

+2-11
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,9 @@ class ATStream {
7777
}
7878

7979
const NewCallbackByteStream = () => {
80-
let listener;
8180
let queue = [];
8281
const NOOP = () => {};
8382
let signal = NOOP;
84-
(async () => {
85-
for (;;) {
86-
const v = await new Promise((rslv, rjct) => {
87-
listener = rslv;
88-
});
89-
queue.push(v);
90-
signal();
91-
}
92-
})();
9383
const stream = {
9484
[Symbol.asyncIterator](){
9585
return this;
@@ -110,7 +100,8 @@ const NewCallbackByteStream = () => {
110100
}
111101
};
112102
stream.listener = data => {
113-
listener(data);
103+
queue.push(data);
104+
signal();
114105
};
115106
return stream;
116107
}

src/puter-wisp/test/test.js

+113-15
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,130 @@ const NewTestFullByteStream = uint8array => {
1919
})();
2020
};
2121

22-
(async () => {
22+
/**
23+
* This will send 'sz'-sized chunks of the uint8array
24+
* until the uint8array is exhausted. The last chunk
25+
* may be smaller than 'sz'.
26+
* @curry
27+
* @param {*} sz
28+
* @param {*} uint8array
29+
*/
30+
const NewTestWindowByteStream = sz => {
31+
const fn = uint8array => {
32+
return (async function * () {
33+
let offset = 0;
34+
while ( offset < uint8array.length ) {
35+
const end = Math.min(offset + sz, uint8array.length);
36+
const chunk = uint8array.slice(offset, end);
37+
offset += sz;
38+
yield chunk;
39+
}
40+
})();
41+
};
42+
fn.name_ = `NewTestWindowByteStream(${sz})`;
43+
return fn;
44+
};
45+
46+
const NewTestChunkedByteStream = chunks => {
47+
return (async function * () {
48+
for ( const chunk of chunks ) {
49+
yield chunk;
50+
}
51+
})();
52+
}
53+
54+
const test = async (name, fn) => {
55+
console.log(`\x1B[36;1m=== [ Running test: ${name} ] ===\x1B[0m`);
56+
await fn();
57+
};
58+
59+
const BASH_TEST_BYTES = [
60+
22, 0, 0, 0, 2, 1, 0, 0, 0, 27, 91, 63, 50, 48, 48, 52, 108, 13, 27, 91, 63, 50, 48, 48, 52, 104,
61+
10, 0, 0, 0, 2, 1, 0, 0, 0, 40, 110, 111, 110, 101,
62+
10, 0, 0, 0, 2, 1, 0, 0, 0, 41, 58, 47, 35, 32,
63+
7, 0, 0, 0, 2, 1, 0, 0, 0, 13, 10,
64+
14, 0, 0, 0, 2, 1, 0, 0, 0, 27, 91, 63, 50, 48, 48, 52, 108, 13,
65+
17, 0, 0, 0, 2, 1, 0, 0, 0, 27, 91, 63, 50, 48, 48, 52, 104, 40, 110, 111, 110,
66+
11, 0, 0, 0, 2, 1, 0, 0, 0, 101, 41, 58, 47, 35, 32
67+
]
68+
69+
const runit = async () => {
2370
const stream_behaviors = [
2471
NewTestByteStream,
2572
NewTestFullByteStream,
73+
NewTestWindowByteStream(2),
74+
NewTestWindowByteStream(3),
2675
];
76+
2777
for ( const stream_behavior of stream_behaviors ) {
28-
const byteStream = stream_behavior(
78+
await test(`Wisp CONTINUE ${stream_behavior.name_ ?? stream_behavior.name}`, async () => {
79+
const byteStream = stream_behavior(
80+
Uint8Array.from([
81+
9, 0, 0, 0, // size of frame: 9 bytes (u32-L)
82+
3, // CONTINUE (u8)
83+
0, 0, 0, 0, // stream id: 0 (u32-L)
84+
0x0F, 0x0F, 0, 0, // buffer size (u32-L)
85+
])
86+
);
87+
const virtioStream = NewVirtioFrameStream(byteStream);
88+
const wispStream = NewWispPacketStream(virtioStream);
89+
90+
const packets = [];
91+
for await ( const packet of wispStream ) {
92+
packets.push(packet);
93+
}
94+
95+
assert.strictEqual(packets.length, 1);
96+
const packet = packets[0];
97+
assert.strictEqual(packet.type.id, 3);
98+
assert.strictEqual(packet.type.label, 'CONTINUE');
99+
assert.strictEqual(packet.type, WispPacket.CONTINUE);
100+
});
101+
}
102+
103+
await test('bash prompt chunking', async () => {
104+
const byteStream = NewTestChunkedByteStream([
105+
// These are data frames from virtio->twisp->bash
106+
// "(none"
107+
Uint8Array.from([
108+
10, 0, 0, 0, 2, 1, 0, 0, 0,
109+
40, 110, 111, 110, 101
110+
]),
111+
// "):/# "
29112
Uint8Array.from([
30-
9, 0, 0, 0, // size of frame: 9 bytes (u32-L)
31-
3, // CONTINUE (u8)
32-
0, 0, 0, 0, // stream id: 0 (u32-L)
33-
0x0F, 0x0F, 0, 0, // buffer size (u32-L)
34-
])
35-
);
113+
10, 0, 0, 0, 2, 1, 0, 0, 0,
114+
41, 58, 47, 35, 32,
115+
]),
116+
]);
36117
const virtioStream = NewVirtioFrameStream(byteStream);
37118
const wispStream = NewWispPacketStream(virtioStream);
38119

39-
const packets = [];
120+
const data = [];
40121
for await ( const packet of wispStream ) {
41-
packets.push(packet);
122+
for ( const item of packet.payload ) {
123+
data.push(item);
124+
}
42125
}
43126

44-
assert.strictEqual(packets.length, 1);
45-
const packet = packets[0];
46-
assert.strictEqual(packet.type.id, 3);
47-
assert.strictEqual(packet.type.label, 'CONTINUE');
48-
assert.strictEqual(packet.type, WispPacket.CONTINUE);
127+
const expected = [
128+
40, 110, 111, 110, 101,
129+
41, 58, 47, 35, 32,
130+
];
131+
132+
assert.strictEqual(data.length, expected.length);
133+
for ( let i = 0; i < data.length; i++ ) {
134+
assert.strictEqual(data[i], expected[i]);
135+
}
136+
});
137+
};
138+
139+
(async () => {
140+
try {
141+
await runit();
142+
} catch (e) {
143+
console.error(e);
144+
console.log(`\x1B[31;1mTest Failed\x1B[0m`);
145+
process.exit(1);
49146
}
147+
console.log(`\x1B[32;1mAll tests passed\x1B[0m`);
50148
})();

0 commit comments

Comments
 (0)