Skip to content

Commit 03e31da

Browse files
committed
wip: still trying to fix
* Related #4 [ci skip]
1 parent 923d20d commit 03e31da

File tree

5 files changed

+367
-213
lines changed

5 files changed

+367
-213
lines changed

src/QUICConnection.ts

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -436,10 +436,7 @@ class QUICConnection extends EventTarget {
436436
if (this.resolveCloseP != null) this.resolveCloseP();
437437
return;
438438
}
439-
if (
440-
!this.conn.isDraining() &&
441-
(this.conn.isInEarlyData() || this.conn.isEstablished())
442-
) {
439+
if (this.conn.isInEarlyData() || this.conn.isEstablished()) {
443440
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
444441
let quicStream = this.streamMap.get(streamId);
445442
if (quicStream == null) {
@@ -457,7 +454,6 @@ class QUICConnection extends EventTarget {
457454
new events.QUICConnectionStreamEvent({ detail: quicStream }),
458455
);
459456
}
460-
this.logger.info('processing read');
461457
quicStream.read();
462458
}
463459
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
@@ -476,18 +472,19 @@ class QUICConnection extends EventTarget {
476472
new events.QUICConnectionStreamEvent({ detail: quicStream }),
477473
);
478474
}
479-
this.logger.info('processing write');
480475
quicStream.write();
481476
}
482-
for (const [streamId, quicStream] of this.streamMap) {
483-
// Checking if state has changed, we need to check all streams here for two reasons
484-
// 1. Reading can end, but we won't know since it won't be listed as readable.
485-
// 2. Writing can end, but we won't know unless we check here or attempt a write.
486-
if (quicStream[status] === 'destroying') quicStream.isRecvFinished();
487-
quicStream.isSendFinished();
488-
}
489477
}
490478
} finally {
479+
const nums: Array<number> = [];
480+
for (const [streamId, quicStream] of this.streamMap) {
481+
// Checking if state has changed, we need to check all streams here for two reasons
482+
// 1. Reading can end, but we won't know since it won't be listed as readable.
483+
// 2. Writing can end, but we won't know unless we check here or attempt a write.
484+
nums.push(streamId);
485+
quicStream.read();
486+
}
487+
this.logger.info(`checking read finally recv for ${nums}`);
491488
this.logger.debug('RECV FINALLY');
492489
// Set the timeout
493490
this.checkTimeout();
@@ -500,7 +497,7 @@ class QUICConnection extends EventTarget {
500497
) {
501498
this.logger.debug('CALLING DESTROY 2');
502499
// Destroy in the background, we still need to process packets
503-
void this.destroy();
500+
void this.destroy().catch(() => {});
504501
}
505502
}
506503
}
@@ -596,6 +593,7 @@ class QUICConnection extends EventTarget {
596593
);
597594
this.logger.debug(`SENT ${sendLength} of data`);
598595
} catch (e) {
596+
console.error(e);
599597
this.logger.error(`send error ${e.message}`);
600598
this.dispatchEvent(
601599
new events.QUICConnectionErrorEvent({ detail: e }),
@@ -604,13 +602,13 @@ class QUICConnection extends EventTarget {
604602
}
605603
}
606604
} finally {
607-
for (const [, quicStream] of this.streamMap) {
608-
// Checking if state has changed, we need to check all streams here for two reasons
609-
// 1. Reading can end, but we won't know since it won't be listed as readable.
610-
// 2. Writing can end, but we won't know unless we check here or attempt a write.
611-
if (quicStream[status] === 'destroying') quicStream.isRecvFinished();
612-
quicStream.isSendFinished();
605+
const nums: Array<number> = [];
606+
for (const [streamId, quicStream] of this.streamMap) {
607+
// Stream sending can finish after a packet is sent
608+
nums.push(streamId);
609+
quicStream.read();
613610
}
611+
this.logger.info(`checking read finally send for ${nums}`);
614612
this.logger.debug('SEND FINALLY');
615613
this.checkTimeout();
616614
if (
@@ -700,7 +698,7 @@ class QUICConnection extends EventTarget {
700698
) {
701699
this.logger.debug('CALLING DESTROY 3');
702700
// Destroy in the background, we still need to process packets
703-
void this.destroy();
701+
void this.destroy().catch(() => {});
704702
}
705703
this.checkTimeout();
706704
};

src/QUICStream.ts

Lines changed: 99 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ class QUICStream
161161
},
162162
write: async (chunk: Uint8Array) => {
163163
await this.streamSend(chunk);
164+
void this.connection.send().catch(() => {});
164165
},
165166
close: async () => {
166167
// This gracefully closes, by sending a message at the end
@@ -169,11 +170,13 @@ class QUICStream
169170
// If this itself results in an error, we can continue
170171
// But continue to do the below
171172
this.logger.info('sending fin frame');
173+
// This.sendFinishedProm.resolveP();
172174
await this.streamSend(new Uint8Array(0), true).catch((e) => {
173175
// Ignore send error if stream is already closed
174176
if (e.message !== 'send') throw e;
175177
});
176178
await this.closeSend();
179+
void this.connection.send().catch(() => {});
177180
},
178181
abort: async (reason?: any) => {
179182
// Abort can be called even if there are writes are queued up
@@ -221,7 +224,7 @@ class QUICStream
221224
this.writableController.error(e);
222225
await this.closeSend(true, e);
223226
}
224-
await this.connection.send();
227+
void this.connection.send().catch(() => {});
225228
this.logger.debug('waiting for underlying streams to finish');
226229
this.isFinished();
227230
// We need to wait for the connection to finish before fully destroying
@@ -241,10 +244,11 @@ class QUICStream
241244
// After reading it's possible the writer had a state change.
242245
this.isSendFinished();
243246
if (this._recvPaused) {
247+
console.log('SKIPPING!');
244248
// Do nothing if we are paused
245249
return;
246250
}
247-
void this.streamRecv();
251+
void this.streamRecv().catch(() => {});
248252
}
249253

250254
/**
@@ -254,7 +258,7 @@ class QUICStream
254258
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
255259
public write(): void {
256260
// Checking if writable has ended
257-
void this.isSendFinished();
261+
this.isSendFinished();
258262
if (this.resolveWritableP != null) {
259263
this.resolveWritableP();
260264
}
@@ -283,7 +287,7 @@ class QUICStream
283287
'Readable stream closed early with no reason',
284288
);
285289
this.readableController.error(err);
286-
void this.closeRecv(true, err);
290+
void this.closeRecv(true, err).catch(() => {});
287291
}
288292
}
289293
return recvFinished;
@@ -301,6 +305,7 @@ class QUICStream
301305
} catch (e) {
302306
// If the writable has ended, we need to close the writable.
303307
// We need to do this in the background to keep this synchronous.
308+
this.sendFinishedProm.resolveP();
304309
void this.processSendStreamError(e, 'send').then((reason) => {
305310
if (!this._sendClosed) {
306311
const err =
@@ -309,10 +314,10 @@ class QUICStream
309314
'Writable stream closed early with no reason',
310315
);
311316
this.writableController.error(err);
312-
void this.closeSend(true, err);
317+
void this.closeSend(true, err).catch(() => {});
313318
}
314-
this.sendFinishedProm.resolveP();
315319
});
320+
this.logger.info('send FINISHED');
316321
return true;
317322
}
318323
}
@@ -321,63 +326,63 @@ class QUICStream
321326
const buf = Buffer.alloc(1024);
322327
let recvLength: number, fin: boolean;
323328
this.logger.debug('trying receiving');
324-
try {
325-
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
326-
} catch (e) {
327-
if (e.message === 'Done') {
328-
// When it is reported to be `Done`, it just means that there is no data to read
329-
// it does not mean that the stream is closed or finished
330-
// In such a case, we just ignore and continue
331-
// However after the stream is closed, then it would continue to return `Done`
332-
// This can only occur in 2 ways, either via the `fin`
333-
// or through an exception here where the stream reports an error
334-
// Since we don't call this method unless it is readable
335-
// This should never be reported... (this branch should be dead code)
336-
return;
337-
} else {
338-
this.logger.info('Stream reported: error');
339-
this.logger.error(`Stream reported: error ${e.message}`);
340-
// Signal receiving has ended
341-
this.recvFinishedProm.resolveP();
342-
const reason = await this.processSendStreamError(e, 'recv');
343-
if (reason != null) {
344-
// If it is `StreamReset(u64)` error, then the peer has closed
345-
// the stream, and we are receiving the error code
346-
this.readableController.error(reason);
347-
await this.closeRecv(true, reason);
329+
while (true) {
330+
try {
331+
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
332+
} catch (e) {
333+
if (e.message === 'Done') {
334+
// When it is reported to be `Done`, it just means that there is no data to read
335+
// it does not mean that the stream is closed or finished
336+
// In such a case, we just ignore and continue
337+
// However after the stream is closed, then it would continue to return `Done`
338+
// This can only occur in 2 ways, either via the `fin`
339+
// or through an exception here where the stream reports an error
340+
// Since we don't call this method unless it is readable
341+
// This should never be reported... (this branch should be dead code)
342+
return;
348343
} else {
349-
// If it is not a `StreamReset(u64)`, then something else broke
350-
// and we need to propagate the error up and down the stream
351-
this.readableController.error(e);
352-
await this.closeRecv(true, e);
344+
this.logger.info('Stream reported: error');
345+
this.logger.error(`Stream reported: error ${e.message}`);
346+
// Signal receiving has ended
347+
this.recvFinishedProm.resolveP();
348+
if (!this._recvClosed) {
349+
const reason = await this.processSendStreamError(e, 'recv');
350+
if (reason != null) {
351+
// If it is `StreamReset(u64)` error, then the peer has closed
352+
// the stream, and we are receiving the error code
353+
this.readableController.error(reason);
354+
await this.closeRecv(true, reason);
355+
} else {
356+
// If it is not a `StreamReset(u64)`, then something else broke
357+
// and we need to propagate the error up and down the stream
358+
this.readableController.error(e);
359+
await this.closeRecv(true, e);
360+
}
361+
}
362+
return;
353363
}
354-
return;
355364
}
356-
} finally {
357-
// Let's check if sending side has finished
358-
await this.connection.send();
359-
}
360365

361-
// If fin is true, then that means, the stream is CLOSED
362-
if (fin) {
363-
// This will render `stream.cancel` a noop
364-
this.logger.info('Stream reported: fin');
365-
if (!this._recvClosed) this.readableController.close();
366-
await this.closeRecv();
367-
// Signal receiving has ended
368-
this.recvFinishedProm.resolveP();
369-
return;
370-
}
371-
// Only fin packets are 0 length, so we enqueue after checking fin
372-
if (!this._recvClosed) {
373-
this.readableController.enqueue(buf.subarray(0, recvLength));
374-
}
375-
// Now we pause receiving if the queue is full
376-
if (
377-
this.readableController.desiredSize != null &&
378-
this.readableController.desiredSize <= 0
379-
) {
380-
this._recvPaused = true;
366+
// If fin is true, then that means, the stream is CLOSED
367+
if (fin) {
368+
// This will render `stream.cancel` a noop
369+
this.logger.info('Stream reported: fin');
370+
if (!this._recvClosed) this.readableController.close();
371+
await this.closeRecv();
372+
// Signal receiving has ended
373+
this.recvFinishedProm.resolveP();
374+
return;
375+
}
376+
if (!this._recvClosed) {
377+
this.readableController.enqueue(buf.subarray(0, recvLength));
378+
}
379+
// Now we pause receiving if the queue is full
380+
if (
381+
this.readableController.desiredSize != null &&
382+
this.readableController.desiredSize <= 0
383+
) {
384+
this._recvPaused = true;
385+
}
381386
}
382387
}
383388

@@ -423,8 +428,6 @@ class QUICStream
423428
throw e;
424429
}
425430
}
426-
} finally {
427-
await this.connection.send();
428431
}
429432
if (sentLength < chunk.length) {
430433
const { p: writableP, resolveP: resolveWritableP } = utils.promise();
@@ -447,6 +450,7 @@ class QUICStream
447450
isError: boolean = false,
448451
reason?: any,
449452
): Promise<void> {
453+
if (isError) this.logger.error(reason.message);
450454
// Further closes are NOPs
451455
if (this._recvClosed) return;
452456
this.logger.info(`Close Recv`);
@@ -481,6 +485,7 @@ class QUICStream
481485
isError: boolean = false,
482486
reason?: any,
483487
): Promise<void> {
488+
if (isError) this.logger.error(reason.message);
484489
// Further closes are NOPs
485490
if (this._sendClosed) return;
486491
this.logger.info(`Close Send`);
@@ -530,6 +535,39 @@ class QUICStream
530535
}
531536
return null;
532537
}
538+
539+
static checkStreamStates(
540+
conn: Connection,
541+
streamId: number,
542+
message: string,
543+
logger: Logger,
544+
) {
545+
const fin = conn.streamFinished(streamId);
546+
const read = conn.streamReadable(streamId);
547+
let write: boolean | string;
548+
try {
549+
write = conn.streamWritable(streamId, 0);
550+
} catch (e) {
551+
write = e.message;
552+
}
553+
let cap: number | string;
554+
try {
555+
cap = conn.streamCapacity(streamId);
556+
} catch (e) {
557+
cap = e.message;
558+
}
559+
let readIter = false;
560+
for (const id of conn.readable()) {
561+
if (streamId === id) readIter = true;
562+
}
563+
let writeIter = false;
564+
for (const id of conn.writable()) {
565+
if (streamId === id) writeIter = true;
566+
}
567+
logger.info(
568+
`Stream states (${message}) iterRW(${readIter}, ${writeIter}),finished(${fin}), read(${read}), write(${write}), capacity(${cap})`,
569+
);
570+
}
533571
}
534572

535573
export default QUICStream;

0 commit comments

Comments
 (0)