@@ -5,7 +5,7 @@ use ntex_bytes::{ByteString, Bytes};
5
5
use ntex_http:: { HeaderMap , Method } ;
6
6
use ntex_io:: IoRef ;
7
7
use ntex_util:: time:: { self , now, sleep} ;
8
- use ntex_util:: { channel:: pool, future:: Either , spawn, HashMap } ;
8
+ use ntex_util:: { channel:: pool, future:: Either , spawn, HashMap , HashSet } ;
9
9
10
10
use crate :: config:: { Config , ConfigInner } ;
11
11
use crate :: error:: { ConnectionError , OperationError , StreamError , StreamErrorInner } ;
@@ -107,7 +107,7 @@ impl Connection {
107
107
streams_count : Cell :: new ( 0 ) ,
108
108
pings_count : Cell :: new ( 0 ) ,
109
109
readiness : RefCell :: new ( VecDeque :: new ( ) ) ,
110
- next_stream_id : Cell :: new ( StreamId :: new ( 1 ) ) ,
110
+ next_stream_id : Cell :: new ( StreamId :: CLIENT ) ,
111
111
local_config : config,
112
112
local_max_concurrent_streams : Cell :: new ( None ) ,
113
113
local_pending_reset : Default :: default ( ) ,
@@ -471,10 +471,6 @@ impl RecvHalfConnection {
471
471
// if client and no stream, then it was closed
472
472
self . encode ( frame:: Reset :: new ( id, frame:: Reason :: STREAM_CLOSED ) ) ;
473
473
Ok ( None )
474
- } else if id < self . 0 . next_stream_id . get ( ) {
475
- Err ( Either :: Left ( ConnectionError :: InvalidStreamId (
476
- "Received headers" ,
477
- ) ) )
478
474
} else {
479
475
// refuse stream if connection is preparing for disconnect
480
476
if self
@@ -525,7 +521,6 @@ impl RecvHalfConnection {
525
521
Err ( Either :: Left ( ConnectionError :: UnexpectedPseudo ( "scheme" ) ) )
526
522
} else {
527
523
let stream = StreamRef :: new ( id, true , Connection ( self . 0 . clone ( ) ) ) ;
528
- self . 0 . next_stream_id . set ( id) ;
529
524
self . 0 . streams_count . set ( self . 0 . streams_count . get ( ) + 1 ) ;
530
525
self . 0 . streams . borrow_mut ( ) . insert ( id, stream. clone ( ) ) ;
531
526
self . 0
@@ -868,8 +863,8 @@ async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) {
868
863
}
869
864
}
870
865
871
- const BLOCKS : usize = 6 ;
872
- const LAST_BLOCK : usize = 5 ;
866
+ const BLOCKS : usize = 5 ;
867
+ const LAST_BLOCK : usize = 4 ;
873
868
874
869
#[ cfg( not( test) ) ]
875
870
const SECS : u64 = 30 ;
@@ -881,58 +876,67 @@ const ALL_BLOCKS: Duration = Duration::from_secs((BLOCKS as u64) * SECS);
881
876
882
877
#[ derive( Default ) ]
883
878
struct Pending {
884
- blocks : RefCell < [ Option < ( StreamId , Instant ) > ; BLOCKS ] > ,
879
+ idx : Cell < u8 > ,
880
+ blocks : RefCell < [ Block ; BLOCKS ] > ,
881
+ }
882
+
883
+ #[ derive( Debug ) ]
884
+ struct Block {
885
+ start_time : Instant ,
886
+ ids : HashSet < StreamId > ,
885
887
}
886
888
887
889
impl Pending {
888
890
fn add ( & self , id : StreamId ) {
889
891
let cur = now ( ) ;
892
+ let idx = self . idx . get ( ) as usize ;
890
893
let mut blocks = self . blocks . borrow_mut ( ) ;
891
894
892
- if let Some ( item) = & blocks[ 0 ] {
893
- // check if we need to insert new block
894
- if item. 1 < ( cur - BLOCK_SIZE ) {
895
- // shift blocks
896
- let mut i = LAST_BLOCK - 1 ;
897
- loop {
898
- blocks[ i + 1 ] = blocks[ i] ;
899
- if i == 0 {
900
- break ;
901
- }
902
- i -= 1 ;
903
- }
904
- // insert new item
905
- blocks[ 0 ] = Some ( ( id, cur) ) ;
906
- }
907
- } else {
895
+ // check if we need to insert new block
896
+ if blocks[ idx] . start_time < ( cur - BLOCK_SIZE ) {
897
+ // shift blocks
898
+ let idx = if idx == 0 { LAST_BLOCK } else { idx - 1 } ;
908
899
// insert new item
909
- blocks[ 0 ] = Some ( ( id, cur) ) ;
900
+ blocks[ idx] . start_time = cur;
901
+ blocks[ idx] . ids . clear ( ) ;
902
+ blocks[ idx] . ids . insert ( id) ;
903
+ self . idx . set ( idx as u8 ) ;
910
904
}
911
905
}
912
906
913
907
fn is_pending ( & self , id : StreamId ) -> bool {
914
- let mut blocks = self . blocks . borrow_mut ( ) ;
915
- let mut idx = LAST_BLOCK ;
916
- let mut cur = now ( ) - ALL_BLOCKS ;
908
+ let blocks = self . blocks . borrow_mut ( ) ;
909
+
910
+ let max = now ( ) - ALL_BLOCKS ;
911
+ let mut idx = self . idx . get ( ) as usize ;
912
+
917
913
loop {
918
- if let Some ( item) = & blocks[ idx] {
919
- if item. 1 < cur {
920
- blocks[ idx] = None ;
921
- } else {
922
- return id >= item. 0 ;
923
- }
924
- } else {
925
- cur += BLOCK_SIZE ;
914
+ let item = & blocks[ idx] ;
915
+ if item. start_time < max {
916
+ break ;
917
+ } else if item. ids . contains ( & id) {
918
+ return true ;
926
919
}
927
- if idx == 0 {
920
+ idx += 1 ;
921
+ if idx == BLOCKS {
922
+ idx = 0 ;
923
+ } else if idx == self . idx . get ( ) as usize {
928
924
break ;
929
925
}
930
- idx -= 1 ;
931
926
}
932
927
false
933
928
}
934
929
}
935
930
931
+ impl Default for Block {
932
+ fn default ( ) -> Self {
933
+ Self {
934
+ ids : HashSet :: default ( ) ,
935
+ start_time : now ( ) - ALL_BLOCKS ,
936
+ }
937
+ }
938
+ }
939
+
936
940
#[ cfg( test) ]
937
941
mod tests {
938
942
use ntex:: http:: { test:: server as test_server, HeaderMap , Method } ;
@@ -1027,7 +1031,7 @@ mod tests {
1027
1031
let res = get_reset ( io. recv ( & codec) . await . unwrap ( ) . unwrap ( ) ) ;
1028
1032
assert_eq ! ( res. reason( ) , Reason :: STREAM_CLOSED ) ;
1029
1033
1030
- sleep ( Millis ( 1100 ) ) . await ;
1034
+ sleep ( Millis ( 5100 ) ) . await ;
1031
1035
1032
1036
// prev closed stream
1033
1037
io. send ( pl. into ( ) , & codec) . await . unwrap ( ) ;
0 commit comments