@@ -322,135 +322,16 @@ function connectionListener(socket) {
322
322
parser . maxHeaderPairs = 2000 ;
323
323
}
324
324
325
- socket . addListener ( 'error' , socketOnError ) ;
326
- socket . addListener ( 'close' , serverSocketCloseListener ) ;
327
- parser . onIncoming = parserOnIncoming ;
328
- socket . on ( 'end' , socketOnEnd ) ;
329
- socket . on ( 'data' , socketOnData ) ;
330
-
331
- // We are consuming socket, so it won't get any actual data
332
- socket . on ( 'resume' , onSocketResume ) ;
333
- socket . on ( 'pause' , onSocketPause ) ;
334
-
335
- socket . on ( 'drain' , socketOnDrain ) ;
336
-
337
- // Override on to unconsume on `data`, `readable` listeners
338
- socket . on = socketOnWrap ;
339
-
340
- var external = socket . _handle . _externalStream ;
341
- if ( external ) {
342
- parser . _consumed = true ;
343
- parser . consume ( external ) ;
344
- }
345
- external = null ;
346
- parser [ kOnExecute ] = onParserExecute ;
347
-
348
325
// TODO(isaacs): Move all these functions out of here
349
326
const socketOnError = ( e ) => {
350
327
// Ignore further errors
351
- this . removeListener ( 'error' , socketOnError ) ;
352
- this . on ( 'error' , ( ) => { } ) ;
353
-
354
- if ( ! this . emit ( 'clientError' , e , this ) )
355
- this . destroy ( e ) ;
356
- } ;
357
-
358
- function socketOnData ( d ) {
359
- assert ( ! socket . _paused ) ;
360
- debug ( 'SERVER socketOnData %d' , d . length ) ;
361
- var ret = parser . execute ( d ) ;
362
-
363
- onParserExecuteCommon ( ret , d ) ;
364
- }
365
-
366
- function onParserExecute ( ret , d ) {
367
- debug ( 'SERVER socketOnParserExecute %d' , ret ) ;
368
- onParserExecuteCommon ( ret , undefined ) ;
369
- }
370
-
371
- const onParserExecuteCommon = ( ret , d ) => {
372
- if ( ret instanceof Error ) {
373
- debug ( 'parse error' ) ;
374
- socketOnError . call ( socket , ret ) ;
375
- } else if ( parser . incoming && parser . incoming . upgrade ) {
376
- // Upgrade or CONNECT
377
- var bytesParsed = ret ;
378
- var req = parser . incoming ;
379
- debug ( 'SERVER upgrade or connect' , req . method ) ;
380
-
381
- if ( ! d )
382
- d = parser . getCurrentBuffer ( ) ;
383
-
384
- socket . removeListener ( 'data' , socketOnData ) ;
385
- socket . removeListener ( 'end' , socketOnEnd ) ;
386
- socket . removeListener ( 'close' , serverSocketCloseListener ) ;
387
- unconsume ( parser , socket ) ;
388
- parser . finish ( ) ;
389
- freeParser ( parser , req , null ) ;
390
- parser = null ;
391
-
392
- var eventName = req . method === 'CONNECT' ? 'connect' : 'upgrade' ;
393
- if ( this . listenerCount ( eventName ) > 0 ) {
394
- debug ( 'SERVER have listener for %s' , eventName ) ;
395
- var bodyHead = d . slice ( bytesParsed , d . length ) ;
328
+ socket . removeListener ( 'error' , socketOnError ) ;
329
+ socket . on ( 'error' , ( ) => { } ) ;
396
330
397
- // TODO(isaacs): Need a way to reset a stream to fresh state
398
- // IE, not flowing, and not explicitly paused.
399
- socket . _readableState . flowing = null ;
400
- this . emit ( eventName , req , socket , bodyHead ) ;
401
- } else {
402
- // Got upgrade header or CONNECT method, but have no handler.
403
- socket . destroy ( ) ;
404
- }
405
- }
406
-
407
- if ( socket . _paused && socket . parser ) {
408
- // onIncoming paused the socket, we should pause the parser as well
409
- debug ( 'pause parser' ) ;
410
- socket . parser . pause ( ) ;
411
- }
331
+ if ( ! this . emit ( 'clientError' , e , socket ) )
332
+ socket . destroy ( e ) ;
412
333
} ;
413
334
414
- const socketOnEnd = ( ) => {
415
- var socket = this ;
416
- var ret = parser . finish ( ) ;
417
-
418
- if ( ret instanceof Error ) {
419
- debug ( 'parse error' ) ;
420
- socketOnError . call ( socket , ret ) ;
421
- return ;
422
- }
423
-
424
- if ( ! this . httpAllowHalfOpen ) {
425
- abortIncoming ( ) ;
426
- if ( socket . writable ) socket . end ( ) ;
427
- } else if ( outgoing . length ) {
428
- outgoing [ outgoing . length - 1 ] . _last = true ;
429
- } else if ( socket . _httpMessage ) {
430
- socket . _httpMessage . _last = true ;
431
- } else {
432
- if ( socket . writable ) socket . end ( ) ;
433
- }
434
- } ;
435
-
436
-
437
- // The following callback is issued after the headers have been read on a
438
- // new message. In this callback we setup the response object and pass it
439
- // to the user.
440
-
441
- socket . _paused = false ;
442
- function socketOnDrain ( ) {
443
- var needPause = outgoingData > socket . _writableState . highWaterMark ;
444
-
445
- // If we previously paused, then start reading again.
446
- if ( socket . _paused && ! needPause ) {
447
- socket . _paused = false ;
448
- if ( socket . parser )
449
- socket . parser . resume ( ) ;
450
- socket . resume ( ) ;
451
- }
452
- }
453
-
454
335
const parserOnIncoming = ( req , shouldKeepAlive ) => {
455
336
incoming . push ( req ) ;
456
337
@@ -486,7 +367,6 @@ function connectionListener(socket) {
486
367
487
368
// When we're finished writing the response, check if this is the last
488
369
// respose, if so destroy the socket.
489
- res . on ( 'finish' , resOnFinish ) ;
490
370
const resOnFinish = ( ) => {
491
371
// Usually the first incoming element should be our request. it may
492
372
// be that in the case abortIncoming() was called that the incoming
@@ -512,7 +392,8 @@ function connectionListener(socket) {
512
392
m . assignSocket ( socket ) ;
513
393
}
514
394
}
515
- }
395
+ } ;
396
+ res . on ( 'finish' , resOnFinish ) ;
516
397
517
398
if ( req . headers . expect !== undefined &&
518
399
( req . httpVersionMajor == 1 && req . httpVersionMinor == 1 ) ) {
@@ -537,6 +418,125 @@ function connectionListener(socket) {
537
418
this . emit ( 'request' , req , res ) ;
538
419
}
539
420
return false ; // Not a HEAD response. (Not even a response!)
421
+ } ;
422
+
423
+ const socketOnEnd = ( ) => {
424
+ var ret = parser . finish ( ) ;
425
+
426
+ if ( ret instanceof Error ) {
427
+ debug ( 'parse error' ) ;
428
+ socketOnError ( ret ) ;
429
+ return ;
430
+ }
431
+
432
+ if ( ! this . httpAllowHalfOpen ) {
433
+ abortIncoming ( ) ;
434
+ if ( socket . writable ) socket . end ( ) ;
435
+ } else if ( outgoing . length ) {
436
+ outgoing [ outgoing . length - 1 ] . _last = true ;
437
+ } else if ( socket . _httpMessage ) {
438
+ socket . _httpMessage . _last = true ;
439
+ } else {
440
+ if ( socket . writable ) socket . end ( ) ;
441
+ }
442
+ } ;
443
+
444
+ const onParserExecuteCommon = ( ret , d ) => {
445
+ if ( ret instanceof Error ) {
446
+ debug ( 'parse error' ) ;
447
+ socketOnError ( ret ) ;
448
+ } else if ( parser . incoming && parser . incoming . upgrade ) {
449
+ // Upgrade or CONNECT
450
+ var bytesParsed = ret ;
451
+ var req = parser . incoming ;
452
+ debug ( 'SERVER upgrade or connect' , req . method ) ;
453
+
454
+ if ( ! d )
455
+ d = parser . getCurrentBuffer ( ) ;
456
+
457
+ socket . removeListener ( 'data' , socketOnData ) ;
458
+ socket . removeListener ( 'end' , socketOnEnd ) ;
459
+ socket . removeListener ( 'close' , serverSocketCloseListener ) ;
460
+ unconsume ( parser , socket ) ;
461
+ parser . finish ( ) ;
462
+ freeParser ( parser , req , null ) ;
463
+ parser = null ;
464
+
465
+ var eventName = req . method === 'CONNECT' ? 'connect' : 'upgrade' ;
466
+ if ( this . listenerCount ( eventName ) > 0 ) {
467
+ debug ( 'SERVER have listener for %s' , eventName ) ;
468
+ var bodyHead = d . slice ( bytesParsed , d . length ) ;
469
+
470
+ // TODO(isaacs): Need a way to reset a stream to fresh state
471
+ // IE, not flowing, and not explicitly paused.
472
+ socket . _readableState . flowing = null ;
473
+ this . emit ( eventName , req , socket , bodyHead ) ;
474
+ } else {
475
+ // Got upgrade header or CONNECT method, but have no handler.
476
+ socket . destroy ( ) ;
477
+ }
478
+ }
479
+
480
+ if ( socket . _paused && socket . parser ) {
481
+ // onIncoming paused the socket, we should pause the parser as well
482
+ debug ( 'pause parser' ) ;
483
+ socket . parser . pause ( ) ;
484
+ }
485
+ } ;
486
+
487
+ socket . addListener ( 'error' , socketOnError ) ;
488
+ socket . addListener ( 'close' , serverSocketCloseListener ) ;
489
+ parser . onIncoming = parserOnIncoming ;
490
+ socket . on ( 'end' , socketOnEnd ) ;
491
+ socket . on ( 'data' , socketOnData ) ;
492
+
493
+ // We are consuming socket, so it won't get any actual data
494
+ socket . on ( 'resume' , onSocketResume ) ;
495
+ socket . on ( 'pause' , onSocketPause ) ;
496
+
497
+ socket . on ( 'drain' , socketOnDrain ) ;
498
+
499
+ // Override on to unconsume on `data`, `readable` listeners
500
+ socket . on = socketOnWrap ;
501
+
502
+ var external = socket . _handle . _externalStream ;
503
+ if ( external ) {
504
+ parser . _consumed = true ;
505
+ parser . consume ( external ) ;
506
+ }
507
+ external = null ;
508
+ parser [ kOnExecute ] = onParserExecute ;
509
+
510
+ // TODO(isaacs): Move all these functions out of here
511
+
512
+ function socketOnData ( d ) {
513
+ assert ( ! socket . _paused ) ;
514
+ debug ( 'SERVER socketOnData %d' , d . length ) ;
515
+ var ret = parser . execute ( d ) ;
516
+
517
+ onParserExecuteCommon ( ret , d ) ;
518
+ }
519
+
520
+ function onParserExecute ( ret , d ) {
521
+ debug ( 'SERVER socketOnParserExecute %d' , ret ) ;
522
+ onParserExecuteCommon ( ret , undefined ) ;
523
+ }
524
+
525
+ // The following callback is issued after the headers have been read on a
526
+ // new message. In this callback we setup the response object and pass it
527
+ // to the user.
528
+
529
+ socket . _paused = false ;
530
+ function socketOnDrain ( ) {
531
+ var needPause = outgoingData > socket . _writableState . highWaterMark ;
532
+
533
+ // If we previously paused, then start reading again.
534
+ if ( socket . _paused && ! needPause ) {
535
+ socket . _paused = false ;
536
+ if ( socket . parser )
537
+ socket . parser . resume ( ) ;
538
+ socket . resume ( ) ;
539
+ }
540
540
}
541
541
}
542
542
exports . _connectionListener = connectionListener ;
0 commit comments