From 460b62d5c4ce92f66cb801463d2a151ee06e7897 Mon Sep 17 00:00:00 2001 From: devdanzin <74280297+devdanzin@users.noreply.github.com> Date: Sun, 23 Feb 2025 08:46:36 -0300 Subject: [PATCH 1/2] WIP: try to speed up pasting in Windows PyREPL. --- Lib/_pyrepl/base_eventqueue.py | 108 +++++++++++ Lib/_pyrepl/reader.py | 23 ++- Lib/_pyrepl/unix_eventqueue.py | 86 +-------- Lib/_pyrepl/windows_console.py | 167 ++++++++++++------ Lib/_pyrepl/windows_eventqueue.py | 42 +++++ Lib/test/test_pyrepl/stdin_tty.py | 6 + Lib/test/test_pyrepl/support.py | 15 +- ..._unix_eventqueue.py => test_eventqueue.py} | 70 +++++--- Lib/test/test_pyrepl/test_windows_console.py | 2 +- ...-09-16-17-03-52.gh-issue-124096.znin0O.rst | 3 + 10 files changed, 348 insertions(+), 174 deletions(-) create mode 100644 Lib/_pyrepl/base_eventqueue.py create mode 100644 Lib/_pyrepl/windows_eventqueue.py create mode 100644 Lib/test/test_pyrepl/stdin_tty.py rename Lib/test/test_pyrepl/{test_unix_eventqueue.py => test_eventqueue.py} (68%) create mode 100644 Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst diff --git a/Lib/_pyrepl/base_eventqueue.py b/Lib/_pyrepl/base_eventqueue.py new file mode 100644 index 00000000000000..9cae1db112a838 --- /dev/null +++ b/Lib/_pyrepl/base_eventqueue.py @@ -0,0 +1,108 @@ +# Copyright 2000-2008 Michael Hudson-Doyle +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +""" +OS-independent base for an event and VT sequence scanner + +See unix_eventqueue and windows_eventqueue for subclasses. +""" + +from collections import deque + +from . import keymap +from .console import Event +from .trace import trace + +class BaseEventQueue: + def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None: + self.compiled_keymap = keymap.compile_keymap(keymap_dict) + self.keymap = self.compiled_keymap + trace("keymap {k!r}", k=self.keymap) + self.encoding = encoding + self.events: deque[Event] = deque() + self.buf = bytearray() + + def get(self) -> Event | None: + """ + Retrieves the next event from the queue. + """ + if self.events: + return self.events.popleft() + else: + return None + + def empty(self) -> bool: + """ + Checks if the queue is empty. + """ + return not self.events + + def flush_buf(self) -> bytearray: + """ + Flushes the buffer and returns its contents. + """ + old = self.buf + self.buf = bytearray() + return old + + def insert(self, event: Event) -> None: + """ + Inserts an event into the queue. + """ + trace('added event {event}', event=event) + self.events.append(event) + + def push(self, char: int | bytes) -> None: + """ + Processes a character by updating the buffer and handling special key mappings. + """ + ord_char = char if isinstance(char, int) else ord(char) + char = bytes(bytearray((ord_char,))) + self.buf.append(ord_char) + if char in self.keymap: + if self.keymap is self.compiled_keymap: + # sanity check, buffer is empty when a special key comes + assert len(self.buf) == 1 + k = self.keymap[char] + trace('found map {k!r}', k=k) + if isinstance(k, dict): + self.keymap = k + else: + self.insert(Event('key', k, self.flush_buf())) + self.keymap = self.compiled_keymap + + elif self.buf and self.buf[0] == 27: # escape + # escape sequence not recognized by our keymap: propagate it + # outside so that i can be recognized as an M-... key (see also + # the docstring in keymap.py + trace('unrecognized escape sequence, propagating...') + self.keymap = self.compiled_keymap + self.insert(Event('key', '\033', bytearray(b'\033'))) + for _c in self.flush_buf()[1:]: + self.push(_c) + + else: + try: + decoded = bytes(self.buf).decode(self.encoding) + except UnicodeError: + return + else: + self.insert(Event('key', decoded, self.flush_buf())) + self.keymap = self.compiled_keymap diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py index 1252847e02b2ea..02c39f13b40fc2 100644 --- a/Lib/_pyrepl/reader.py +++ b/Lib/_pyrepl/reader.py @@ -751,16 +751,14 @@ def handle1(self, block: bool = True) -> bool: self.msg = "" self.dirty = True - while True: - # We use the same timeout as in readline.c: 100ms - self.run_hooks() - self.console.wait(100) - event = self.console.get_event(block=False) - if not event: - if block: - continue + # We use the same timeout as in readline.c: 100ms + self.run_hooks() + self.console.wait(10) + events = self.console.get_event(block=False) + if not events: + if block: return False - + for event in events: translate = True if event.evt == "key": @@ -780,10 +778,9 @@ def handle1(self, block: bool = True) -> bool: if cmd is None: if block: continue - return False - - self.do_cmd(cmd) - return True + else: + self.do_cmd(cmd) + return True def push_char(self, char: int | bytes) -> None: self.console.push_char(char) diff --git a/Lib/_pyrepl/unix_eventqueue.py b/Lib/_pyrepl/unix_eventqueue.py index 70cfade26e23b1..29b3e9dd5efd07 100644 --- a/Lib/_pyrepl/unix_eventqueue.py +++ b/Lib/_pyrepl/unix_eventqueue.py @@ -18,12 +18,9 @@ # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -from collections import deque - -from . import keymap -from .console import Event from . import curses from .trace import trace +from .base_eventqueue import BaseEventQueue from termios import tcgetattr, VERASE import os @@ -70,83 +67,10 @@ def get_terminal_keycodes() -> dict[bytes, str]: keycodes.update(CTRL_ARROW_KEYCODES) return keycodes -class EventQueue: +class EventQueue(BaseEventQueue): def __init__(self, fd: int, encoding: str) -> None: - self.keycodes = get_terminal_keycodes() + keycodes = get_terminal_keycodes() if os.isatty(fd): backspace = tcgetattr(fd)[6][VERASE] - self.keycodes[backspace] = "backspace" - self.compiled_keymap = keymap.compile_keymap(self.keycodes) - self.keymap = self.compiled_keymap - trace("keymap {k!r}", k=self.keymap) - self.encoding = encoding - self.events: deque[Event] = deque() - self.buf = bytearray() - - def get(self) -> Event | None: - """ - Retrieves the next event from the queue. - """ - if self.events: - return self.events.popleft() - else: - return None - - def empty(self) -> bool: - """ - Checks if the queue is empty. - """ - return not self.events - - def flush_buf(self) -> bytearray: - """ - Flushes the buffer and returns its contents. - """ - old = self.buf - self.buf = bytearray() - return old - - def insert(self, event: Event) -> None: - """ - Inserts an event into the queue. - """ - trace('added event {event}', event=event) - self.events.append(event) - - def push(self, char: int | bytes) -> None: - """ - Processes a character by updating the buffer and handling special key mappings. - """ - ord_char = char if isinstance(char, int) else ord(char) - char = bytes(bytearray((ord_char,))) - self.buf.append(ord_char) - if char in self.keymap: - if self.keymap is self.compiled_keymap: - #sanity check, buffer is empty when a special key comes - assert len(self.buf) == 1 - k = self.keymap[char] - trace('found map {k!r}', k=k) - if isinstance(k, dict): - self.keymap = k - else: - self.insert(Event('key', k, self.flush_buf())) - self.keymap = self.compiled_keymap - - elif self.buf and self.buf[0] == 27: # escape - # escape sequence not recognized by our keymap: propagate it - # outside so that i can be recognized as an M-... key (see also - # the docstring in keymap.py - trace('unrecognized escape sequence, propagating...') - self.keymap = self.compiled_keymap - self.insert(Event('key', '\033', bytearray(b'\033'))) - for _c in self.flush_buf()[1:]: - self.push(_c) - - else: - try: - decoded = bytes(self.buf).decode(self.encoding) - except UnicodeError: - return - else: - self.insert(Event('key', decoded, self.flush_buf())) - self.keymap = self.compiled_keymap + keycodes[backspace] = "backspace" + BaseEventQueue.__init__(self, encoding, keycodes) diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index e1ecd9845aefb4..6b72e7ba402ac0 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -42,6 +42,7 @@ from .console import Event, Console from .trace import trace from .utils import wlen +from .windows_eventqueue import EventQueue try: from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined] @@ -94,7 +95,9 @@ def __init__(self, err: int | None, descr: str | None = None) -> None: 0x83: "f20", # VK_F20 } -# Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences +# Virtual terminal output sequences +# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#output-sequences +# Check `windows_eventqueue.py` for input sequences ERASE_IN_LINE = "\x1b[K" MOVE_LEFT = "\x1b[{}D" MOVE_RIGHT = "\x1b[{}C" @@ -110,6 +113,12 @@ def __init__(self, err: int | None, descr: str | None = None) -> None: class _error(Exception): pass +def _supports_vt(): + try: + import nt + return nt._supports_virtual_terminal() + except (ImportError, AttributeError): + return False class WindowsConsole(Console): def __init__( @@ -121,17 +130,29 @@ def __init__( ): super().__init__(f_in, f_out, term, encoding) + self.__vt_support = _supports_vt() + + if self.__vt_support: + trace('console supports virtual terminal') + + # Save original console modes so we can recover on cleanup. + original_input_mode = DWORD() + GetConsoleMode(InHandle, original_input_mode) + trace(f'saved original input mode 0x{original_input_mode.value:x}') + self.__original_input_mode = original_input_mode.value + SetConsoleMode( OutHandle, ENABLE_WRAP_AT_EOL_OUTPUT | ENABLE_PROCESSED_OUTPUT | ENABLE_VIRTUAL_TERMINAL_PROCESSING, ) + self.screen: list[str] = [] self.width = 80 self.height = 25 self.__offset = 0 - self.event_queue: deque[Event] = deque() + self.event_queue = EventQueue(encoding) try: self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined] except ValueError: @@ -295,6 +316,12 @@ def _enable_blinking(self): def _disable_blinking(self): self.__write("\x1b[?12l") + def _enable_bracketed_paste(self) -> None: + self.__write("\x1b[?2004h") + + def _disable_bracketed_paste(self) -> None: + self.__write("\x1b[?2004l") + def __write(self, text: str) -> None: if "\x1a" in text: text = ''.join(["^Z" if x == '\x1a' else x for x in text]) @@ -324,8 +351,15 @@ def prepare(self) -> None: self.__gone_tall = 0 self.__offset = 0 + if self.__vt_support: + SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT) + self._enable_bracketed_paste() + def restore(self) -> None: - pass + if self.__vt_support: + # Recover to original mode before running REPL + self._disable_bracketed_paste() + SetConsoleMode(InHandle, self.__original_input_mode) def _move_relative(self, x: int, y: int) -> None: """Moves relative to the current posxy""" @@ -346,7 +380,7 @@ def move_cursor(self, x: int, y: int) -> None: raise ValueError(f"Bad cursor position {x}, {y}") if y < self.__offset or y >= self.__offset + self.height: - self.event_queue.insert(0, Event("scroll", "")) + self.event_queue.insert(Event("scroll", "")) else: self._move_relative(x, y) self.posxy = x, y @@ -376,72 +410,81 @@ def _getscrollbacksize(self) -> int: return info.srWindow.Bottom # type: ignore[no-any-return] def _read_input(self, block: bool = True) -> INPUT_RECORD | None: + # Create a buffer for 128 events + buffer = (INPUT_RECORD * 32)() + num_events_read = DWORD(0) + + # Use PeekConsoleInput to check for input without blocking if not block: - events = DWORD() - if not GetNumberOfConsoleInputEvents(InHandle, events): + count = DWORD() + if not PeekConsoleInput(InHandle, buffer, 32, ctypes.byref(count)): raise WinError(GetLastError()) - if not events.value: - return None + if count.value == 0: + return [] - rec = INPUT_RECORD() - read = DWORD() - if not ReadConsoleInput(InHandle, rec, 1, read): + # Read up to 1024 events at once + if not ReadConsoleInput(InHandle, buffer, 32, ctypes.byref(num_events_read)): raise WinError(GetLastError()) - - return rec + # print(num_events_read.value) + return list(buffer[:num_events_read.value]) def get_event(self, block: bool = True) -> Event | None: """Return an Event instance. Returns None if |block| is false and there is no event pending, otherwise waits for the completion of an event.""" - if self.event_queue: - return self.event_queue.pop() - while True: - rec = self._read_input(block) - if rec is None: - return None + while self.event_queue.empty(): + recs = self._read_input(block) + for rec in recs: + if rec == []: + continue - if rec.EventType == WINDOW_BUFFER_SIZE_EVENT: - return Event("resize", "") + if rec.EventType == WINDOW_BUFFER_SIZE_EVENT: + self.event_queue.insert(Event("resize", "")) + continue - if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown: - # Only process keys and keydown events - if block: + if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown: + # Only process keys and keydown events continue - return None - - key_event = rec.Event.KeyEvent - raw_key = key = key_event.uChar.UnicodeChar - - if key == "\r": - # Make enter unix-like - return Event(evt="key", data="\n", raw=b"\n") - elif key_event.wVirtualKeyCode == 8: - # Turn backspace directly into the command - key = "backspace" - elif key == "\x00": - # Handle special keys like arrow keys and translate them into the appropriate command - key = VK_MAP.get(key_event.wVirtualKeyCode) - if key: - if key_event.dwControlKeyState & CTRL_ACTIVE: - key = f"ctrl {key}" - elif key_event.dwControlKeyState & ALT_ACTIVE: - # queue the key, return the meta command - self.event_queue.insert(0, Event(evt="key", data=key, raw=key)) - return Event(evt="key", data="\033") # keymap.py uses this for meta - return Event(evt="key", data=key, raw=key) - if block: + + key_event = rec.Event.KeyEvent + raw_key = key = key_event.uChar.UnicodeChar + + if key == "\r": + # Make enter unix-like + self.event_queue.insert(Event(evt="key", data="\n", raw=b"\n")) continue + elif key_event.wVirtualKeyCode == 8: + # Turn backspace directly into the command + key = "backspace" + elif key == "\x00": + # Handle special keys like arrow keys and translate them into the appropriate command + key = VK_MAP.get(key_event.wVirtualKeyCode) + if key: + if key_event.dwControlKeyState & CTRL_ACTIVE: + key = f"ctrl {key}" + elif key_event.dwControlKeyState & ALT_ACTIVE: + # queue the key, return the meta command + self.event_queue.insert(Event(evt="key", data=key, raw=key)) + self.event_queue.insert(Event(evt="key", data="\033")) # keymap.py uses this for meta + continue + if block: + continue - return None + continue + elif self.__vt_support: + # If virtual terminal is enabled, scanning VT sequences + self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar) + continue - if key_event.dwControlKeyState & ALT_ACTIVE: - # queue the key, return the meta command - self.event_queue.insert(0, Event(evt="key", data=key, raw=raw_key)) - return Event(evt="key", data="\033") # keymap.py uses this for meta + if key_event.dwControlKeyState & ALT_ACTIVE: + # queue the key, return the meta command + self.event_queue.insert(Event(evt="key", data=key, raw=raw_key)) + self.event_queue.insert(Event(evt="key", data="\033")) # keymap.py uses this for meta + continue - return Event(evt="key", data=key, raw=raw_key) + self.event_queue.insert(Event(evt="key", data=key, raw=raw_key)) + return [self.event_queue.get() for x in range(min(len(self.event_queue.events), 100))] def push_char(self, char: int | bytes) -> None: """ @@ -563,6 +606,13 @@ class INPUT_RECORD(Structure): MOUSE_EVENT = 0x02 WINDOW_BUFFER_SIZE_EVENT = 0x04 +ENABLE_PROCESSED_INPUT = 0x0001 +ENABLE_LINE_INPUT = 0x0002 +ENABLE_ECHO_INPUT = 0x0004 +ENABLE_MOUSE_INPUT = 0x0010 +ENABLE_INSERT_MODE = 0x0020 +ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200 + ENABLE_PROCESSED_OUTPUT = 0x01 ENABLE_WRAP_AT_EOL_OUTPUT = 0x02 ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04 @@ -594,6 +644,10 @@ class INPUT_RECORD(Structure): ] ScrollConsoleScreenBuffer.restype = BOOL + GetConsoleMode = _KERNEL32.GetConsoleMode + GetConsoleMode.argtypes = [HANDLE, POINTER(DWORD)] + GetConsoleMode.restype = BOOL + SetConsoleMode = _KERNEL32.SetConsoleMode SetConsoleMode.argtypes = [HANDLE, DWORD] SetConsoleMode.restype = BOOL @@ -602,6 +656,12 @@ class INPUT_RECORD(Structure): ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, POINTER(DWORD)] ReadConsoleInput.restype = BOOL + # Define PeekConsoleInput + LPDWORD = ctypes.POINTER(DWORD) + PeekConsoleInput = _KERNEL32.PeekConsoleInputW + PeekConsoleInput.argtypes = [HANDLE, ctypes.POINTER(INPUT_RECORD), DWORD, LPDWORD] + PeekConsoleInput.restype = BOOL + GetNumberOfConsoleInputEvents = _KERNEL32.GetNumberOfConsoleInputEvents GetNumberOfConsoleInputEvents.argtypes = [HANDLE, POINTER(DWORD)] GetNumberOfConsoleInputEvents.restype = BOOL @@ -620,6 +680,7 @@ def _win_only(*args, **kwargs): GetStdHandle = _win_only GetConsoleScreenBufferInfo = _win_only ScrollConsoleScreenBuffer = _win_only + GetConsoleMode = _win_only SetConsoleMode = _win_only ReadConsoleInput = _win_only GetNumberOfConsoleInputEvents = _win_only diff --git a/Lib/_pyrepl/windows_eventqueue.py b/Lib/_pyrepl/windows_eventqueue.py new file mode 100644 index 00000000000000..d99722f9a16a93 --- /dev/null +++ b/Lib/_pyrepl/windows_eventqueue.py @@ -0,0 +1,42 @@ +""" +Windows event and VT sequence scanner +""" + +from .base_eventqueue import BaseEventQueue + + +# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#input-sequences +VT_MAP: dict[bytes, str] = { + b'\x1b[A': 'up', + b'\x1b[B': 'down', + b'\x1b[C': 'right', + b'\x1b[D': 'left', + b'\x1b[1;5D': 'ctrl left', + b'\x1b[1;5C': 'ctrl right', + + b'\x1b[H': 'home', + b'\x1b[F': 'end', + + b'\x7f': 'backspace', + b'\x1b[2~': 'insert', + b'\x1b[3~': 'delete', + b'\x1b[5~': 'page up', + b'\x1b[6~': 'page down', + + b'\x1bOP': 'f1', + b'\x1bOQ': 'f2', + b'\x1bOR': 'f3', + b'\x1bOS': 'f4', + b'\x1b[15~': 'f5', + b'\x1b[17~': 'f6', + b'\x1b[18~': 'f7', + b'\x1b[19~': 'f8', + b'\x1b[20~': 'f9', + b'\x1b[21~': 'f10', + b'\x1b[23~': 'f11', + b'\x1b[24~': 'f12', +} + +class EventQueue(BaseEventQueue): + def __init__(self, encoding: str) -> None: + BaseEventQueue.__init__(self, encoding, VT_MAP) diff --git a/Lib/test/test_pyrepl/stdin_tty.py b/Lib/test/test_pyrepl/stdin_tty.py new file mode 100644 index 00000000000000..48f6884a813b34 --- /dev/null +++ b/Lib/test/test_pyrepl/stdin_tty.py @@ -0,0 +1,6 @@ +import subprocess +import sys + + +import code +code.InteractiveInterpreter().runsource("\0") diff --git a/Lib/test/test_pyrepl/support.py b/Lib/test/test_pyrepl/support.py index 45e3bf758f17de..6688cb1590271d 100644 --- a/Lib/test/test_pyrepl/support.py +++ b/Lib/test/test_pyrepl/support.py @@ -72,7 +72,7 @@ def get_prompt(lineno, cursor_on_line) -> str: def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console: console = MagicMock() - console.get_event.side_effect = events + console.get_event.side_effect = list(events) console.height = 100 console.width = 80 for key, val in kwargs.items(): @@ -83,7 +83,7 @@ def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console: def handle_all_events( events, prepare_console=prepare_console, prepare_reader=prepare_reader ): - console = prepare_console(events) + console = prepare_console([events]) reader = prepare_reader(console) try: while True: @@ -103,14 +103,21 @@ def handle_all_events( class FakeConsole(Console): def __init__(self, events, encoding="utf-8") -> None: - self.events = iter(events) + self.events = list(events) self.encoding = encoding self.screen = [] self.height = 100 self.width = 80 + self.index = 0 + self.events_len = len(self.events) def get_event(self, block: bool = True) -> Event | None: - return next(self.events) + if self.index < self.events_len: + index = self.index + self.index += 1 + return [self.events[index]] + else: + raise StopIteration def getpending(self) -> Event: return self.get_event(block=False) diff --git a/Lib/test/test_pyrepl/test_unix_eventqueue.py b/Lib/test/test_pyrepl/test_eventqueue.py similarity index 68% rename from Lib/test/test_pyrepl/test_unix_eventqueue.py rename to Lib/test/test_pyrepl/test_eventqueue.py index 301f79927a741f..a1bac38fbd43f5 100644 --- a/Lib/test/test_pyrepl/test_unix_eventqueue.py +++ b/Lib/test/test_pyrepl/test_eventqueue.py @@ -2,70 +2,77 @@ import unittest import sys from unittest.mock import patch +from test import support try: from _pyrepl.console import Event - from _pyrepl.unix_eventqueue import EventQueue + from _pyrepl import base_eventqueue except ImportError: pass -@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows") -@patch("_pyrepl.curses.tigetstr", lambda x: b"") -class TestUnixEventQueue(unittest.TestCase): - def setUp(self): - self.file = tempfile.TemporaryFile() +try: + from _pyrepl import unix_eventqueue +except ImportError: + pass - def tearDown(self) -> None: - self.file.close() +try: + from _pyrepl import windows_eventqueue +except ImportError: + pass + +class EventQueueTestBase: + """OS-independent mixin""" + def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: + raise NotImplementedError() def test_get(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() event = Event("key", "a", b"a") eq.insert(event) self.assertEqual(eq.get(), event) def test_empty(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() self.assertTrue(eq.empty()) eq.insert(Event("key", "a", b"a")) self.assertFalse(eq.empty()) def test_flush_buf(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.buf.extend(b"test") self.assertEqual(eq.flush_buf(), b"test") self.assertEqual(eq.buf, bytearray()) def test_insert(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() event = Event("key", "a", b"a") eq.insert(event) self.assertEqual(eq.events[0], event) - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_with_key_in_keymap(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"a": "b"} eq.push("a") mock_keymap.compile_keymap.assert_called() self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "b") - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_without_key_in_keymap(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"c": "d"} eq.push("a") mock_keymap.compile_keymap.assert_called() self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "a") - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_with_keymap_in_keymap(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"a": {b"b": "c"}} eq.push("a") mock_keymap.compile_keymap.assert_called() @@ -77,10 +84,10 @@ def test_push_with_keymap_in_keymap(self, mock_keymap): self.assertEqual(eq.events[1].evt, "key") self.assertEqual(eq.events[1].data, "d") - @patch("_pyrepl.unix_eventqueue.keymap") + @patch("_pyrepl.base_eventqueue.keymap") def test_push_with_keymap_in_keymap_and_escape(self, mock_keymap): mock_keymap.compile_keymap.return_value = {"a": "b"} - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {b"a": {b"b": "c"}} eq.push("a") mock_keymap.compile_keymap.assert_called() @@ -94,7 +101,7 @@ def test_push_with_keymap_in_keymap_and_escape(self, mock_keymap): self.assertEqual(eq.events[1].data, "b") def test_push_special_key(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {} eq.push("\x1b") eq.push("[") @@ -103,7 +110,7 @@ def test_push_special_key(self): self.assertEqual(eq.events[0].data, "\x1b") def test_push_unrecognized_escape_sequence(self): - eq = EventQueue(self.file.fileno(), "utf-8") + eq = self.make_eventqueue() eq.keymap = {} eq.push("\x1b") eq.push("[") @@ -115,3 +122,22 @@ def test_push_unrecognized_escape_sequence(self): self.assertEqual(eq.events[1].data, "[") self.assertEqual(eq.events[2].evt, "key") self.assertEqual(eq.events[2].data, "Z") + + +@unittest.skipIf(support.MS_WINDOWS, "No Unix event queue on Windows") +class TestUnixEventQueue(EventQueueTestBase, unittest.TestCase): + def setUp(self): + self.enterContext(patch("_pyrepl.curses.tigetstr", lambda x: b"")) + self.file = tempfile.TemporaryFile() + + def tearDown(self) -> None: + self.file.close() + + def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: + return unix_eventqueue.EventQueue(self.file.fileno(), "utf-8") + + +@unittest.skipUnless(support.MS_WINDOWS, "No Windows event queue on Unix") +class TestWindowsEventQueue(EventQueueTestBase, unittest.TestCase): + def make_eventqueue(self) -> base_eventqueue.BaseEventQueue: + return windows_eventqueue.EventQueue("utf-8") diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py index 07eaccd1124cd6..33de077423b750 100644 --- a/Lib/test/test_pyrepl/test_windows_console.py +++ b/Lib/test/test_pyrepl/test_windows_console.py @@ -30,7 +30,7 @@ class WindowsConsoleTests(TestCase): def console(self, events, **kwargs) -> Console: console = WindowsConsole() - console.get_event = MagicMock(side_effect=events) + console.get_event = MagicMock(side_effect=list(events)) console._scroll = MagicMock() console._hide_cursor = MagicMock() console._show_cursor = MagicMock() diff --git a/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst new file mode 100644 index 00000000000000..2a6aed98c55374 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-09-16-17-03-52.gh-issue-124096.znin0O.rst @@ -0,0 +1,3 @@ +Turn on virtual terminal mode and enable bracketed paste in REPL on Windows +console. (If the terminal does not support bracketed paste, enabling it +does nothing.) From 2b1e338ee5751fa3099ce2d9ca9b1d1a9b2ec399 Mon Sep 17 00:00:00 2001 From: devdanzin <74280297+devdanzin@users.noreply.github.com> Date: Mon, 24 Feb 2025 10:02:13 -0300 Subject: [PATCH 2/2] Remove file that was wrongly added. --- Lib/test/test_pyrepl/stdin_tty.py | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 Lib/test/test_pyrepl/stdin_tty.py diff --git a/Lib/test/test_pyrepl/stdin_tty.py b/Lib/test/test_pyrepl/stdin_tty.py deleted file mode 100644 index 48f6884a813b34..00000000000000 --- a/Lib/test/test_pyrepl/stdin_tty.py +++ /dev/null @@ -1,6 +0,0 @@ -import subprocess -import sys - - -import code -code.InteractiveInterpreter().runsource("\0")