|
16 | 16 | from typing import TYPE_CHECKING, Callable
|
17 | 17 |
|
18 | 18 | if TYPE_CHECKING:
|
| 19 | + from re import Match, Pattern |
| 20 | + |
| 21 | + from hathor.p2p.protocol import HathorLineReceiver |
19 | 22 | from hathor.simulator.fake_connection import FakeConnection
|
20 | 23 | from hathor.simulator.miner import AbstractMiner
|
21 | 24 | from hathor.simulator.tx_generator import RandomTransactionGenerator
|
@@ -107,3 +110,32 @@ def __init__(self, sub_triggers: list[Trigger]) -> None:
|
107 | 110 |
|
108 | 111 | def should_stop(self) -> bool:
|
109 | 112 | return all(trigger.should_stop() for trigger in self._sub_triggers)
|
| 113 | + |
| 114 | + |
| 115 | +class StopWhenSendLineMatch(Trigger): |
| 116 | + """Stop the simulation when the node sends a line that matches a designated regex pattern. |
| 117 | + """ |
| 118 | + |
| 119 | + def __init__(self, protocol: 'HathorLineReceiver', regex: 'Pattern') -> None: |
| 120 | + # patches protocol.sendLine |
| 121 | + self.original_send_line = protocol.sendLine |
| 122 | + setattr(protocol, 'sendLine', self._send_line_wrapper) |
| 123 | + |
| 124 | + # regex pattern |
| 125 | + self.regex = regex |
| 126 | + |
| 127 | + # list of matches |
| 128 | + self.matches: list['Match'] = [] |
| 129 | + |
| 130 | + def _send_line_wrapper(self, line: str) -> None: |
| 131 | + """Check if line matches a designated regex pattern.""" |
| 132 | + self.original_send_line(line) |
| 133 | + match = self.regex.match(line) |
| 134 | + if match: |
| 135 | + self.matches.append(match) |
| 136 | + |
| 137 | + def should_stop(self) -> bool: |
| 138 | + if self.matches: |
| 139 | + self.matches = [] |
| 140 | + return True |
| 141 | + return False |
0 commit comments