diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py index 1252847e02b2ea..02c39f13b40fc2 100644 --- a/Lib/_pyrepl/reader.py +++ b/Lib/_pyrepl/reader.py @@ -751,16 +751,14 @@ def handle1(self, block: bool = True) -> bool: self.msg = "" self.dirty = True - while True: - # We use the same timeout as in readline.c: 100ms - self.run_hooks() - self.console.wait(100) - event = self.console.get_event(block=False) - if not event: - if block: - continue + # We use the same timeout as in readline.c: 100ms + self.run_hooks() + self.console.wait(10) + events = self.console.get_event(block=False) + if not events: + if block: return False - + for event in events: translate = True if event.evt == "key": @@ -780,10 +778,9 @@ def handle1(self, block: bool = True) -> bool: if cmd is None: if block: continue - return False - - self.do_cmd(cmd) - return True + else: + self.do_cmd(cmd) + return True def push_char(self, char: int | bytes) -> None: self.console.push_char(char) diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index 6d1a2f50dbf462..d8a20617cce73d 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -410,19 +410,23 @@ def _getscrollbacksize(self) -> int: return info.srWindow.Bottom # type: ignore[no-any-return] def _read_input(self, block: bool = True) -> INPUT_RECORD | None: + # Create a buffer for 32 events + buffer = (INPUT_RECORD * 32)() + num_events_read = DWORD(0) + + # Use PeekConsoleInput to check for input without blocking if not block: - events = DWORD() - if not GetNumberOfConsoleInputEvents(InHandle, events): + count = DWORD() + if not PeekConsoleInput(InHandle, buffer, 32, ctypes.byref(count)): raise WinError(GetLastError()) - if not events.value: - return None + if count.value == 0: + return [] - rec = INPUT_RECORD() - read = DWORD() - if not ReadConsoleInput(InHandle, rec, 1, read): + # Read up to 32 events at once + if not ReadConsoleInput(InHandle, buffer, 32, ctypes.byref(num_events_read)): raise WinError(GetLastError()) - return rec + return list(buffer[:num_events_read.value]) def get_event(self, block: bool = True) -> Event | None: """Return an Event instance. Returns None if |block| is false @@ -430,55 +434,57 @@ def get_event(self, block: bool = True) -> Event | None: completion of an event.""" while self.event_queue.empty(): - rec = self._read_input(block) - if rec is None: - return None + recs = self._read_input(block) + for rec in recs: + if rec == []: + continue - if rec.EventType == WINDOW_BUFFER_SIZE_EVENT: - return Event("resize", "") + if rec.EventType == WINDOW_BUFFER_SIZE_EVENT: + self.event_queue.insert(Event("resize", "")) + continue - if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown: - # Only process keys and keydown events - if block: + if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown: + # Only process keys and keydown events continue - return None - - key_event = rec.Event.KeyEvent - raw_key = key = key_event.uChar.UnicodeChar - - if key == "\r": - # Make enter unix-like - return Event(evt="key", data="\n", raw=b"\n") - elif key_event.wVirtualKeyCode == 8: - # Turn backspace directly into the command - key = "backspace" - elif key == "\x00": - # Handle special keys like arrow keys and translate them into the appropriate command - key = VK_MAP.get(key_event.wVirtualKeyCode) - if key: - if key_event.dwControlKeyState & CTRL_ACTIVE: - key = f"ctrl {key}" - elif key_event.dwControlKeyState & ALT_ACTIVE: - # queue the key, return the meta command - self.event_queue.insert(Event(evt="key", data=key, raw=key)) - return Event(evt="key", data="\033") # keymap.py uses this for meta - return Event(evt="key", data=key, raw=key) - if block: + + key_event = rec.Event.KeyEvent + raw_key = key = key_event.uChar.UnicodeChar + + if key == "\r": + # Make enter unix-like + self.event_queue.insert(Event(evt="key", data="\n", raw=b"\n")) continue + elif key_event.wVirtualKeyCode == 8: + # Turn backspace directly into the command + key = "backspace" + elif key == "\x00": + # Handle special keys like arrow keys and translate them into the appropriate command + key = VK_MAP.get(key_event.wVirtualKeyCode) + if key: + if key_event.dwControlKeyState & CTRL_ACTIVE: + key = f"ctrl {key}" + elif key_event.dwControlKeyState & ALT_ACTIVE: + # queue the key, return the meta command + self.event_queue.insert(Event(evt="key", data=key, raw=key)) + self.event_queue.insert(Event(evt="key", data="\033")) # keymap.py uses this for meta + continue + if block: + continue - return None - elif self.__vt_support: - # If virtual terminal is enabled, scanning VT sequences - self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar) - continue + continue + elif self.__vt_support: + # If virtual terminal is enabled, scanning VT sequences + self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar) + continue - if key_event.dwControlKeyState & ALT_ACTIVE: - # queue the key, return the meta command - self.event_queue.insert(Event(evt="key", data=key, raw=raw_key)) - return Event(evt="key", data="\033") # keymap.py uses this for meta + if key_event.dwControlKeyState & ALT_ACTIVE: + # queue the key, return the meta command + self.event_queue.insert(Event(evt="key", data=key, raw=raw_key)) + self.event_queue.insert(Event(evt="key", data="\033")) # keymap.py uses this for meta + continue - return Event(evt="key", data=key, raw=raw_key) - return self.event_queue.get() + self.event_queue.insert(Event(evt="key", data=key, raw=raw_key)) + return [self.event_queue.get() for x in range(min(len(self.event_queue.events), 100))] def push_char(self, char: int | bytes) -> None: """ @@ -650,6 +656,12 @@ class INPUT_RECORD(Structure): ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, POINTER(DWORD)] ReadConsoleInput.restype = BOOL + # Define PeekConsoleInput + LPDWORD = ctypes.POINTER(DWORD) + PeekConsoleInput = _KERNEL32.PeekConsoleInputW + PeekConsoleInput.argtypes = [HANDLE, ctypes.POINTER(INPUT_RECORD), DWORD, LPDWORD] + PeekConsoleInput.restype = BOOL + GetNumberOfConsoleInputEvents = _KERNEL32.GetNumberOfConsoleInputEvents GetNumberOfConsoleInputEvents.argtypes = [HANDLE, POINTER(DWORD)] GetNumberOfConsoleInputEvents.restype = BOOL @@ -671,6 +683,7 @@ def _win_only(*args, **kwargs): GetConsoleMode = _win_only SetConsoleMode = _win_only ReadConsoleInput = _win_only + PeekConsoleInput = _win_only GetNumberOfConsoleInputEvents = _win_only FlushConsoleInputBuffer = _win_only OutHandle = 0 diff --git a/Lib/test/test_pyrepl/support.py b/Lib/test/test_pyrepl/support.py index 45e3bf758f17de..6688cb1590271d 100644 --- a/Lib/test/test_pyrepl/support.py +++ b/Lib/test/test_pyrepl/support.py @@ -72,7 +72,7 @@ def get_prompt(lineno, cursor_on_line) -> str: def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console: console = MagicMock() - console.get_event.side_effect = events + console.get_event.side_effect = list(events) console.height = 100 console.width = 80 for key, val in kwargs.items(): @@ -83,7 +83,7 @@ def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console: def handle_all_events( events, prepare_console=prepare_console, prepare_reader=prepare_reader ): - console = prepare_console(events) + console = prepare_console([events]) reader = prepare_reader(console) try: while True: @@ -103,14 +103,21 @@ def handle_all_events( class FakeConsole(Console): def __init__(self, events, encoding="utf-8") -> None: - self.events = iter(events) + self.events = list(events) self.encoding = encoding self.screen = [] self.height = 100 self.width = 80 + self.index = 0 + self.events_len = len(self.events) def get_event(self, block: bool = True) -> Event | None: - return next(self.events) + if self.index < self.events_len: + index = self.index + self.index += 1 + return [self.events[index]] + else: + raise StopIteration def getpending(self) -> Event: return self.get_event(block=False) diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py index 07eaccd1124cd6..33de077423b750 100644 --- a/Lib/test/test_pyrepl/test_windows_console.py +++ b/Lib/test/test_pyrepl/test_windows_console.py @@ -30,7 +30,7 @@ class WindowsConsoleTests(TestCase): def console(self, events, **kwargs) -> Console: console = WindowsConsole() - console.get_event = MagicMock(side_effect=events) + console.get_event = MagicMock(side_effect=list(events)) console._scroll = MagicMock() console._hide_cursor = MagicMock() console._show_cursor = MagicMock()