Skip to content

Commit 7e71e58

Browse files
fix queue out of sync (#107)
1 parent 476eee7 commit 7e71e58

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

cads_broker/dispatcher.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,11 @@ def __init__(self) -> None:
125125
self.queue_dict: dict = dict()
126126
self._lock = threading.RLock()
127127
# default value is before the release
128-
self.last_created_at: datetime.datetime = datetime.datetime(2024, 1, 1)
128+
self.last_created_at: datetime.datetime
129+
self.set_default_last_created_at()
130+
131+
def set_default_last_created_at(self) -> None:
132+
self.last_created_at = datetime.datetime(2024, 1, 1)
129133

130134
def get(self, key: str, default=None) -> Any:
131135
with self._lock:
@@ -157,6 +161,11 @@ def len(self) -> int:
157161
with self._lock:
158162
return len(self.queue_dict)
159163

164+
def reset(self) -> None:
165+
with self._lock:
166+
self.queue_dict = dict()
167+
self.set_default_last_created_at()
168+
160169

161170
class QoSRules:
162171
def __init__(self) -> None:
@@ -435,11 +444,13 @@ def run(self) -> None:
435444
last_created_at=self.queue.last_created_at,
436445
)
437446
):
447+
# if the internal queue is not in sync with the database, re-sync it
438448
logger.info(
439-
"not in sync",
449+
"re-syncing internal queue",
440450
internal_queue={queue_length},
441451
db_queue={db_queue},
442452
)
453+
self.queue.reset()
443454

444455
self.running_requests = len(
445456
[

0 commit comments

Comments
 (0)