Skip to content

Commit 2edb814

Browse files
committed
wip: creating concurrency tests
I have a good basis now for spawing and running servers, clients and streams concurrently with random delays. * Related #14 [ci skip]
1 parent b5436a5 commit 2edb814

File tree

5 files changed

+282
-19
lines changed

5 files changed

+282
-19
lines changed

src/QUICConnection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ class QUICConnection extends EventTarget {
596596
sendInfo.to.port,
597597
sendInfo.to.host,
598598
);
599-
this.logger.info(`SENT ${sendLength} of data`);
599+
this.logger.debug(`SENT ${sendLength} of data`);
600600
} catch (e) {
601601
this.logger.error(`send error ${e.message}`);
602602
this.dispatchEvent(

src/QUICServer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ class QUICServer extends EventTarget {
285285
this.logger.debug(
286286
`Accepting new connection from QUIC packet from ${remoteInfo.host}:${remoteInfo.port}`,
287287
);
288+
const clientConnRef = Buffer.from(header.scid).toString('hex').slice(32);
288289
const connection = await QUICConnection.acceptQUICConnection({
289290
scid,
290291
dcid: dcidOriginal,
@@ -296,7 +297,7 @@ class QUICServer extends EventTarget {
296297
maxReadableStreamBytes: this.maxReadableStreamBytes,
297298
maxWritableStreamBytes: this.maxWritableStreamBytes,
298299
logger: this.logger.getChild(
299-
`${QUICConnection.name} ${scid.toString().slice(32)}`,
300+
`${QUICConnection.name} ${scid.toString().slice(32)}-${clientConnRef}`,
300301
),
301302
});
302303

src/QUICSocket.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,7 @@ class QUICSocket extends EventTarget {
139139
// Each send/recv/timeout may result in a destruction
140140
if (!conn[destroyed]) {
141141
// Ignore any errors, concurrent with destruction
142-
await conn.send().catch((e) => {
143-
this.logger.error(`not destroyed send ${e.message}`);
144-
});
142+
await conn.send().catch(() => {});
145143
}
146144
};
147145

@@ -366,7 +364,7 @@ class QUICSocket extends EventTarget {
366364
// QUICConnection.createQUICConnection
367365
// Then that means, we are really creating that connection in the async creator
368366
// That means the async creator needs to create teh `connection` and call it too
369-
this.logger.error('registerClient IS NOT IMPLEMENTED!');
367+
// this.logger.error('TMP registerClient IS NOT IMPLEMENTED!');
370368
}
371369

372370
// But we already have a connection map

src/QUICStream.ts

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ class QUICStream
168168
// with the `fin` set to true
169169
// If this itself results in an error, we can continue
170170
// But continue to do the below
171-
this.logger.debug('sending fin frame');
171+
this.logger.info('sending fin frame');
172172
await this.streamSend(new Uint8Array(0), true).catch((e) => {
173173
// Ignore send error if stream is already closed
174174
if (e.message !== 'send') throw e;
@@ -303,7 +303,6 @@ class QUICStream
303303
this.conn.streamWritable(this.streamId, 0);
304304
return false;
305305
} catch (e) {
306-
this.logger.info(e.message);
307306
// If the writable has ended, we need to close the writable.
308307
// We need to do this in the background to keep this synchronous.
309308
void this.processSendStreamError(e, 'send').then((reason) => {
@@ -325,7 +324,7 @@ class QUICStream
325324
protected async streamRecv(): Promise<void> {
326325
const buf = Buffer.alloc(1024);
327326
let recvLength: number, fin: boolean;
328-
this.logger.info('trying receiving');
327+
this.logger.debug('trying receiving');
329328
try {
330329
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
331330
} catch (e) {
@@ -341,6 +340,7 @@ class QUICStream
341340
return;
342341
} else {
343342
this.logger.debug('Stream reported: error');
343+
this.logger.error(`Stream reported: error ${e.message}`);
344344
// Signal receiving has ended
345345
this.recvFinishedProm.resolveP();
346346
const reason = await this.processSendStreamError(e, 'recv');
@@ -459,11 +459,13 @@ class QUICStream
459459
const code = isError ? await this.reasonToCode('send', reason) : 0;
460460
// This will send a `STOP_SENDING` frame with the code
461461
// When the other peer sends, they will get a `StreamStopped(u64)` exception
462-
try {
463-
this.conn.streamShutdown(this.streamId, quiche.Shutdown.Read, code);
464-
} catch (e) {
465-
// Ignore if already shutdown
466-
if (e.message !== 'Done') throw e;
462+
if (isError) {
463+
try {
464+
this.conn.streamShutdown(this.streamId, quiche.Shutdown.Read, code);
465+
} catch (e) {
466+
// Ignore if already shutdown
467+
if (e.message !== 'Done') throw e;
468+
}
467469
}
468470
await this.connection.send();
469471
if (this[status] !== 'destroying' && this._recvClosed && this._sendClosed) {
@@ -493,11 +495,13 @@ class QUICStream
493495
const code = isError ? await this.reasonToCode('send', reason) : 0;
494496
// This will send a `RESET_STREAM` frame with the code
495497
// When the other peer receives, they will get a `StreamReset(u64)` exception
496-
try {
497-
this.conn.streamShutdown(this.streamId, quiche.Shutdown.Write, code);
498-
} catch (e) {
499-
// Ignore if already shutdown
500-
if (e.message !== 'Done') throw e;
498+
if (isError) {
499+
try {
500+
this.conn.streamShutdown(this.streamId, quiche.Shutdown.Write, code);
501+
} catch (e) {
502+
// Ignore if already shutdown
503+
if (e.message !== 'Done') throw e;
504+
}
501505
}
502506
await this.connection.send();
503507
if (this[status] !== 'destroying' && this._recvClosed && this._sendClosed) {

tests/concurrency.test.ts

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
import type * as events from '@/events';
2+
import type QUICStream from '@/QUICStream';
3+
import type { Crypto, Host, Port } from '@';
4+
import { fc, testProp } from '@fast-check/jest';
5+
import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger';
6+
import QUICServer from '@/QUICServer';
7+
import { promise } from '@/utils';
8+
import QUICClient from '@/QUICClient';
9+
import { tlsConfigWithCaArb } from './tlsUtils';
10+
import { sleep } from './utils';
11+
import * as testsUtils from './utils';
12+
13+
describe('Concurrency tests', () => {
14+
const logger = new Logger(`${QUICClient.name} Test`, LogLevel.INFO, [
15+
new StreamHandler(
16+
formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`,
17+
),
18+
]);
19+
// This has to be setup asynchronously due to key generation
20+
let crypto: {
21+
key: ArrayBuffer;
22+
ops: Crypto;
23+
};
24+
25+
beforeEach(async () => {
26+
crypto = {
27+
key: await testsUtils.generateKey(),
28+
ops: {
29+
sign: testsUtils.sign,
30+
verify: testsUtils.verify,
31+
randomBytes: testsUtils.randomBytes,
32+
},
33+
};
34+
});
35+
36+
/**
37+
* This is used to have a stream run concurrently in the background.
38+
* Will resolve once stream has completed.
39+
* This will send the data provided with delays provided.
40+
* Will consume stream with provided delays between reads.
41+
*/
42+
const handleStreamProm = async (
43+
stream: QUICStream,
44+
streamData: StreamData,
45+
) => {
46+
const messages = streamData.messages;
47+
const delays = streamData.delays;
48+
const writeProm = (async () => {
49+
// Write data
50+
let count = 0;
51+
const writer = stream.writable.getWriter();
52+
for (const message of messages) {
53+
await writer.write(message);
54+
await sleep(delays[count % delays.length]);
55+
count += 1;
56+
}
57+
await sleep(streamData.endDelay);
58+
await writer.close();
59+
})();
60+
const readProm = (async () => {
61+
// Consume readable
62+
let count = 0;
63+
for await (const _ of stream.readable) {
64+
// Do nothing with delay,
65+
await sleep(delays[count % delays.length]);
66+
count += 3;
67+
}
68+
})();
69+
try {
70+
await Promise.all([writeProm, readProm]);
71+
} finally {
72+
await stream.destroy().catch(() => {});
73+
logger.info(
74+
`stream result ${JSON.stringify(
75+
await Promise.allSettled([writeProm, readProm]),
76+
)}`,
77+
);
78+
}
79+
};
80+
81+
const handleClientProm = async (
82+
client: QUICClient,
83+
connectionData: ConnectionData,
84+
) => {
85+
const streamProms: Array<Promise<void>> = [];
86+
try {
87+
for (const streamData of connectionData.streams) {
88+
const streamProm = sleep(streamData.startDelay)
89+
.then(() => client.connection.streamNew())
90+
.then((stream) => handleStreamProm(stream, streamData));
91+
streamProms.push(streamProm);
92+
}
93+
await Promise.all(streamProms);
94+
await sleep(connectionData.endDelay);
95+
} finally {
96+
await client.destroy({ force: true });
97+
logger.info(
98+
`client result ${JSON.stringify(
99+
await Promise.allSettled(streamProms),
100+
)}`,
101+
);
102+
}
103+
};
104+
105+
type Messages = Array<Uint8Array>;
106+
type StreamData = {
107+
messages: Messages;
108+
startDelay: number;
109+
endDelay: number;
110+
delays: Array<number>;
111+
};
112+
type ConnectionData = {
113+
streams: Array<StreamData>;
114+
startDelay: number;
115+
endDelay: number;
116+
};
117+
const messagesArb = fc
118+
.array(fc.uint8Array({ size: 'medium', minLength: 1 }), {
119+
size: 'small',
120+
minLength: 1,
121+
})
122+
.noShrink() as fc.Arbitrary<Messages>;
123+
const streamArb = fc
124+
.record({
125+
messages: messagesArb,
126+
startDelay: fc.integer({ min: 0, max: 50 }),
127+
endDelay: fc.integer({ min: 0, max: 50 }),
128+
delays: fc.array(fc.integer({ min: 0, max: 10 }), { size: 'small' }),
129+
})
130+
.noShrink() as fc.Arbitrary<StreamData>;
131+
const streamsArb = (minLength?: number) =>
132+
fc.array(streamArb, { size: 'small', minLength }).noShrink();
133+
const connectionArb = fc
134+
.record({
135+
streams: streamsArb(),
136+
startDelay: fc.integer({ min: 0, max: 50 }),
137+
endDelay: fc.integer({ min: 0, max: 50 }),
138+
})
139+
.noShrink() as fc.Arbitrary<ConnectionData>;
140+
const connectionsArb = fc
141+
.array(connectionArb, { minLength: 1, size: 'small' })
142+
.noShrink() as fc.Arbitrary<Array<ConnectionData>>;
143+
144+
testProp(
145+
'Multiple clients connecting to a server',
146+
[tlsConfigWithCaArb, connectionsArb, streamsArb(3)],
147+
async (tlsConfigProm, clientDatas, serverStreams) => {
148+
const tlsConfig = await tlsConfigProm;
149+
const cleanUpHoldProm = promise<void>();
150+
const serverProm = (async () => {
151+
const server = new QUICServer({
152+
crypto,
153+
logger: logger.getChild(QUICServer.name),
154+
config: {
155+
tlsConfig: tlsConfig.tlsConfig,
156+
verifyPeer: false,
157+
},
158+
});
159+
const connProms: Array<Promise<void>> = [];
160+
server.addEventListener(
161+
'connection',
162+
async (e: events.QUICServerConnectionEvent) => {
163+
const conn = e.detail;
164+
const connProm = (async () => {
165+
const serverStreamProms: Array<Promise<void>> = [];
166+
conn.addEventListener(
167+
'stream',
168+
(streamEvent: events.QUICConnectionStreamEvent) => {
169+
const stream = streamEvent.detail;
170+
const streamData =
171+
serverStreams[
172+
serverStreamProms.length % serverStreams.length
173+
];
174+
serverStreamProms.push(handleStreamProm(stream, streamData));
175+
},
176+
);
177+
try {
178+
await cleanUpHoldProm.p;
179+
await Promise.all(serverStreamProms);
180+
} finally {
181+
await conn.destroy({ force: true });
182+
logger.info(
183+
`server conn result ${JSON.stringify(
184+
await Promise.allSettled(serverStreamProms),
185+
)}`,
186+
);
187+
}
188+
})();
189+
connProms.push(connProm);
190+
},
191+
);
192+
await sleep(100);
193+
await server.start({
194+
host: '127.0.0.1' as Host,
195+
port: 55555 as Port,
196+
});
197+
try {
198+
await cleanUpHoldProm.p;
199+
await Promise.all(connProms);
200+
} finally {
201+
await server.stop({ force: true });
202+
logger.info(
203+
`server result ${JSON.stringify(
204+
await Promise.allSettled(connProms),
205+
)}`,
206+
);
207+
}
208+
})();
209+
210+
// Creating client activity
211+
logger.info('STARTING CLIENTS');
212+
const clientProms: Array<Promise<void>> = [];
213+
for (const clientData of clientDatas) {
214+
const clientProm = sleep(clientData.startDelay)
215+
.then(() => {
216+
logger.info('STARTING CLIENT');
217+
return QUICClient.createQUICClient({
218+
host: '::ffff:127.0.0.1' as Host,
219+
port: 55555 as Port,
220+
localHost: '::' as Host,
221+
crypto,
222+
logger: logger.getChild(QUICClient.name),
223+
config: {
224+
verifyPeer: false,
225+
},
226+
});
227+
})
228+
.then((client) => {
229+
return handleClientProm(client, clientData);
230+
});
231+
clientProms.push(clientProm);
232+
}
233+
// Wait for running activity to finish, should complete without error
234+
logger.info('STARTING TEST');
235+
try {
236+
await (async () => {
237+
await Promise.all(clientProms);
238+
// Allow for streams to be negotiated
239+
await sleep(200);
240+
cleanUpHoldProm.resolveP();
241+
await serverProm;
242+
})();
243+
} catch (e) {
244+
logger.error(`test failed with ${e.message}`);
245+
console.error(e);
246+
throw e;
247+
} finally {
248+
logger.info('STARTING TEST FINALLY');
249+
cleanUpHoldProm.resolveP();
250+
logger.info(
251+
`test result ${JSON.stringify(
252+
await Promise.allSettled([...clientProms, serverProm]),
253+
)}`,
254+
);
255+
}
256+
logger.info('TEST FULLY DONE!');
257+
},
258+
{ numRuns: 3, timeout: 50000 },
259+
);
260+
});

0 commit comments

Comments
 (0)