|
4 | 4 | import sys
|
5 | 5 | import types
|
6 | 6 | import weakref
|
7 |
| -from typing import TYPE_CHECKING, Protocol, TypeVar |
| 7 | +from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeVar |
8 | 8 |
|
9 | 9 | import attrs
|
10 | 10 |
|
|
14 | 14 | import types
|
15 | 15 | from collections.abc import Callable
|
16 | 16 |
|
17 |
| - from typing_extensions import TypeGuard |
| 17 | + from typing_extensions import Self, TypeGuard |
18 | 18 | # In ordinary single-threaded Python code, when you hit control-C, it raises
|
19 | 19 | # an exception and automatically does all the regular unwinding stuff.
|
20 | 20 | #
|
|
77 | 77 | # for any Python program that's written to catch and ignore
|
78 | 78 | # KeyboardInterrupt.)
|
79 | 79 |
|
80 |
| -_CODE_KI_PROTECTION_STATUS_WMAP: weakref.WeakKeyDictionary[ |
| 80 | +_T = TypeVar("_T") |
| 81 | + |
| 82 | + |
| 83 | +class _IdRef(weakref.ref[_T]): |
| 84 | + slots = "_hash" |
| 85 | + _hash: int |
| 86 | + |
| 87 | + def __new__(cls, ob: _T, callback: Callable[[Self], Any] | None = None, /) -> Self: |
| 88 | + self: Self = weakref.ref.__new__(cls, ob, callback) |
| 89 | + self._hash = object.__hash__(ob) |
| 90 | + return self |
| 91 | + |
| 92 | + def __eq__(self, other: object) -> bool: |
| 93 | + if self is other: |
| 94 | + return True |
| 95 | + |
| 96 | + if not isinstance(other, _IdRef): |
| 97 | + return NotImplemented |
| 98 | + |
| 99 | + my_obj = None |
| 100 | + other_obj: Any = None |
| 101 | + try: |
| 102 | + my_obj = self() |
| 103 | + other_obj = other() |
| 104 | + return my_obj is not None and other_obj is not None and my_obj is other_obj |
| 105 | + finally: |
| 106 | + del my_obj, other_obj |
| 107 | + |
| 108 | + def __hash__(self) -> int: |
| 109 | + return self._hash |
| 110 | + |
| 111 | + |
| 112 | +_KT = TypeVar("_KT") |
| 113 | +_VT = TypeVar("_VT") |
| 114 | + |
| 115 | + |
| 116 | +# see also: https://github.com/python/cpython/issues/88306 |
| 117 | +class WeakKeyIdentityDictionary(Generic[_KT, _VT]): |
| 118 | + def __init__(self) -> None: |
| 119 | + self._data: dict[_IdRef[_KT], _VT] = {} |
| 120 | + |
| 121 | + def remove( |
| 122 | + k: _IdRef[_KT], |
| 123 | + selfref: weakref.ref[ |
| 124 | + WeakKeyIdentityDictionary[_KT, _VT] |
| 125 | + ] = weakref.ref( # noqa: B008 # function-call-in-default-argument |
| 126 | + self, |
| 127 | + ), |
| 128 | + ) -> None: |
| 129 | + self = selfref() |
| 130 | + if self is not None: |
| 131 | + try: # noqa: SIM105 # supressible-exception |
| 132 | + del self._data[k] |
| 133 | + except KeyError: |
| 134 | + pass |
| 135 | + |
| 136 | + self._remove = remove |
| 137 | + |
| 138 | + def __getitem__(self, k: _KT) -> _VT: |
| 139 | + return self._data[_IdRef(k)] |
| 140 | + |
| 141 | + def __setitem__(self, k: _KT, v: _VT) -> None: |
| 142 | + self._data[_IdRef(k, self._remove)] = v |
| 143 | + |
| 144 | + |
| 145 | +_CODE_KI_PROTECTION_STATUS_WMAP: WeakKeyIdentityDictionary[ |
81 | 146 | types.CodeType,
|
82 | 147 | bool,
|
83 |
| -] = weakref.WeakKeyDictionary() |
| 148 | +] = WeakKeyIdentityDictionary() |
84 | 149 |
|
85 | 150 |
|
86 | 151 | # This is to support the async_generator package necessary for aclosing on <3.10
|
|
0 commit comments