diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 70133b9839..10c1ddfdc0 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -393,6 +393,10 @@ Wait queue abstraction .. autoclass:: ParkingLotStatistics :members: +.. autofunction:: add_parking_lot_breaker + +.. autofunction:: remove_parking_lot_breaker + Low-level checkpoint functions ------------------------------ diff --git a/newsfragments/3035.feature.rst b/newsfragments/3035.feature.rst new file mode 100644 index 0000000000..a1761fa282 --- /dev/null +++ b/newsfragments/3035.feature.rst @@ -0,0 +1 @@ +:class:`trio.Lock` and :class:`trio.StrictFIFOLock` will now raise :exc:`trio.BrokenResourceError` when :meth:`trio.Lock.acquire` would previously stall due to the owner of the lock exiting without releasing the lock. diff --git a/newsfragments/3081.feature.rst b/newsfragments/3081.feature.rst new file mode 100644 index 0000000000..34a073b265 --- /dev/null +++ b/newsfragments/3081.feature.rst @@ -0,0 +1 @@ +Added :func:`trio.lowlevel.add_parking_lot_breaker` and :func:`trio.lowlevel.remove_parking_lot_breaker` to allow creating custom lock/semaphore implementations that will break their underlying parking lot if a task exits unexpectedly. :meth:`trio.lowlevel.ParkingLot.break_lot` is also added, to allow breaking a parking lot intentionally. diff --git a/src/trio/_core/__init__.py b/src/trio/_core/__init__.py index 71f5f17eb2..fdef90292d 100644 --- a/src/trio/_core/__init__.py +++ b/src/trio/_core/__init__.py @@ -20,7 +20,12 @@ from ._ki import currently_ki_protected, disable_ki_protection, enable_ki_protection from ._local import RunVar, RunVarToken from ._mock_clock import MockClock -from ._parking_lot import ParkingLot, ParkingLotStatistics +from ._parking_lot import ( + ParkingLot, + ParkingLotStatistics, + add_parking_lot_breaker, + remove_parking_lot_breaker, +) # Imports that always exist from ._run import ( diff --git a/src/trio/_core/_parking_lot.py b/src/trio/_core/_parking_lot.py index 916e6a6e96..af6ff610ee 100644 --- a/src/trio/_core/_parking_lot.py +++ b/src/trio/_core/_parking_lot.py @@ -71,11 +71,13 @@ # See: https://github.com/python-trio/trio/issues/53 from __future__ import annotations +import inspect import math from collections import OrderedDict from typing import TYPE_CHECKING import attrs +import outcome from .. import _core from .._util import final @@ -86,6 +88,37 @@ from ._run import Task +GLOBAL_PARKING_LOT_BREAKER: dict[Task, list[ParkingLot]] = {} + + +def add_parking_lot_breaker(task: Task, lot: ParkingLot) -> None: + """Register a task as a breaker for a lot. See :meth:`ParkingLot.break_lot`. + + raises: + trio.BrokenResourceError: if the task has already exited. + """ + if inspect.getcoroutinestate(task.coro) == inspect.CORO_CLOSED: + raise _core._exceptions.BrokenResourceError( + "Attempted to add already exited task as lot breaker.", + ) + if task not in GLOBAL_PARKING_LOT_BREAKER: + GLOBAL_PARKING_LOT_BREAKER[task] = [lot] + else: + GLOBAL_PARKING_LOT_BREAKER[task].append(lot) + + +def remove_parking_lot_breaker(task: Task, lot: ParkingLot) -> None: + """Deregister a task as a breaker for a lot. See :meth:`ParkingLot.break_lot`""" + try: + GLOBAL_PARKING_LOT_BREAKER[task].remove(lot) + except (KeyError, ValueError): + raise RuntimeError( + "Attempted to remove task as breaker for a lot it is not registered for", + ) from None + if not GLOBAL_PARKING_LOT_BREAKER[task]: + del GLOBAL_PARKING_LOT_BREAKER[task] + + @attrs.frozen class ParkingLotStatistics: """An object containing debugging information for a ParkingLot. @@ -118,6 +151,7 @@ class ParkingLot: # {task: None}, we just want a deque where we can quickly delete random # items _parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False) + broken_by: list[Task] = attrs.field(factory=list, init=False) def __len__(self) -> int: """Returns the number of parked tasks.""" @@ -136,7 +170,15 @@ async def park(self) -> None: """Park the current task until woken by a call to :meth:`unpark` or :meth:`unpark_all`. + Raises: + BrokenResourceError: if attempting to park in a broken lot, or the lot + breaks before we get to unpark. + """ + if self.broken_by: + raise _core.BrokenResourceError( + f"Attempted to park in parking lot broken by {self.broken_by}", + ) task = _core.current_task() self._parked[task] = None task.custom_sleep_data = self @@ -234,6 +276,35 @@ def repark_all(self, new_lot: ParkingLot) -> None: """ return self.repark(new_lot, count=len(self)) + def break_lot(self, task: Task | None = None) -> None: + """Break this lot, with ``task`` noted as the task that broke it. + + This causes all parked tasks to raise an error, and any + future tasks attempting to park to error. Unpark & repark become no-ops as the + parking lot is empty. + + The error raised contains a reference to the task sent as a parameter. The task + is also saved in the parking lot in the ``broken_by`` attribute. + """ + if task is None: + task = _core.current_task() + + # if lot is already broken, just mark this as another breaker and return + if self.broken_by: + self.broken_by.append(task) + return + + self.broken_by.append(task) + + for parked_task in self._parked: + _core.reschedule( + parked_task, + outcome.Error( + _core.BrokenResourceError(f"Parking lot broken by {task}"), + ), + ) + self._parked.clear() + def statistics(self) -> ParkingLotStatistics: """Return an object containing debugging information. diff --git a/src/trio/_core/_run.py b/src/trio/_core/_run.py index 5de75e8e6c..cba7a8dec0 100644 --- a/src/trio/_core/_run.py +++ b/src/trio/_core/_run.py @@ -40,6 +40,7 @@ from ._exceptions import Cancelled, RunFinishedError, TrioInternalError from ._instrumentation import Instruments from ._ki import LOCALS_KEY_KI_PROTECTION_ENABLED, KIManager, enable_ki_protection +from ._parking_lot import GLOBAL_PARKING_LOT_BREAKER from ._thread_cache import start_thread_soon from ._traps import ( Abort, @@ -1896,6 +1897,12 @@ async def python_wrapper(orig_coro: Awaitable[RetT]) -> RetT: return task def task_exited(self, task: Task, outcome: Outcome[Any]) -> None: + # break parking lots associated with the exiting task + if task in GLOBAL_PARKING_LOT_BREAKER: + for lot in GLOBAL_PARKING_LOT_BREAKER[task]: + lot.break_lot(task) + del GLOBAL_PARKING_LOT_BREAKER[task] + if ( task._cancel_status is not None and task._cancel_status.abandoned_by_misnesting diff --git a/src/trio/_core/_tests/test_parking_lot.py b/src/trio/_core/_tests/test_parking_lot.py index ed6a17012e..d9afee83d4 100644 --- a/src/trio/_core/_tests/test_parking_lot.py +++ b/src/trio/_core/_tests/test_parking_lot.py @@ -1,9 +1,18 @@ from __future__ import annotations +import re from typing import TypeVar import pytest +import trio +from trio.lowlevel import ( + add_parking_lot_breaker, + current_task, + remove_parking_lot_breaker, +) +from trio.testing import Matcher, RaisesGroup + from ... import _core from ...testing import wait_all_tasks_blocked from .._parking_lot import ParkingLot @@ -215,3 +224,161 @@ async def test_parking_lot_repark_with_count() -> None: "wake 2", ] lot1.unpark_all() + + +async def dummy_task( + task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED, +) -> None: + task_status.started(_core.current_task()) + await trio.sleep_forever() + + +async def test_parking_lot_breaker_basic() -> None: + """Test basic functionality for breaking lots.""" + lot = ParkingLot() + task = current_task() + + # defaults to current task + lot.break_lot() + assert lot.broken_by == [task] + + # breaking the lot again with the same task appends another copy in `broken_by` + lot.break_lot() + assert lot.broken_by == [task, task] + + # trying to park in broken lot errors + broken_by_str = re.escape(str([task, task])) + with pytest.raises( + _core.BrokenResourceError, + match=f"^Attempted to park in parking lot broken by {broken_by_str}$", + ): + await lot.park() + + +async def test_parking_lot_break_parking_tasks() -> None: + """Checks that tasks currently waiting to park raise an error when the breaker exits.""" + + async def bad_parker(lot: ParkingLot, scope: _core.CancelScope) -> None: + add_parking_lot_breaker(current_task(), lot) + with scope: + await trio.sleep_forever() + + lot = ParkingLot() + cs = _core.CancelScope() + + # check that parked task errors + with RaisesGroup( + Matcher(_core.BrokenResourceError, match="^Parking lot broken by"), + ): + async with _core.open_nursery() as nursery: + nursery.start_soon(bad_parker, lot, cs) + await wait_all_tasks_blocked() + + nursery.start_soon(lot.park) + await wait_all_tasks_blocked() + + cs.cancel() + + +async def test_parking_lot_breaker_registration() -> None: + lot = ParkingLot() + task = current_task() + + with pytest.raises( + RuntimeError, + match="Attempted to remove task as breaker for a lot it is not registered for", + ): + remove_parking_lot_breaker(task, lot) + + # check that a task can be registered as breaker for the same lot multiple times + add_parking_lot_breaker(task, lot) + add_parking_lot_breaker(task, lot) + remove_parking_lot_breaker(task, lot) + remove_parking_lot_breaker(task, lot) + + with pytest.raises( + RuntimeError, + match="Attempted to remove task as breaker for a lot it is not registered for", + ): + remove_parking_lot_breaker(task, lot) + + # registering a task as breaker on an already broken lot is fine + lot.break_lot() + child_task = None + async with trio.open_nursery() as nursery: + child_task = await nursery.start(dummy_task) + add_parking_lot_breaker(child_task, lot) + nursery.cancel_scope.cancel() + assert lot.broken_by == [task, child_task] + + # manually breaking a lot with an already exited task is fine + lot = ParkingLot() + lot.break_lot(child_task) + assert lot.broken_by == [child_task] + + +async def test_parking_lot_breaker_rebreak() -> None: + lot = ParkingLot() + task = current_task() + lot.break_lot() + + # breaking an already broken lot with a different task is allowed + # The nursery is only to create a task we can pass to lot.break_lot + async with trio.open_nursery() as nursery: + child_task = await nursery.start(dummy_task) + lot.break_lot(child_task) + nursery.cancel_scope.cancel() + + assert lot.broken_by == [task, child_task] + + +async def test_parking_lot_multiple_breakers_exit() -> None: + # register multiple tasks as lot breakers, then have them all exit + lot = ParkingLot() + async with trio.open_nursery() as nursery: + child_task1 = await nursery.start(dummy_task) + child_task2 = await nursery.start(dummy_task) + child_task3 = await nursery.start(dummy_task) + add_parking_lot_breaker(child_task1, lot) + add_parking_lot_breaker(child_task2, lot) + add_parking_lot_breaker(child_task3, lot) + nursery.cancel_scope.cancel() + + # I think the order is guaranteed currently, but doesn't hurt to be safe. + assert set(lot.broken_by) == {child_task1, child_task2, child_task3} + + +async def test_parking_lot_breaker_register_exited_task() -> None: + lot = ParkingLot() + child_task = None + async with trio.open_nursery() as nursery: + child_task = await nursery.start(dummy_task) + nursery.cancel_scope.cancel() + # trying to register an exited task as lot breaker errors + with pytest.raises( + trio.BrokenResourceError, + match="^Attempted to add already exited task as lot breaker.$", + ): + add_parking_lot_breaker(child_task, lot) + + +async def test_parking_lot_break_itself() -> None: + """Break a parking lot, where the breakee is parked. + Doing this is weird, but should probably be supported. + """ + + async def return_me_and_park( + lot: ParkingLot, + *, + task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED, + ) -> None: + task_status.started(_core.current_task()) + await lot.park() + + lot = ParkingLot() + with RaisesGroup( + Matcher(_core.BrokenResourceError, match="^Parking lot broken by"), + ): + async with _core.open_nursery() as nursery: + child_task = await nursery.start(return_me_and_park, lot) + lot.break_lot(child_task) diff --git a/src/trio/_sync.py b/src/trio/_sync.py index 698716ea35..03d518aab9 100644 --- a/src/trio/_sync.py +++ b/src/trio/_sync.py @@ -8,7 +8,14 @@ import trio from . import _core -from ._core import Abort, ParkingLot, RaiseCancelT, enable_ki_protection +from ._core import ( + Abort, + ParkingLot, + RaiseCancelT, + add_parking_lot_breaker, + enable_ki_protection, + remove_parking_lot_breaker, +) from ._util import final if TYPE_CHECKING: @@ -576,20 +583,30 @@ def acquire_nowait(self) -> None: elif self._owner is None and not self._lot: # No-one owns it self._owner = task + add_parking_lot_breaker(task, self._lot) else: raise trio.WouldBlock @enable_ki_protection async def acquire(self) -> None: - """Acquire the lock, blocking if necessary.""" + """Acquire the lock, blocking if necessary. + + Raises: + BrokenResourceError: if the owner of the lock exits without releasing. + """ await trio.lowlevel.checkpoint_if_cancelled() try: self.acquire_nowait() except trio.WouldBlock: - # NOTE: it's important that the contended acquire path is just - # "_lot.park()", because that's how Condition.wait() acquires the - # lock as well. - await self._lot.park() + try: + # NOTE: it's important that the contended acquire path is just + # "_lot.park()", because that's how Condition.wait() acquires the + # lock as well. + await self._lot.park() + except trio.BrokenResourceError: + raise trio.BrokenResourceError( + f"Owner of this lock exited without releasing: {self._owner}", + ) from None else: await trio.lowlevel.cancel_shielded_checkpoint() @@ -604,8 +621,10 @@ def release(self) -> None: task = trio.lowlevel.current_task() if task is not self._owner: raise RuntimeError("can't release a Lock you don't own") + remove_parking_lot_breaker(self._owner, self._lot) if self._lot: (self._owner,) = self._lot.unpark(count=1) + add_parking_lot_breaker(self._owner, self._lot) else: self._owner = None @@ -767,7 +786,11 @@ def acquire_nowait(self) -> None: return self._lock.acquire_nowait() async def acquire(self) -> None: - """Acquire the underlying lock, blocking if necessary.""" + """Acquire the underlying lock, blocking if necessary. + + Raises: + BrokenResourceError: if the owner of the underlying lock exits without releasing. + """ await self._lock.acquire() def release(self) -> None: @@ -796,6 +819,7 @@ async def wait(self) -> None: Raises: RuntimeError: if the calling task does not hold the lock. + BrokenResourceError: if the owner of the lock exits without releasing, when attempting to re-acquire. """ if trio.lowlevel.current_task() is not self._lock._owner: diff --git a/src/trio/_tests/test_sync.py b/src/trio/_tests/test_sync.py index caf3f04f5b..f506e84ffc 100644 --- a/src/trio/_tests/test_sync.py +++ b/src/trio/_tests/test_sync.py @@ -1,11 +1,15 @@ from __future__ import annotations +import re import weakref from typing import TYPE_CHECKING, Callable, Union import pytest +from trio.testing import Matcher, RaisesGroup + from .. import _core +from .._core._parking_lot import GLOBAL_PARKING_LOT_BREAKER from .._sync import * from .._timeouts import sleep_forever from ..testing import assert_checkpoints, wait_all_tasks_blocked @@ -586,3 +590,66 @@ async def lock_taker() -> None: await wait_all_tasks_blocked() assert record == ["started"] lock_like.release() + + +async def test_lock_acquire_unowned_lock() -> None: + """Test that trying to acquire a lock whose owner has exited raises an error. + see https://github.com/python-trio/trio/issues/3035 + """ + assert not GLOBAL_PARKING_LOT_BREAKER + lock = trio.Lock() + async with trio.open_nursery() as nursery: + nursery.start_soon(lock.acquire) + owner_str = re.escape(str(lock._lot.broken_by[0])) + with pytest.raises( + trio.BrokenResourceError, + match=f"^Owner of this lock exited without releasing: {owner_str}$", + ): + await lock.acquire() + assert not GLOBAL_PARKING_LOT_BREAKER + + +async def test_lock_multiple_acquire() -> None: + """Test for error if awaiting on a lock whose owner exits without releasing. + see https://github.com/python-trio/trio/issues/3035""" + assert not GLOBAL_PARKING_LOT_BREAKER + lock = trio.Lock() + with RaisesGroup( + Matcher( + trio.BrokenResourceError, + match="^Owner of this lock exited without releasing: ", + ), + ): + async with trio.open_nursery() as nursery: + nursery.start_soon(lock.acquire) + nursery.start_soon(lock.acquire) + assert not GLOBAL_PARKING_LOT_BREAKER + + +async def test_lock_handover() -> None: + assert not GLOBAL_PARKING_LOT_BREAKER + child_task: Task | None = None + lock = trio.Lock() + + # this task acquires the lock + lock.acquire_nowait() + assert GLOBAL_PARKING_LOT_BREAKER == { + _core.current_task(): [ + lock._lot, + ], + } + + async with trio.open_nursery() as nursery: + nursery.start_soon(lock.acquire) + await wait_all_tasks_blocked() + + # hand over the lock to the child task + lock.release() + + # check values, and get the identifier out of the dict for later check + assert len(GLOBAL_PARKING_LOT_BREAKER) == 1 + child_task = next(iter(GLOBAL_PARKING_LOT_BREAKER)) + assert GLOBAL_PARKING_LOT_BREAKER[child_task] == [lock._lot] + + assert lock._lot.broken_by == [child_task] + assert not GLOBAL_PARKING_LOT_BREAKER diff --git a/src/trio/lowlevel.py b/src/trio/lowlevel.py index 1df7019637..9e385a0045 100644 --- a/src/trio/lowlevel.py +++ b/src/trio/lowlevel.py @@ -25,6 +25,7 @@ UnboundedQueue as UnboundedQueue, UnboundedQueueStatistics as UnboundedQueueStatistics, add_instrument as add_instrument, + add_parking_lot_breaker as add_parking_lot_breaker, cancel_shielded_checkpoint as cancel_shielded_checkpoint, checkpoint as checkpoint, checkpoint_if_cancelled as checkpoint_if_cancelled, @@ -40,6 +41,7 @@ permanently_detach_coroutine_object as permanently_detach_coroutine_object, reattach_detached_coroutine_object as reattach_detached_coroutine_object, remove_instrument as remove_instrument, + remove_parking_lot_breaker as remove_parking_lot_breaker, reschedule as reschedule, spawn_system_task as spawn_system_task, start_guest_run as start_guest_run,