Skip to content

Commit 6248e55

Browse files
committed
wip: still trying to fix
* Related #4 [ci skip]
1 parent 2eec122 commit 6248e55

File tree

5 files changed

+365
-211
lines changed

5 files changed

+365
-211
lines changed

src/QUICConnection.ts

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,7 @@ class QUICConnection extends EventTarget {
459459
if (this.resolveCloseP != null) this.resolveCloseP();
460460
return;
461461
}
462-
if (
463-
!this.conn.isDraining() &&
464-
(this.conn.isInEarlyData() || this.conn.isEstablished())
465-
) {
462+
if (this.conn.isInEarlyData() || this.conn.isEstablished()) {
466463
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
467464
let quicStream = this.streamMap.get(streamId);
468465
if (quicStream == null) {
@@ -480,7 +477,6 @@ class QUICConnection extends EventTarget {
480477
new events.QUICConnectionStreamEvent({ detail: quicStream }),
481478
);
482479
}
483-
this.logger.info('processing read');
484480
quicStream.read();
485481
quicStream.dispatchEvent(new events.QUICStreamReadableEvent());
486482
}
@@ -504,15 +500,17 @@ class QUICConnection extends EventTarget {
504500
this.logger.info('processing write');
505501
quicStream.write();
506502
}
507-
for (const [streamId, quicStream] of this.streamMap) {
508-
// Checking if state has changed, we need to check all streams here for two reasons
509-
// 1. Reading can end, but we won't know since it won't be listed as readable.
510-
// 2. Writing can end, but we won't know unless we check here or attempt a write.
511-
if (quicStream[status] === 'destroying') quicStream.isRecvFinished();
512-
quicStream.isSendFinished();
513-
}
514503
}
515504
} finally {
505+
const nums: Array<number> = [];
506+
for (const [streamId, quicStream] of this.streamMap) {
507+
// Checking if state has changed, we need to check all streams here for two reasons
508+
// 1. Reading can end, but we won't know since it won't be listed as readable.
509+
// 2. Writing can end, but we won't know unless we check here or attempt a write.
510+
nums.push(streamId);
511+
quicStream.read();
512+
}
513+
this.logger.info(`checking read finally recv for ${nums}`);
516514
this.logger.debug('RECV FINALLY');
517515
// Set the timeout
518516
this.checkTimeout();
@@ -525,7 +523,7 @@ class QUICConnection extends EventTarget {
525523
) {
526524
this.logger.debug('CALLING DESTROY 2');
527525
// Destroy in the background, we still need to process packets
528-
void this.destroy();
526+
void this.destroy().catch(() => {});
529527
}
530528
}
531529
}
@@ -621,6 +619,7 @@ class QUICConnection extends EventTarget {
621619
);
622620
this.logger.debug(`SENT ${sendLength} of data`);
623621
} catch (e) {
622+
console.error(e);
624623
this.logger.error(`send error ${e.message}`);
625624
this.dispatchEvent(
626625
new events.QUICConnectionErrorEvent({ detail: e }),
@@ -630,13 +629,13 @@ class QUICConnection extends EventTarget {
630629
this.dispatchEvent(new events.QUICConnectionSendEvent());
631630
}
632631
} finally {
633-
for (const [, quicStream] of this.streamMap) {
634-
// Checking if state has changed, we need to check all streams here for two reasons
635-
// 1. Reading can end, but we won't know since it won't be listed as readable.
636-
// 2. Writing can end, but we won't know unless we check here or attempt a write.
637-
if (quicStream[status] === 'destroying') quicStream.isRecvFinished();
638-
quicStream.isSendFinished();
632+
const nums: Array<number> = [];
633+
for (const [streamId, quicStream] of this.streamMap) {
634+
// Stream sending can finish after a packet is sent
635+
nums.push(streamId);
636+
quicStream.read();
639637
}
638+
this.logger.info(`checking read finally send for ${nums}`);
640639
this.logger.debug('SEND FINALLY');
641640
this.checkTimeout();
642641
if (
@@ -747,7 +746,7 @@ class QUICConnection extends EventTarget {
747746
) {
748747
this.logger.debug('CALLING DESTROY 3');
749748
// Destroy in the background, we still need to process packets
750-
void this.destroy();
749+
void this.destroy().catch(() => {});
751750
}
752751
this.checkTimeout();
753752
};

src/QUICStream.ts

Lines changed: 99 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ class QUICStream
162162
},
163163
write: async (chunk: Uint8Array) => {
164164
await this.streamSend(chunk);
165+
void this.connection.send().catch(() => {});
165166
},
166167
close: async () => {
167168
// This gracefully closes, by sending a message at the end
@@ -170,11 +171,13 @@ class QUICStream
170171
// If this itself results in an error, we can continue
171172
// But continue to do the below
172173
this.logger.info('sending fin frame');
174+
// This.sendFinishedProm.resolveP();
173175
await this.streamSend(new Uint8Array(0), true).catch((e) => {
174176
// Ignore send error if stream is already closed
175177
if (e.message !== 'send') throw e;
176178
});
177179
await this.closeSend();
180+
void this.connection.send().catch(() => {});
178181
},
179182
abort: async (reason?: any) => {
180183
// Abort can be called even if there are writes are queued up
@@ -237,7 +240,7 @@ class QUICStream
237240
this.writableController.error(e);
238241
await this.closeSend(true, e);
239242
}
240-
await this.connection.send();
243+
void this.connection.send().catch(() => {});
241244
this.logger.debug('waiting for underlying streams to finish');
242245
this.isFinished();
243246
// We need to wait for the connection to finish before fully destroying
@@ -272,10 +275,11 @@ class QUICStream
272275
// After reading it's possible the writer had a state change.
273276
this.isSendFinished();
274277
if (this._recvPaused) {
278+
console.log('SKIPPING!');
275279
// Do nothing if we are paused
276280
return;
277281
}
278-
void this.streamRecv();
282+
void this.streamRecv().catch(() => {});
279283
}
280284

281285
/**
@@ -285,7 +289,7 @@ class QUICStream
285289
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
286290
public write(): void {
287291
// Checking if writable has ended
288-
void this.isSendFinished();
292+
this.isSendFinished();
289293
if (this.resolveWritableP != null) {
290294
this.resolveWritableP();
291295
}
@@ -314,7 +318,7 @@ class QUICStream
314318
'Readable stream closed early with no reason',
315319
);
316320
this.readableController.error(err);
317-
void this.closeRecv(true, err);
321+
void this.closeRecv(true, err).catch(() => {});
318322
}
319323
}
320324
return recvFinished;
@@ -332,6 +336,7 @@ class QUICStream
332336
} catch (e) {
333337
// If the writable has ended, we need to close the writable.
334338
// We need to do this in the background to keep this synchronous.
339+
this.sendFinishedProm.resolveP();
335340
void this.processSendStreamError(e, 'send').then((reason) => {
336341
if (!this._sendClosed) {
337342
const err =
@@ -340,10 +345,10 @@ class QUICStream
340345
'Writable stream closed early with no reason',
341346
);
342347
this.writableController.error(err);
343-
void this.closeSend(true, err);
348+
void this.closeSend(true, err).catch(() => {});
344349
}
345-
this.sendFinishedProm.resolveP();
346350
});
351+
this.logger.info('send FINISHED');
347352
return true;
348353
}
349354
}
@@ -352,63 +357,63 @@ class QUICStream
352357
const buf = Buffer.alloc(1024);
353358
let recvLength: number, fin: boolean;
354359
this.logger.debug('trying receiving');
355-
try {
356-
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
357-
} catch (e) {
358-
if (e.message === 'Done') {
359-
// When it is reported to be `Done`, it just means that there is no data to read
360-
// it does not mean that the stream is closed or finished
361-
// In such a case, we just ignore and continue
362-
// However after the stream is closed, then it would continue to return `Done`
363-
// This can only occur in 2 ways, either via the `fin`
364-
// or through an exception here where the stream reports an error
365-
// Since we don't call this method unless it is readable
366-
// This should never be reported... (this branch should be dead code)
367-
return;
368-
} else {
369-
this.logger.info('Stream reported: error');
370-
this.logger.error(`Stream reported: error ${e.message}`);
371-
// Signal receiving has ended
372-
this.recvFinishedProm.resolveP();
373-
const reason = await this.processSendStreamError(e, 'recv');
374-
if (reason != null) {
375-
// If it is `StreamReset(u64)` error, then the peer has closed
376-
// the stream, and we are receiving the error code
377-
this.readableController.error(reason);
378-
await this.closeRecv(true, reason);
360+
while (true) {
361+
try {
362+
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
363+
} catch (e) {
364+
if (e.message === 'Done') {
365+
// When it is reported to be `Done`, it just means that there is no data to read
366+
// it does not mean that the stream is closed or finished
367+
// In such a case, we just ignore and continue
368+
// However after the stream is closed, then it would continue to return `Done`
369+
// This can only occur in 2 ways, either via the `fin`
370+
// or through an exception here where the stream reports an error
371+
// Since we don't call this method unless it is readable
372+
// This should never be reported... (this branch should be dead code)
373+
return;
379374
} else {
380-
// If it is not a `StreamReset(u64)`, then something else broke
381-
// and we need to propagate the error up and down the stream
382-
this.readableController.error(e);
383-
await this.closeRecv(true, e);
375+
this.logger.info('Stream reported: error');
376+
this.logger.error(`Stream reported: error ${e.message}`);
377+
// Signal receiving has ended
378+
this.recvFinishedProm.resolveP();
379+
if (!this._recvClosed) {
380+
const reason = await this.processSendStreamError(e, 'recv');
381+
if (reason != null) {
382+
// If it is `StreamReset(u64)` error, then the peer has closed
383+
// the stream, and we are receiving the error code
384+
this.readableController.error(reason);
385+
await this.closeRecv(true, reason);
386+
} else {
387+
// If it is not a `StreamReset(u64)`, then something else broke
388+
// and we need to propagate the error up and down the stream
389+
this.readableController.error(e);
390+
await this.closeRecv(true, e);
391+
}
392+
}
393+
return;
384394
}
385-
return;
386395
}
387-
} finally {
388-
// Let's check if sending side has finished
389-
await this.connection.send();
390-
}
391396

392-
// If fin is true, then that means, the stream is CLOSED
393-
if (fin) {
394-
// This will render `stream.cancel` a noop
395-
this.logger.info('Stream reported: fin');
396-
if (!this._recvClosed) this.readableController.close();
397-
await this.closeRecv();
398-
// Signal receiving has ended
399-
this.recvFinishedProm.resolveP();
400-
return;
401-
}
402-
// Only fin packets are 0 length, so we enqueue after checking fin
403-
if (!this._recvClosed) {
404-
this.readableController.enqueue(buf.subarray(0, recvLength));
405-
}
406-
// Now we pause receiving if the queue is full
407-
if (
408-
this.readableController.desiredSize != null &&
409-
this.readableController.desiredSize <= 0
410-
) {
411-
this._recvPaused = true;
397+
// If fin is true, then that means, the stream is CLOSED
398+
if (fin) {
399+
// This will render `stream.cancel` a noop
400+
this.logger.info('Stream reported: fin');
401+
if (!this._recvClosed) this.readableController.close();
402+
await this.closeRecv();
403+
// Signal receiving has ended
404+
this.recvFinishedProm.resolveP();
405+
return;
406+
}
407+
if (!this._recvClosed) {
408+
this.readableController.enqueue(buf.subarray(0, recvLength));
409+
}
410+
// Now we pause receiving if the queue is full
411+
if (
412+
this.readableController.desiredSize != null &&
413+
this.readableController.desiredSize <= 0
414+
) {
415+
this._recvPaused = true;
416+
}
412417
}
413418
}
414419

@@ -454,8 +459,6 @@ class QUICStream
454459
throw e;
455460
}
456461
}
457-
} finally {
458-
await this.connection.send();
459462
}
460463
if (sentLength < chunk.length) {
461464
const { p: writableP, resolveP: resolveWritableP } = utils.promise();
@@ -478,6 +481,7 @@ class QUICStream
478481
isError: boolean = false,
479482
reason?: any,
480483
): Promise<void> {
484+
if (isError) this.logger.error(reason.message);
481485
// Further closes are NOPs
482486
if (this._recvClosed) return;
483487
this.logger.info(`Close Recv`);
@@ -512,6 +516,7 @@ class QUICStream
512516
isError: boolean = false,
513517
reason?: any,
514518
): Promise<void> {
519+
if (isError) this.logger.error(reason.message);
515520
// Further closes are NOPs
516521
if (this._sendClosed) return;
517522
this.logger.info(`Close Send`);
@@ -561,6 +566,39 @@ class QUICStream
561566
}
562567
return null;
563568
}
569+
570+
static checkStreamStates(
571+
conn: Connection,
572+
streamId: number,
573+
message: string,
574+
logger: Logger,
575+
) {
576+
const fin = conn.streamFinished(streamId);
577+
const read = conn.streamReadable(streamId);
578+
let write: boolean | string;
579+
try {
580+
write = conn.streamWritable(streamId, 0);
581+
} catch (e) {
582+
write = e.message;
583+
}
584+
let cap: number | string;
585+
try {
586+
cap = conn.streamCapacity(streamId);
587+
} catch (e) {
588+
cap = e.message;
589+
}
590+
let readIter = false;
591+
for (const id of conn.readable()) {
592+
if (streamId === id) readIter = true;
593+
}
594+
let writeIter = false;
595+
for (const id of conn.writable()) {
596+
if (streamId === id) writeIter = true;
597+
}
598+
logger.info(
599+
`Stream states (${message}) iterRW(${readIter}, ${writeIter}),finished(${fin}), read(${read}), write(${write}), capacity(${cap})`,
600+
);
601+
}
564602
}
565603

566604
export default QUICStream;

0 commit comments

Comments
 (0)