|
8 | 8 | import atexit
|
9 | 9 | import os
|
10 | 10 |
|
11 |
| -from typing import NoReturn, TYPE_CHECKING |
| 11 | +from typing import Callable, NoReturn, TYPE_CHECKING |
12 | 12 |
|
13 | 13 | from coverage.exceptions import ConfigError
|
14 | 14 | from coverage.files import create_pth_file
|
|
17 | 17 | from coverage import Coverage
|
18 | 18 | from coverage.config import CoverageConfig
|
19 | 19 |
|
20 |
| -_old_os_exit = os._exit |
21 | 20 |
|
22 | 21 | def apply_patches(cov: Coverage, config: CoverageConfig) -> None:
|
23 | 22 | """Apply invasive patches requested by `[run] patch=`."""
|
24 | 23 |
|
25 | 24 | for patch in set(config.patch):
|
26 | 25 | if patch == "_exit":
|
27 |
| - def _coverage_os_exit_patch(status: int) -> NoReturn: |
28 |
| - try: |
29 |
| - cov.save() |
30 |
| - except: # pylint: disable=bare-except |
31 |
| - pass |
32 |
| - _old_os_exit(status) |
33 |
| - os._exit = _coverage_os_exit_patch |
| 26 | + def make_patch(old_os_exit: Callable[[int], NoReturn]) -> Callable[[int], NoReturn]: |
| 27 | + def _coverage_os_exit_patch(status: int) -> NoReturn: |
| 28 | + try: |
| 29 | + cov.save() |
| 30 | + except: # pylint: disable=bare-except |
| 31 | + pass |
| 32 | + old_os_exit(status) |
| 33 | + return _coverage_os_exit_patch |
| 34 | + os._exit = make_patch(os._exit) # type: ignore[assignment] |
| 35 | + |
34 | 36 | elif patch == "subprocess":
|
35 | 37 | pth_file = create_pth_file()
|
36 | 38 | assert pth_file is not None
|
37 | 39 | atexit.register(pth_file.unlink, missing_ok=True)
|
38 | 40 | assert config.config_file is not None
|
39 | 41 | os.environ["COVERAGE_PROCESS_START"] = config.config_file
|
| 42 | + |
40 | 43 | else:
|
41 | 44 | raise ConfigError(f"Unknown patch {patch!r}")
|
0 commit comments