18
18
import typing
19
19
from typing import Any , DefaultDict , Iterator , List , Set
20
20
21
+ from prometheus_client .core import Counter
22
+
21
23
from twisted .internet import defer
22
24
23
25
from synapse .api .errors import LimitExceededError
37
39
logger = logging .getLogger (__name__ )
38
40
39
41
42
+ # Track how much the ratelimiter is affecting requests
43
+ rate_limit_sleep_counter = Counter ("synapse_rate_limit_sleep" , "" )
44
+ rate_limit_reject_counter = Counter ("synapse_rate_limit_reject" , "" )
40
45
queue_wait_timer = Histogram (
41
46
"synapse_rate_limit_queue_wait_time_seconds" ,
42
47
"sec" ,
@@ -84,7 +89,7 @@ def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]
84
89
Returns:
85
90
context manager which returns a deferred.
86
91
"""
87
- return self .ratelimiters [host ].ratelimit ()
92
+ return self .ratelimiters [host ].ratelimit (host )
88
93
89
94
90
95
class _PerHostRatelimiter :
@@ -119,12 +124,14 @@ def __init__(self, clock: Clock, config: FederationRatelimitSettings):
119
124
self .request_times : List [int ] = []
120
125
121
126
@contextlib .contextmanager
122
- def ratelimit (self ) -> "Iterator[defer.Deferred[None]]" :
127
+ def ratelimit (self , host : str ) -> "Iterator[defer.Deferred[None]]" :
123
128
# `contextlib.contextmanager` takes a generator and turns it into a
124
129
# context manager. The generator should only yield once with a value
125
130
# to be returned by manager.
126
131
# Exceptions will be reraised at the yield.
127
132
133
+ self .host = host
134
+
128
135
request_id = object ()
129
136
ret = self ._on_enter (request_id )
130
137
try :
@@ -144,6 +151,8 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
144
151
# sleeping or in the ready queue).
145
152
queue_size = len (self .ready_request_queue ) + len (self .sleeping_requests )
146
153
if queue_size > self .reject_limit :
154
+ logger .debug ("Ratelimiter(%s): rejecting request" , self .host )
155
+ rate_limit_reject_counter .inc ()
147
156
raise LimitExceededError (
148
157
retry_after_ms = int (self .window_size / self .sleep_limit )
149
158
)
@@ -155,7 +164,8 @@ def queue_request() -> "defer.Deferred[None]":
155
164
queue_defer : defer .Deferred [None ] = defer .Deferred ()
156
165
self .ready_request_queue [request_id ] = queue_defer
157
166
logger .info (
158
- "Ratelimiter: queueing request (queue now %i items)" ,
167
+ "Ratelimiter(%s): queueing request (queue now %i items)" ,
168
+ self .host ,
159
169
len (self .ready_request_queue ),
160
170
)
161
171
@@ -164,19 +174,28 @@ def queue_request() -> "defer.Deferred[None]":
164
174
return defer .succeed (None )
165
175
166
176
logger .debug (
167
- "Ratelimit [%s]: len(self.request_times)=%d" ,
177
+ "Ratelimit(%s) [%s]: len(self.request_times)=%d" ,
178
+ self .host ,
168
179
id (request_id ),
169
180
len (self .request_times ),
170
181
)
171
182
172
183
if len (self .request_times ) > self .sleep_limit :
173
- logger .debug ("Ratelimiter: sleeping request for %f sec" , self .sleep_sec )
184
+ logger .debug (
185
+ "Ratelimiter(%s) [%s]: sleeping request for %f sec" ,
186
+ self .host ,
187
+ id (request_id ),
188
+ self .sleep_sec ,
189
+ )
190
+ rate_limit_sleep_counter .inc ()
174
191
ret_defer = run_in_background (self .clock .sleep , self .sleep_sec )
175
192
176
193
self .sleeping_requests .add (request_id )
177
194
178
195
def on_wait_finished (_ : Any ) -> "defer.Deferred[None]" :
179
- logger .debug ("Ratelimit [%s]: Finished sleeping" , id (request_id ))
196
+ logger .debug (
197
+ "Ratelimit(%s) [%s]: Finished sleeping" , self .host , id (request_id )
198
+ )
180
199
self .sleeping_requests .discard (request_id )
181
200
queue_defer = queue_request ()
182
201
return queue_defer
@@ -186,7 +205,9 @@ def on_wait_finished(_: Any) -> "defer.Deferred[None]":
186
205
ret_defer = queue_request ()
187
206
188
207
def on_start (r : object ) -> object :
189
- logger .debug ("Ratelimit [%s]: Processing req" , id (request_id ))
208
+ logger .debug (
209
+ "Ratelimit(%s) [%s]: Processing req" , self .host , id (request_id )
210
+ )
190
211
self .current_processing .add (request_id )
191
212
return r
192
213
@@ -217,7 +238,7 @@ def on_both(r: object) -> object:
217
238
return make_deferred_yieldable (ret_defer )
218
239
219
240
def _on_exit (self , request_id : object ) -> None :
220
- logger .debug ("Ratelimit [%s]: Processed req" , id (request_id ))
241
+ logger .debug ("Ratelimit(%s) [%s]: Processed req" , self . host , id (request_id ))
221
242
self .current_processing .discard (request_id )
222
243
try :
223
244
# start processing the next item on the queue.
0 commit comments