Skip to content

Commit 69f41c8

Browse files
authored
1.3.2
2 parents 63982ba + f934d22 commit 69f41c8

File tree

12 files changed

+403
-325
lines changed

12 files changed

+403
-325
lines changed

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
66

7+
## [1.3.2] 2023-01-30
8+
### Improvements
9+
- replaced some subprocess calls executed in behalf of non-root users by async calls (Python's asyncio native approach)
10+
11+
### Fixes
12+
- the optimizer service (as root) not able to execute some commands in behalf of non-root users (started with Python 3.10.9)
13+
714
## [1.3.1] 2022-10-22
815
### Improvements
916
- Optimizing the children of the target process

LICENSE

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Copyright (c) 2021-2023 Vinícius Moreira
1+
Copyright (c) 2021 Vinícius Moreira
22

33
zlib/libpng license
44

guapow/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
44
__app_name__ = 'guapow'
5-
__version__ = '1.3.1'
5+
__version__ = '1.3.2'

guapow/common/scripts.py

+34-65
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
import asyncio
2-
import datetime
31
import os
4-
import subprocess
52
from logging import Logger
6-
from multiprocessing import Process
73
from typing import Optional, List, Dict, Set
84

9-
from guapow.common import system
105
from guapow.common.model import ScriptSettings
11-
from guapow.common.system import run_user_command
6+
from guapow.common.system import run_async_process, ProcessTimedOutError
127
from guapow.common.users import is_root_user
138

149

@@ -19,31 +14,6 @@ def __init__(self, name: str, root_allowed: bool, logger: Logger):
1914
self._log = logger
2015
self.root_allowed = root_allowed
2116

22-
async def _execute_user_scripts(self, settings: ScriptSettings, user_id: int, user_env: Optional[Dict[str, str]]) -> Set[int]:
23-
pids = set()
24-
25-
for cmd in settings.scripts:
26-
res = system.new_user_process_response()
27-
28-
self._log.info(f"Running {self._name} script '{cmd}' (user={user_id})")
29-
p = Process(daemon=True, target=run_user_command, args=(cmd, user_id, settings.wait_execution, settings.timeout, user_env, res))
30-
p.start()
31-
32-
if settings.wait_execution or (settings.timeout is not None and settings.timeout > 0):
33-
self._log.info(f"Waiting {self._name} script '{cmd}' to finish (user={user_id})")
34-
35-
while p.is_alive(): # to allow other async tasks to execute
36-
await asyncio.sleep(0.0005)
37-
38-
self._log.info(f"{self._name.capitalize()} script '{cmd}' finished (user={user_id})")
39-
40-
p.join()
41-
42-
if res['pid'] is not None:
43-
pids.add(res['pid'])
44-
45-
return pids
46-
4717
@staticmethod
4818
def get_environ(env: Optional[Dict[str, str]] = None) -> Dict[str, str]:
4919
final_env = env if env else dict(os.environ)
@@ -53,42 +23,40 @@ def get_environ(env: Optional[Dict[str, str]] = None) -> Dict[str, str]:
5323

5424
return final_env
5525

56-
async def _execute_scripts(self, settings: ScriptSettings, user_env: Optional[Dict[str, str]]) -> Set[int]:
26+
async def _execute_scripts(self, settings: ScriptSettings, user_id: Optional[int] = None,
27+
user_env: Optional[Dict[str, str]] = None) -> Set[int]:
5728
pids = set()
5829
env = self.get_environ(user_env)
5930

31+
valid_timeout = settings.has_valid_timeout()
32+
33+
if not valid_timeout and settings.timeout is not None:
34+
self._log.warning(f"Invalid {self._name} scripts timeout defined: {settings.timeout}. "
35+
f"No script will be awaited")
36+
37+
should_wait = settings.wait_execution or valid_timeout
38+
6039
for cmd in settings.scripts:
61-
p = await asyncio.create_subprocess_shell(cmd=cmd, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
62-
stderr=subprocess.DEVNULL, env=env)
63-
pids.add(p.pid)
64-
self._log.info(f"Started {self._name} script: {cmd} (pid={p.pid})")
65-
66-
if settings.timeout is not None:
67-
if not settings.has_valid_timeout():
68-
self._log.warning(f"Invalid {self._name} script timeout defined: {settings.timeout}. It will not be awaited")
69-
continue
70-
71-
self._log.info(f"Waiting {self._name} script '{cmd}' to finish (pid={p.pid})")
72-
timeout = datetime.datetime.now() + datetime.timedelta(seconds=settings.timeout)
73-
74-
timed_out = True
75-
while timeout > datetime.datetime.now():
76-
if p.returncode is None:
77-
await asyncio.sleep(0.001)
78-
else:
79-
timed_out = False
80-
break
81-
82-
if timed_out:
83-
self._log.warning(f"{self._name.capitalize()} script '{cmd}' timed out (pid={p.pid})")
84-
85-
elif settings.wait_execution:
86-
self._log.info(f"Waiting {self._name} script '{cmd}' to finish (pid={p.pid})")
87-
await p.wait()
40+
self._log.info(f"{'Waiting' if should_wait else 'Starting'} {self._name} script: {cmd}")
41+
try:
42+
pid, _, output = await run_async_process(cmd=cmd, user_id=user_id, custom_env=env,
43+
wait=settings.wait_execution, timeout=settings.timeout,
44+
output=False)
45+
46+
if pid is not None:
47+
pids.add(pid)
48+
if should_wait:
49+
self._log.info(f"{self._name.capitalize()} script finished: {cmd} (pid={pid})")
50+
else:
51+
err_output = f": {output}" if output else ""
52+
self._log.error(f"Unexpected error when running {self._name} script '{cmd}'{err_output}")
53+
except ProcessTimedOutError as e:
54+
self._log.warning(f"{self._name.capitalize()} script '{cmd}' timed out (pid={e.pid})")
8855

8956
return pids
9057

91-
async def run(self, scripts: List[ScriptSettings], user_id: Optional[int], user_env: Optional[Dict[str, str]]) -> Set[int]:
58+
async def run(self, scripts: List[ScriptSettings], user_id: Optional[int],
59+
user_env: Optional[Dict[str, str]]) -> Set[int]:
9260
current_user_id = os.getuid()
9361
root_user = is_root_user(current_user_id)
9462

@@ -97,17 +65,18 @@ async def run(self, scripts: List[ScriptSettings], user_id: Optional[int], user_
9765
for settings in scripts:
9866
if root_user:
9967
if not settings.run_as_root and user_id is not None and not is_root_user(user_id):
100-
pids.update(await self._execute_user_scripts(settings, user_id, user_env))
68+
pids.update(await self._execute_scripts(settings, user_id, user_env))
10169
elif self.root_allowed:
102-
pids.update(await self._execute_scripts(settings, None))
70+
pids.update(await self._execute_scripts(settings))
10371
else:
104-
self._log.warning(f"{self._name.capitalize()} scripts {settings.scripts} are not allowed to run at the root level")
72+
self._log.warning(f"{self._name.capitalize()} scripts {settings.scripts} are not allowed "
73+
f"to run at the root level")
10574
elif settings.run_as_root:
10675
self._log.warning(f"Cannot execute {self._name} scripts {settings.scripts} as root user")
10776
elif user_id is None:
108-
pids.update(await self._execute_scripts(settings, None))
77+
pids.update(await self._execute_scripts(settings))
10978
elif current_user_id == user_id:
110-
pids.update(await self._execute_scripts(settings, user_env))
79+
pids.update(await self._execute_scripts(settings, user_env=user_env))
11180
else:
11281
self._log.warning(f"Cannot execute {self._name} scripts {settings.scripts} as user {user_id}")
11382

guapow/common/system.py

+60-57
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@
22
import os
33
import subprocess
44
import traceback
5+
from datetime import datetime, timedelta
56
from io import StringIO
6-
from multiprocessing import Manager, Process
7-
from multiprocessing.managers import DictProxy
87
from re import Pattern
98
from typing import Optional, Tuple, Dict, List, Set, Callable, TypeVar, Collection, Generator
109

1110
BAD_USER_ENV_VARS = {'LD_PRELOAD'}
1211
T = TypeVar('T')
1312

1413

14+
class ProcessTimedOutError(Exception):
15+
def __init__(self, pid: int):
16+
super(ProcessTimedOutError, self).__init__()
17+
self.pid = pid
18+
19+
1520
def syscall(cmd: str, shell: bool = True, cwd: Optional[str] = None, custom_env: Optional[dict] = None, stdin: bool = False, return_output: bool = True) -> Tuple[int, Optional[str]]:
1621
params = {
1722
'args': cmd.split(' ') if not shell else [cmd],
@@ -276,72 +281,70 @@ async def find_children(ppids: Set[int], ppid_map: Optional[Dict[int, Set[int]]]
276281
return children_list
277282

278283

279-
def run_user_command(cmd: str, user_id: int, wait: bool, timeout: Optional[float] = None,
280-
env: Optional[dict] = None, response: Optional[DictProxy] = None, forbidden_env_vars: Optional[Set[str]] = BAD_USER_ENV_VARS):
281-
args = {"args": cmd, "shell": True, "stdin": subprocess.DEVNULL,
282-
"stdout": subprocess.PIPE if wait else subprocess.DEVNULL,
283-
"stderr": subprocess.STDOUT if wait else subprocess.DEVNULL}
284-
285-
if env:
284+
async def run_async_process(cmd: str, user_id: Optional[int] = None, custom_env: Optional[dict] = None,
285+
forbidden_env_vars: Optional[Set[str]] = BAD_USER_ENV_VARS,
286+
wait: bool = True, timeout: Optional[float] = None, output: bool = True,
287+
exception_output: bool = True) \
288+
-> Tuple[Optional[int], Optional[int], Optional[str]]:
289+
"""
290+
Runs a process using the async API
291+
Args:
292+
cmd:
293+
user_id: if the process should be executed in behalf of a different user
294+
custom_env: custom environment variables available for the process to be executed
295+
forbidden_env_vars: environment variables that should not be passed to the process
296+
wait: if the process should be waited
297+
timeout: in seconds
298+
output: if the process output should be read and returned
299+
exception_output: if the traceback of an unexpected raised exception should be returned as the output
300+
Returns: a tuple containing the process id, exitcode and output as a String
301+
302+
"""
303+
args = {"cmd": cmd, "stdin": subprocess.DEVNULL,
304+
"stdout": subprocess.PIPE if output else subprocess.DEVNULL,
305+
"stderr": subprocess.STDOUT if output else subprocess.DEVNULL}
306+
307+
if user_id is not None:
308+
args["preexec_fn"] = lambda: os.setuid(user_id)
309+
310+
if custom_env:
286311
if forbidden_env_vars:
287-
args['env'] = {k: v for k, v in env.items() if k not in forbidden_env_vars}
312+
args['env'] = {k: v for k, v in custom_env.items() if k not in forbidden_env_vars}
288313
else:
289-
args['env'] = env
314+
args['env'] = custom_env
290315

291316
try:
292-
os.setpriority(os.PRIO_PROCESS, os.getpid(), 0) # always launch a command with nice 0
293-
os.setuid(user_id)
294-
295-
p = subprocess.Popen(**args)
296-
297-
if response is not None:
298-
response['pid'] = p.pid
299-
300-
if timeout is not None and timeout > 0:
301-
p.wait(timeout)
302-
elif wait:
303-
p.wait()
304-
305-
if response is not None:
306-
response['exitcode'] = p.returncode
317+
p = await asyncio.create_subprocess_shell(**args)
318+
except Exception:
319+
return None, 1, (traceback.format_exc().replace('\n', ' ') if exception_output else None)
307320

308-
string = StringIO()
309-
for output in p.stdout:
310-
decoded = output.decode()
311-
string.write(decoded)
312-
313-
string.seek(0)
314-
response['output'] = string.read()
315-
316-
except Exception as e:
317-
if response is not None:
318-
response['exitcode'] = 1
319-
response['output'] = traceback.format_exc()
320-
321-
322-
async def run_async_user_process(cmd: str, user_id: int, user_env: Optional[dict], forbidden_env_vars: Optional[Set[str]] = BAD_USER_ENV_VARS) -> Tuple[int, Optional[str]]:
323-
res = new_user_process_response()
321+
if user_id is not None: # set default niceness in case the process is executed in behalf of another user
322+
try:
323+
os.setpriority(os.PRIO_PROCESS, p.pid, 0) # always launch a command with nice 0
324+
except Exception:
325+
pass # do nothing in case the priority could not be changed
324326

327+
should_wait = wait or (timeout and timeout > 0)
325328
try:
326-
p = Process(target=run_user_command, kwargs={'cmd': cmd, 'user_id': user_id, 'wait': True, 'timeout': None, 'env': user_env,
327-
'response': res, 'forbidden_env_vars': forbidden_env_vars})
328-
p.start()
329+
if should_wait:
330+
if timeout is None or timeout < 0:
331+
return p.pid, await p.wait(), ((await p.stdout.read()).decode() if output else None)
332+
elif timeout and timeout > 0:
333+
timeout_at = datetime.now() + timedelta(seconds=timeout)
329334

330-
while p.is_alive():
331-
await asyncio.sleep(0.001)
335+
while datetime.now() < timeout_at:
336+
if p.returncode is not None:
337+
return p.pid, p.returncode, ((await p.stdout.read()).decode() if output else None)
332338

333-
return res['exitcode'], res['output']
334-
except:
335-
error_msg = traceback.format_exc().replace('\n', ' ')
336-
return 1, error_msg
339+
await asyncio.sleep(0.0005)
337340

341+
raise ProcessTimedOutError(p.pid)
338342

339-
def new_user_process_response() -> DictProxy:
340-
proxy_res = Manager().dict()
341-
proxy_res['exitcode'] = None
342-
proxy_res['output'] = None
343-
proxy_res['pid'] = None
344-
return proxy_res
343+
return p.pid, p.returncode, None
344+
except ProcessTimedOutError:
345+
raise
346+
except Exception:
347+
return p.pid, 1, (traceback.format_exc().replace('\n', ' ') if exception_output else None)
345348

346349

347350
async def map_processes_by_parent() -> Dict[int, Set[Tuple[int, str]]]:

guapow/service/optimizer/post_process/task.py

+12-23
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import asyncio
22
import re
3-
import traceback
43
from abc import ABC, abstractmethod
5-
from multiprocessing import Process
64
from re import Pattern
75
from typing import List, Optional, Dict, Set, Tuple, Type, Awaitable
86

97
from guapow.common import system
108
from guapow.common.scripts import RunScripts
9+
from guapow.common.system import run_async_process
1110
from guapow.common.users import is_root_user
1211
from guapow.service.optimizer.gpu import GPUState, GPUDriver
1312
from guapow.service.optimizer.post_process.context import PostProcessContext
@@ -273,28 +272,21 @@ def get_python_cmd_pattern(self) -> Pattern:
273272
def should_run(self, context: PostProcessContext) -> bool:
274273
return bool(context.stopped_processes and context.user_id is not None)
275274

276-
async def _run_command(self, name: str, cmd: str):
277-
try:
278-
await system.async_syscall(cmd, return_output=False, wait=False)
279-
self._log.info(f"Process '{name}' ({cmd}) relaunched")
280-
except:
281-
stack_log = traceback.format_exc().replace('\n', ' ')
282-
self._log.warning(f"An exception happened when relaunching process '{name}' ({cmd}): {stack_log}")
283-
284-
def _run_user_command(self, name: str, cmd: str, user_id: int, user_env: Optional[Dict[str, str]] = None):
285-
try:
286-
Process(daemon=True, target=system.run_user_command, kwargs={'cmd': cmd, 'user_id': user_id, 'env': user_env, 'wait': False}).start()
287-
self._log.info(f"Process '{name}' ({cmd}) relaunched (user={user_id})")
288-
except:
289-
stack_log = traceback.format_exc().replace('\n', ' ')
290-
self._log.warning(f"An exception happened when relaunching process '{name}' ({cmd}) [user={user_id}]: {stack_log}")
275+
async def _run_command(self, name: str, cmd: str, user_id: Optional[int] = None,
276+
user_env: Optional[Dict[str, str]] = None):
277+
await run_async_process(cmd=cmd, user_id=user_id, custom_env=user_env, wait=False, output=False,
278+
exception_output=False)
279+
280+
user_log = f"(user={user_id})" if user_id is not None else ""
281+
self._log.info(f"Process '{name}' ({cmd}) relaunched {user_log}")
291282

292283
async def run(self, context: PostProcessContext):
293284
self_is_root = is_root_user()
294285
root_request = is_root_user(context.user_id)
295286

296287
if not self_is_root and root_request:
297-
self._log.warning(f"It will not be possible to launch the following root processes: {', '.join((c[0] for c in context.stopped_processes))}")
288+
self._log.warning(f"It will not be possible to launch the following root processes: "
289+
f"{', '.join((c[0] for c in context.stopped_processes))}")
298290
return
299291

300292
running_cmds = await system.find_processes_by_command({p[1] for p in context.stopped_processes})
@@ -309,11 +301,8 @@ async def run(self, context: PostProcessContext):
309301

310302
real_cmd = python_cmd[0] if python_cmd else cmd
311303

312-
if self_is_root:
313-
if root_request:
314-
await self._run_command(name, real_cmd)
315-
else:
316-
self._run_user_command(name, real_cmd, context.user_id, context.user_env)
304+
if self_is_root and not root_request:
305+
await self._run_command(name, real_cmd, context.user_id, context.user_env)
317306
else:
318307
await self._run_command(name, real_cmd)
319308

0 commit comments

Comments
 (0)