Skip to content

gh-109413: Enable strict_optional = true for libregrtest/run_workers #126855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Lib/test/libregrtest/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ disallow_untyped_defs = False
check_untyped_defs = False
warn_return_any = False

# Enable --strict-optional for these ASAP:
[mypy-Lib.test.libregrtest.run_workers.*]
strict_optional = False

# Various internal modules that typeshed deliberately doesn't have stubs for:
[mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*]
ignore_missing_imports = True
39 changes: 25 additions & 14 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
self._killed = False
self._stopped = False

def current_test_name(self) -> TestName:
if self.test_name is None:
raise ValueError(
'Should never call `.current_test_name()` before calling `.run()`'
)
return self.test_name

def __repr__(self) -> str:
info = [f'WorkerThread #{self.worker_id}']
if self.is_alive():
Expand All @@ -127,9 +134,9 @@ def __repr__(self) -> str:
if test:
info.append(f'test={test}')
popen = self._popen
if popen is not None:
if popen is not None and self.start_time is not None:
dt = time.monotonic() - self.start_time
info.extend((f'pid={self._popen.pid}',
info.extend((f'pid={popen.pid}',
f'time={format_duration(dt)}'))
return '<%s>' % ' '.join(info)

Expand Down Expand Up @@ -311,13 +318,14 @@ def read_stdout(self, stdout_file: TextIO) -> str:
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
raise WorkerError(self.test_name,
raise WorkerError(self.current_test_name(),
f"Cannot read process stdout: {exc}",
stdout=None,
state=State.WORKER_BUG)

def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
stdout: str) -> tuple[TestResult, str]:
test_name = self.current_test_name()
try:
if json_tmpfile is not None:
json_tmpfile.seek(0)
Expand All @@ -332,11 +340,11 @@ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to read worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
raise WorkerError(test_name, err_msg, stdout,
state=State.WORKER_BUG)

if not worker_json:
raise WorkerError(self.test_name, "empty JSON", stdout,
raise WorkerError(test_name, "empty JSON", stdout,
state=State.WORKER_BUG)

try:
Expand All @@ -345,7 +353,7 @@ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to parse worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
raise WorkerError(test_name, err_msg, stdout,
state=State.WORKER_BUG)

return (result, stdout)
Expand All @@ -363,14 +371,14 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
stdout = self.read_stdout(stdout_file)

if retcode is None:
raise WorkerError(self.test_name, stdout=stdout,
raise WorkerError(test_name, stdout=stdout,
err_msg=None,
state=State.TIMEOUT)
if retcode != 0:
name = support.get_signal_name(retcode)
if name:
retcode = f"{retcode} ({name})"
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
raise WorkerError(test_name, f"Exit code {retcode}", stdout,
state=State.WORKER_FAILED)

result, stdout = self.read_json(json_file, json_tmpfile, stdout)
Expand All @@ -394,15 +402,15 @@ def run(self) -> None:
except StopIteration:
break

self.start_time = time.monotonic()
self.start_time = start_time = time.monotonic()
self.test_name = test_name
try:
mp_result = self._runtest(test_name)
except WorkerError as exc:
mp_result = exc.mp_result
finally:
self.test_name = None
mp_result.result.duration = time.monotonic() - self.start_time
mp_result.result.duration = time.monotonic() - start_time
self.output.put((False, mp_result))

if mp_result.result.must_stop(fail_fast, fail_env_changed):
Expand All @@ -416,6 +424,9 @@ def run(self) -> None:

def _wait_completed(self) -> None:
popen = self._popen
# only needed for mypy:
if popen is None:
raise ValueError("Should never access `._popen` before calling `.run()`")

try:
popen.wait(WAIT_COMPLETED_TIMEOUT)
Expand Down Expand Up @@ -451,7 +462,7 @@ def get_running(workers: list[WorkerThread]) -> str | None:
running: list[str] = []
for worker in workers:
test_name = worker.test_name
if not test_name:
if not test_name or worker.start_time is None:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
Expand Down Expand Up @@ -483,7 +494,7 @@ def __init__(self, num_workers: int, runtests: RunTests,
self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers: list[WorkerThread] | None = None
self.workers: list[WorkerThread] = []

jobs = self.runtests.get_jobs()
if jobs is not None:
Expand All @@ -503,7 +514,7 @@ def start_workers(self) -> None:
processes = plural(nworkers, "process", "processes")
msg = (f"Run {tests} in parallel using "
f"{nworkers} worker {processes}")
if self.timeout:
if self.timeout and self.worker_timeout is not None:
msg += (" (timeout: %s, worker timeout: %s)"
% (format_duration(self.timeout),
format_duration(self.worker_timeout)))
Expand Down Expand Up @@ -555,7 +566,7 @@ def display_result(self, mp_result: MultiprocessResult) -> None:
if mp_result.err_msg:
# WORKER_BUG
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
elif (result.duration and result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
if not pgo:
running = get_running(self.workers)
Expand Down
Loading