Skip to content

Commit 230008d

Browse files
committed
fix: fixed the randon non-cleanup bug
using a stop-gap measure where we add and remove data to the finish frame message. Likely there is a bug in the quiche code where this packet is not being sent due to it being a 0-length message. It was fixed before in their code, but they may have missed an edge case where a large volume of data is being processed. * Related #14 [ci skip]
1 parent 7881c96 commit 230008d

File tree

5 files changed

+84
-87
lines changed

5 files changed

+84
-87
lines changed

src/QUICConnection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,9 @@ class QUICConnection extends EventTarget {
482482
quicStream.read();
483483
quicStream.dispatchEvent(new events.QUICStreamReadableEvent());
484484
}
485-
if (readIds.length > 0) this.logger.info(`processed reads for ${readIds}`);
485+
if (readIds.length > 0) {
486+
this.logger.info(`processed reads for ${readIds}`);
487+
}
486488
const writeIds: Array<number> = [];
487489
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
488490
let quicStream = this.streamMap.get(streamId);
@@ -504,7 +506,9 @@ class QUICConnection extends EventTarget {
504506
writeIds.push(quicStream.streamId);
505507
quicStream.write();
506508
}
507-
if (writeIds.length > 0) this.logger.info(`processed writes for ${writeIds}`);
509+
if (writeIds.length > 0) {
510+
this.logger.info(`processed writes for ${writeIds}`);
511+
}
508512
}
509513
} finally {
510514
this.garbageCollectStreams('recv');
@@ -800,7 +804,9 @@ class QUICConnection extends EventTarget {
800804
nums.push(streamId);
801805
quicStream.read();
802806
}
803-
if (nums.length > 0) this.logger.info(`checking read finally ${where} for ${nums}`);
807+
if (nums.length > 0) {
808+
this.logger.info(`checking read finally ${where} for ${nums}`);
809+
}
804810
}
805811
}
806812

src/QUICStream.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,10 @@ class QUICStream
175175
// But continue to do the below
176176
this.logger.info('sending fin frame');
177177
// This.sendFinishedProm.resolveP();
178-
await this.streamSend(Buffer.from("!END!"), true).catch((e) => {
178+
await this.streamSend(Buffer.from([0]), true).catch((e) => {
179179
// Ignore send error if stream is already closed
180180
if (e.message !== 'send') throw e;
181181
});
182-
// await this.streamSend(new Uint8Array(0), true).catch((e) => {
183-
// // Ignore send error if stream is already closed
184-
// if (e.message !== 'send') throw e;
185-
// });
186182
await this.closeSend();
187183
void this.connection.send().catch(() => {});
188184
},
@@ -401,16 +397,17 @@ class QUICStream
401397
}
402398
this.recvBytes += recvLength;
403399
this.logger.info(`recv bytes ${this.recvBytes}`);
404-
try {
405-
if (recvLength > 0 && !this._recvClosed) {
400+
// If fin is true, then that means, the stream is CLOSED
401+
if (!fin) {
402+
// Send the data normally
403+
if (!this._recvClosed) {
406404
this.readableController.enqueue(buf.subarray(0, recvLength));
407405
}
408-
} catch (e) {
409-
console.error(e);
410-
throw e;
411-
}
412-
// If fin is true, then that means, the stream is CLOSED
413-
if (fin) {
406+
} else {
407+
// Strip the end message, removing the `!END!` bytes
408+
if (!this._recvClosed && recvLength > 1) {
409+
this.readableController.enqueue(buf.subarray(0, recvLength - 1));
410+
}
414411
// This will render `stream.cancel` a noop
415412
if (!this._recvClosed) this.readableController.close();
416413
await this.closeRecv();

tests/QUICStream.test.ts

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
import type * as events from '@/events';
22
import type { Crypto, Host, Port } from '@';
33
import type QUICSocket from '@/QUICSocket';
4+
import dgram from 'dgram';
45
import { testProp, fc } from '@fast-check/jest';
56
import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger';
67
import { destroyed } from '@matrixai/async-init';
78
import * as utils from '@/utils';
89
import QUICServer from '@/QUICServer';
910
import QUICClient from '@/QUICClient';
1011
import QUICStream from '@/QUICStream';
12+
import { promise } from '@/utils';
1113
import { tlsConfigWithCaArb, tlsConfigWithCaGENOKPArb } from './tlsUtils';
1214
import * as testsUtils from './utils';
1315
import { handleStreamProm, sleep } from './utils';
14-
import dgram from 'dgram';
15-
import { promise } from '@/utils';
1616

1717
describe(QUICStream.name, () => {
1818
const logger = new Logger(`${QUICStream.name} Test`, LogLevel.WARN, [
@@ -453,8 +453,8 @@ describe(QUICStream.name, () => {
453453
const streamProm = stream.readable
454454
.pipeTo(stream.writable)
455455
.catch((e) => {
456-
expect(e).toBe(testReason);
457-
});
456+
expect(e).toBe(testReason);
457+
});
458458
activeServerStreams.push(streamProm);
459459
serverStreamNum += 1;
460460
if (serverStreamNum >= streamsNum) serverStreamsProm.resolveP();
@@ -464,23 +464,24 @@ describe(QUICStream.name, () => {
464464
const activeClientStreams: Array<Promise<void>> = [];
465465
const message = Buffer.from('Hello!');
466466
const serverStreamsDoneProm = utils.promise();
467-
for (let i = 0; i < streamsNum; i++) { const clientProm = (async () => {
468-
const stream = await client.connection.streamNew();
469-
const writer = stream.writable.getWriter();
470-
// Do write and read messages here.
471-
await writer.write(message);
472-
await stream.readable.cancel(testReason);
473-
await serverStreamsDoneProm.p;
474-
// Need time for packets to send/recv
475-
await testsUtils.sleep(100);
476-
const writeProm = writer.write(message);
477-
await writeProm.then(
478-
() => {
479-
throw Error('write did not throw');
480-
},
481-
(e) => expect(e).toBe(testReason),
482-
);
483-
})();
467+
for (let i = 0; i < streamsNum; i++) {
468+
const clientProm = (async () => {
469+
const stream = await client.connection.streamNew();
470+
const writer = stream.writable.getWriter();
471+
// Do write and read messages here.
472+
await writer.write(message);
473+
await stream.readable.cancel(testReason);
474+
await serverStreamsDoneProm.p;
475+
// Need time for packets to send/recv
476+
await testsUtils.sleep(100);
477+
const writeProm = writer.write(message);
478+
await writeProm.then(
479+
() => {
480+
throw Error('write did not throw');
481+
},
482+
(e) => expect(e).toBe(testReason),
483+
);
484+
})();
484485
// ClientProm.catch(e => logger.error(e));
485486
activeClientStreams.push(clientProm);
486487
}
@@ -767,7 +768,8 @@ describe(QUICStream.name, () => {
767768
.noShrink(),
768769
],
769770
async (tlsConfigProm, streamsData) => {
770-
const connectionEventProm = utils.promise<events.QUICServerConnectionEvent>();
771+
const connectionEventProm =
772+
utils.promise<events.QUICServerConnectionEvent>();
771773
const tlsConfig = await tlsConfigProm;
772774
let server: QUICServer | null = null;
773775
let client: QUICClient | null = null;

tests/concurrency.test.ts

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ describe('Concurrency tests', () => {
3131
// Tracking resources
3232
let sockets: Array<QUICSocket>;
3333
let data: {
34-
// clientDatas1: Array<ConnectionData>;
34+
// ClientDatas1: Array<ConnectionData>;
3535
clientDatas2: Array<ConnectionData>;
3636
};
3737
let err;
@@ -148,7 +148,7 @@ describe('Concurrency tests', () => {
148148
.then((stream) => {
149149
// @ts-ignore: kidnap logger
150150
const logger = stream.logger;
151-
return handleStreamProm(stream, streamData, logger)
151+
return handleStreamProm(stream, streamData, logger);
152152
});
153153
streamProms.push(streamProm);
154154
}
@@ -170,34 +170,42 @@ describe('Concurrency tests', () => {
170170
endDelay: number;
171171
};
172172
const genMessages = () => {
173-
const messages: Array<Buffer> = []
174-
for (let index = 65; index < 65+20; index++) {
173+
const messages: Array<Buffer> = [];
174+
for (let index = 65; index < 65 + 20; index++) {
175175
messages.push(Buffer.from([index, index, index, index, index]));
176176
}
177177
return messages;
178-
}
179-
const messagesArb = fc.constant(genMessages()) as fc.Arbitrary<Messages>;
178+
};
179+
// Const messagesArb = fc.constant(genMessages()) as fc.Arbitrary<Messages>;
180+
const messagesArb = fc
181+
.array(fc.uint8Array({ size: 'medium', minLength: 1 }), {
182+
size: 'small',
183+
minLength: 1,
184+
})
185+
.noShrink() as fc.Arbitrary<Messages>;
180186
const streamArb = fc
181187
.record({
182188
messages: messagesArb,
183-
startDelay: fc.integer({ min: 0, max: 0 }),
184-
endDelay: fc.integer({ min: 0, max: 0 }),
185-
delays: fc.array(fc.integer({ min: 0, max: 0 }), { size: 'small', minLength: 1, maxLength: 1 }),
189+
startDelay: fc.integer({ min: 0, max: 100 }),
190+
endDelay: fc.integer({ min: 0, max: 100 }),
191+
delays: fc.array(fc.integer({ min: 0, max: 50 }), {
192+
size: 'small',
193+
minLength: 1,
194+
}),
186195
})
187196
.noShrink() as fc.Arbitrary<StreamData>;
188197
const streamsArb = (minLength?: number, maxLength?: number) =>
189198
fc.array(streamArb, { size: 'small', minLength, maxLength }).noShrink();
190199
const connectionArb = fc
191200
.record({
192-
streams: streamsArb(60, 60),
193-
startDelay: fc.integer({ min: 0, max: 0 }),
194-
endDelay: fc.integer({ min: 0, max: 0 }),
201+
streams: streamsArb(1),
202+
startDelay: fc.integer({ min: 0, max: 100 }),
203+
endDelay: fc.integer({ min: 0, max: 100 }),
195204
})
196205
.noShrink() as fc.Arbitrary<ConnectionData>;
197206
const connectionsArb = fc
198207
.array(connectionArb, {
199208
minLength: 1,
200-
maxLength: 1,
201209
size: 'small',
202210
})
203211
.noShrink() as fc.Arbitrary<Array<ConnectionData>>;
@@ -486,7 +494,8 @@ describe('Concurrency tests', () => {
486494
...streamData,
487495
endDelay: 0,
488496
},
489-
logger),
497+
logger,
498+
),
490499
);
491500
},
492501
);
@@ -520,29 +529,18 @@ describe('Concurrency tests', () => {
520529
);
521530
}
522531
};
523-
jest.setTimeout(40000);
524532
testProp.only(
525533
'Multiple clients sharing a socket with a server',
526-
[
527-
connectionsArb,
528-
connectionsArb,
529-
streamsArb(3),
530-
streamsArb(3),
531-
],
532-
async (
533-
clientDatas1,
534-
clientDatas2,
535-
serverStreams1,
536-
serverStreams2,
537-
) => {
534+
[connectionsArb, connectionsArb, streamsArb(3), streamsArb(3)],
535+
async (clientDatas1, clientDatas2, serverStreams1, serverStreams2) => {
538536
const clientsInfosA = clientDatas1.map((v) => v.streams.length);
539537
const clientsInfosB = clientDatas2.map((v) => v.streams.length);
540538
logger.info(`clientsA: ${clientsInfosA}`);
541539
logger.info(`clientsB: ${clientsInfosB}`);
542540
data = {
543-
// clientDatas1,
541+
// ClientDatas1,
544542
clientDatas2,
545-
}
543+
};
546544
const cleanUpHoldProm = promise<void>();
547545
// Creating socket
548546
const socket1 = new QUICSocket({
@@ -588,7 +586,7 @@ describe('Concurrency tests', () => {
588586

589587
// Creating client activity
590588
logger.info('STARTING CLIENTS');
591-
// const clientProms1: Array<Promise<void>> = [];
589+
// Const clientProms1: Array<Promise<void>> = [];
592590
const clientProms2: Array<Promise<void>> = [];
593591
// For (const clientData of clientDatas1) {
594592
// const clientProm = sleep(clientData.startDelay)
@@ -647,7 +645,6 @@ describe('Concurrency tests', () => {
647645
await serverProm1;
648646
// Await serverProm2;
649647
console.log('DONE waiting for server proms');
650-
651648
})();
652649
} catch (e) {
653650
err = e;

tests/utils.ts

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -108,35 +108,22 @@ const handleStreamProm = async (
108108
const writeProm = (async () => {
109109
// Write data
110110
let count = 0;
111-
const messageBytes: Array<number> = [];
112111
const writer = stream.writable.getWriter();
113112
for (const message of messages) {
114113
await writer.write(message);
115-
messageBytes.push(message.length);
116114
await sleep(delays[count % delays.length]);
117115
count += 1;
118116
}
119-
console.log('sent messages ', messageBytes, messageBytes.reduce((a, b) => a+b, 0));
120117
await sleep(streamData.endDelay);
121118
await writer.close();
122119
})();
123120
const readProm = (async () => {
124121
// Consume readable
125122
let count = 0;
126-
let bytes = Buffer.alloc(0);
127-
let err: any;
128-
try {
129-
for await (const message of stream.readable) {
130-
bytes = Buffer.concat([bytes, message]);
131-
// Do nothing with delay,
132-
await sleep(delays[count % delays.length]);
133-
count += 1;
134-
}
135-
} catch (e) {
136-
err = e;
137-
throw e
138-
} finally {
139-
logger.info(`received messages ${bytes.length}, ${bytes.toString()}, err? ${err != null} ${err?.message}`);
123+
for await (const _ of stream.readable) {
124+
// Do nothing with delay,
125+
await sleep(delays[count % delays.length]);
126+
count += 1;
140127
}
141128
})();
142129
try {
@@ -154,4 +141,12 @@ const handleStreamProm = async (
154141
};
155142

156143
export type { Messages, StreamData };
157-
export { sleep, generateKey, sign, verify, randomBytes, extractSocket, handleStreamProm };
144+
export {
145+
sleep,
146+
generateKey,
147+
sign,
148+
verify,
149+
randomBytes,
150+
extractSocket,
151+
handleStreamProm,
152+
};

0 commit comments

Comments
 (0)