Skip to content

Commit d5be853

Browse files
tegefaulkesCMCDragonkai
authored andcommitted
fix: fixed up internal logic for streams and expanded tests
Should handle more edge cases now, a lot of odd interactions and edge cases was revealed by the tests. * Fixes #10 [ci skip]
1 parent 0c003a4 commit d5be853

9 files changed

+953
-173
lines changed

src/QUICClient.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Crypto, Host, Hostname, Port } from './types';
22
import type { Config } from './native/types';
33
import type { QUICConfig } from './config';
44
import type QUICConnectionMap from './QUICConnectionMap';
5+
import type { StreamCodeToReason, StreamReasonToCode } from './types';
56
import Logger from '@matrixai/logger';
67
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
78
import { destroyed, running } from '@matrixai/async-init';
@@ -52,6 +53,8 @@ class QUICClient extends EventTarget {
5253
crypto,
5354
socket,
5455
resolveHostname = utils.resolveHostname,
56+
reasonToCode,
57+
codeToReason,
5558
logger = new Logger(`${this.name}`),
5659
config = {},
5760
}: {
@@ -65,6 +68,8 @@ class QUICClient extends EventTarget {
6568
};
6669
socket?: QUICSocket;
6770
resolveHostname?: (hostname: Hostname) => Host | PromiseLike<Host>;
71+
reasonToCode?: StreamReasonToCode;
72+
codeToReason?: StreamCodeToReason;
6873
logger?: Logger;
6974
config?: Partial<QUICConfig>;
7075
}) {
@@ -146,8 +151,12 @@ class QUICClient extends EventTarget {
146151
port,
147152
},
148153
config: quicConfig,
154+
reasonToCode,
155+
codeToReason,
149156
logger: logger.getChild(
150-
`${QUICConnection.name} ${scid.toString().slice(32)}`,
157+
`${QUICConnection.name} ${scid.toString().slice(32)}-${Math.floor(
158+
Math.random() * 100,
159+
)}`,
151160
),
152161
});
153162
connection.addEventListener('error', handleConnectionError, { once: true });
@@ -279,12 +288,16 @@ class QUICClient extends EventTarget {
279288
return this._connection;
280289
}
281290

282-
public async destroy() {
291+
public async destroy({
292+
force = false,
293+
}: {
294+
force?: boolean;
295+
} = {}) {
283296
const address = utils.buildAddress(this.socket.host, this.socket.port);
284297
this.logger.info(`Destroy ${this.constructor.name} on ${address}`);
285298

286299
// We may want to allow one to specialise this
287-
await this._connection.destroy();
300+
await this._connection.destroy({ force });
288301
if (!this.isSocketShared) {
289302
await this.socket.stop();
290303
this.socket.removeEventListener('error', this.handleQUICSocketError);

src/QUICConnection.ts

Lines changed: 78 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ import type QUICConnectionMap from './QUICConnectionMap';
33
import type QUICConnectionId from './QUICConnectionId';
44
// This is specialized type
55
import type { QUICConfig } from './config';
6-
import type { Host, Port, StreamId, RemoteInfo } from './types';
7-
import type { Connection, SendInfo, ConnectionErrorCode } from './native/types';
6+
import type { Host, Port, RemoteInfo, StreamId } from './types';
7+
import type { Connection, ConnectionErrorCode, SendInfo } from './native/types';
8+
import type { StreamCodeToReason, StreamReasonToCode } from './types';
89
import {
910
CreateDestroy,
1011
ready,
@@ -13,12 +14,13 @@ import {
1314
import Logger from '@matrixai/logger';
1415
import { Lock } from '@matrixai/async-locks';
1516
import { destroyed } from '@matrixai/async-init';
17+
import { buildQuicheConfig } from './config';
1618
import QUICStream from './QUICStream';
1719
import { quiche } from './native';
1820
import * as events from './events';
1921
import * as utils from './utils';
2022
import * as errors from './errors';
21-
import { buildQuicheConfig } from './config';
23+
import { promise } from './utils';
2224

2325
/**
2426
* Think of this as equivalent to `net.Socket`.
@@ -39,6 +41,9 @@ class QUICConnection extends EventTarget {
3941
public conn: Connection;
4042
public connectionMap: QUICConnectionMap;
4143
public streamMap: Map<StreamId, QUICStream> = new Map();
44+
protected reasonToCode: StreamReasonToCode;
45+
protected codeToReason: StreamCodeToReason;
46+
protected destroyingMap: Map<StreamId, QUICStream> = new Map();
4247

4348
// This basically allows one to await this promise
4449
// once resolved, always resolved...
@@ -94,12 +99,17 @@ class QUICConnection extends EventTarget {
9499
socket,
95100
remoteInfo,
96101
config,
102+
reasonToCode = () => 0,
103+
codeToReason = (type, code) =>
104+
new Error(`${type.toString()} ${code.toString()}`),
97105
logger = new Logger(`${this.name} ${scid}`),
98106
}: {
99107
scid: QUICConnectionId;
100108
socket: QUICSocket;
101109
remoteInfo: RemoteInfo;
102110
config: QUICConfig;
111+
reasonToCode?: StreamReasonToCode;
112+
codeToReason?: StreamCodeToReason;
103113
logger?: Logger;
104114
}) {
105115
logger.info(`Connect ${this.name}`);
@@ -127,6 +137,8 @@ class QUICConnection extends EventTarget {
127137
connectionId: scid,
128138
socket,
129139
remoteInfo,
140+
reasonToCode,
141+
codeToReason,
130142
logger,
131143
});
132144
socket.connectionMap.set(connection.connectionId, connection);
@@ -143,13 +155,18 @@ class QUICConnection extends EventTarget {
143155
socket,
144156
remoteInfo,
145157
config,
158+
reasonToCode = () => 0,
159+
codeToReason = (type, code) =>
160+
new Error(`${type.toString()} ${code.toString()}`),
146161
logger = new Logger(`${this.name} ${scid}`),
147162
}: {
148163
scid: QUICConnectionId;
149164
dcid: QUICConnectionId;
150165
socket: QUICSocket;
151166
remoteInfo: RemoteInfo;
152167
config: QUICConfig;
168+
reasonToCode?: StreamReasonToCode;
169+
codeToReason?: StreamCodeToReason;
153170
logger?: Logger;
154171
}): Promise<QUICConnection> {
155172
logger.info(`Accept ${this.name}`);
@@ -177,6 +194,8 @@ class QUICConnection extends EventTarget {
177194
connectionId: scid,
178195
socket,
179196
remoteInfo,
197+
reasonToCode,
198+
codeToReason,
180199
logger,
181200
});
182201
socket.connectionMap.set(connection.connectionId, connection);
@@ -190,13 +209,17 @@ class QUICConnection extends EventTarget {
190209
connectionId,
191210
socket,
192211
remoteInfo,
212+
reasonToCode,
213+
codeToReason,
193214
logger,
194215
}: {
195216
type: 'client' | 'server';
196217
conn: Connection;
197218
connectionId: QUICConnectionId;
198219
socket: QUICSocket;
199220
remoteInfo: RemoteInfo;
221+
reasonToCode: StreamReasonToCode;
222+
codeToReason: StreamCodeToReason;
200223
logger: Logger;
201224
}) {
202225
super();
@@ -208,6 +231,8 @@ class QUICConnection extends EventTarget {
208231
this.socket = socket;
209232
this._remoteHost = remoteInfo.host;
210233
this._remotePort = remoteInfo.port;
234+
this.reasonToCode = reasonToCode;
235+
this.codeToReason = codeToReason;
211236
// Sets the timeout on the first
212237
this.checkTimeout();
213238

@@ -233,7 +258,7 @@ class QUICConnection extends EventTarget {
233258

234259
// Immediately call this after construction
235260
// if you want to pass the key log to something
236-
// note that you must close the file descriptor afterwards
261+
// note that you must close the file descriptor afterward
237262
public setKeylog(path) {
238263
this.conn.setKeylog(path);
239264
}
@@ -261,17 +286,28 @@ class QUICConnection extends EventTarget {
261286
appError = false,
262287
errorCode = quiche.ConnectionErrorCode.NoError,
263288
errorMessage = '',
289+
force = false,
264290
}: {
265291
appError?: boolean;
266292
errorCode?: ConnectionErrorCode;
267293
errorMessage?: string;
294+
force?: boolean;
268295
} = {}) {
269296
this.logger.info(`Destroy ${this.constructor.name}`);
270-
// Console.log(this.conn.localError())
271-
// console.log(this.conn.peerError())
297+
// Handle destruction concurrently
298+
const destroyProms: Array<Promise<void>> = [];
272299
for (const stream of this.streamMap.values()) {
273-
await stream.destroy();
300+
if (force) {
301+
destroyProms.push(stream.destroy());
302+
} else {
303+
const destroyProm = promise();
304+
stream.addEventListener('destroy', () => destroyProm.resolveP(), {
305+
once: true,
306+
});
307+
destroyProms.push(destroyProm.p);
308+
}
274309
}
310+
await Promise.all(destroyProms);
275311
try {
276312
// If this is already closed, then `Done` will be thrown
277313
// Otherwise it can send `CONNECTION_CLOSE` frame
@@ -322,7 +358,7 @@ class QUICConnection extends EventTarget {
322358
* UDP -> Connection -> Stream
323359
* This pushes data to the streams.
324360
* When the connection is draining, we can still receive data.
325-
* However no streams are allowed to read or write.
361+
* However, no streams are allowed to read or write.
326362
*/
327363
@ready(new errors.ErrorQUICConnectionDestroyed(), false, ['destroying'])
328364
public async recv(data: Uint8Array, remoteInfo: RemoteInfo) {
@@ -345,10 +381,10 @@ class QUICConnection extends EventTarget {
345381
},
346382
};
347383
try {
348-
this.logger.debug(`Did a recv ${data.byteLength}`);
349384
this.conn.recv(data, recvInfo);
385+
this.logger.debug(`RECEIVED ${data.byteLength} of data`);
350386
} catch (e) {
351-
this.logger.error(e.message);
387+
this.logger.error(`recv error ${e.message}`);
352388
// Depending on the exception, the `this.conn.recv`
353389
// may have automatically started closing the connection
354390
if (e.message === 'TlsFail') {
@@ -381,7 +417,6 @@ class QUICConnection extends EventTarget {
381417
this.resolveEstablishedP();
382418
}
383419
if (this.conn.isClosed()) {
384-
this.logger.debug('recv CLOSED!!!!!');
385420
if (this.resolveCloseP != null) this.resolveCloseP();
386421
return;
387422
}
@@ -396,7 +431,14 @@ class QUICConnection extends EventTarget {
396431
quicStream = await QUICStream.createQUICStream({
397432
streamId,
398433
connection: this,
399-
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
434+
destroyingMap: this.destroyingMap,
435+
codeToReason: this.codeToReason,
436+
reasonToCode: this.reasonToCode,
437+
logger: this.logger.getChild(
438+
`${QUICStream.name} ${streamId}-${Math.floor(
439+
Math.random() * 100,
440+
)}`,
441+
),
400442
});
401443
this.dispatchEvent(
402444
new events.QUICConnectionStreamEvent({ detail: quicStream }),
@@ -411,14 +453,29 @@ class QUICConnection extends EventTarget {
411453
quicStream = await QUICStream.createQUICStream({
412454
streamId,
413455
connection: this,
414-
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
456+
codeToReason: this.codeToReason,
457+
reasonToCode: this.reasonToCode,
458+
destroyingMap: this.destroyingMap,
459+
logger: this.logger.getChild(
460+
`${QUICStream.name} ${streamId}-${Math.floor(
461+
Math.random() * 100,
462+
)}`,
463+
),
415464
});
416465
this.dispatchEvent(
417466
new events.QUICConnectionStreamEvent({ detail: quicStream }),
418467
);
419468
}
420469
quicStream.write();
421470
}
471+
// Checking shortlist if streams have finished.
472+
for (const [streamId, stream] of this.destroyingMap) {
473+
if (stream.isFinished()) {
474+
// If it has finished, it will trigger its own clean up.
475+
// Remove the stream from the shortlist.
476+
this.destroyingMap.delete(streamId);
477+
}
478+
}
422479
}
423480
} finally {
424481
this.logger.debug('RECV FINALLY');
@@ -534,8 +591,9 @@ class QUICConnection extends EventTarget {
534591
sendInfo.to.port,
535592
sendInfo.to.host,
536593
);
594+
this.logger.info(`SENT ${sendLength} of data`);
537595
} catch (e) {
538-
this.logger.error(e.message);
596+
this.logger.error(`send error ${e.message}`);
539597
this.dispatchEvent(
540598
new events.QUICConnectionErrorEvent({ detail: e }),
541599
);
@@ -544,18 +602,11 @@ class QUICConnection extends EventTarget {
544602
}
545603
} finally {
546604
this.logger.debug('SEND FINALLY');
547-
this.logger.debug(
548-
` ________ ED: ${this.conn.isInEarlyData()} TO: ${this.conn.isTimedOut()} EST: ${this.conn.isEstablished()}`,
549-
);
550605
this.checkTimeout();
551-
this.logger.debug(
552-
`state are draining: ${this.conn.isDraining()}, closed: ${this.conn.isClosed()}`,
553-
);
554606
if (
555607
this[status] !== 'destroying' &&
556608
(this.conn.isClosed() || this.conn.isDraining())
557609
) {
558-
this.logger.debug('CALLING DESTROY');
559610
// Ignore errors and run in background
560611
void this.destroy().catch(() => {});
561612
} else if (
@@ -605,24 +656,13 @@ class QUICConnection extends EventTarget {
605656
const quicStream = await QUICStream.createQUICStream({
606657
streamId: streamId!,
607658
connection: this,
608-
logger: this.logger.getChild(`${QUICStream.name} ${streamId!}`),
659+
codeToReason: this.codeToReason,
660+
reasonToCode: this.reasonToCode,
661+
destroyingMap: this.destroyingMap,
662+
logger: this.logger.getChild(
663+
`${QUICStream.name} ${streamId!}-${Math.floor(Math.random() * 100)}`,
664+
),
609665
});
610-
const writer = quicStream.writable.getWriter();
611-
612-
try {
613-
// This will now wait until the 0-length buffer is actually sent
614-
await writer.write(new Uint8Array(0));
615-
writer.releaseLock();
616-
} catch (e) {
617-
// You must release the lock even before you run destroy
618-
writer.releaseLock();
619-
// If the write failed, it will only close the sending side
620-
// But in this case, it means we actually failed to open the stream entirely
621-
// In which case we destroy the stream
622-
// Do we need to release the writer?
623-
await quicStream.destroy();
624-
throw e;
625-
}
626666
// Ok the stream is opened and working
627667
if (this.type === 'client' && streamType === 'bidi') {
628668
this.streamIdClientBidi = (this.streamIdClientBidi + 4) as StreamId;
@@ -647,9 +687,7 @@ class QUICConnection extends EventTarget {
647687
this.logger.debug(
648688
`state are draining: ${this.conn.isDraining()}, closed: ${this.conn.isClosed()}`,
649689
);
650-
this.logger.debug('timeout SEND');
651690
if (this[destroyed] === false) await this.send();
652-
this.logger.debug('timeout SENDAFTER');
653691
if (
654692
this[status] !== 'destroying' &&
655693
(this.conn.isClosed() || this.conn.isDraining())

0 commit comments

Comments
 (0)