Skip to content

gh-132775: Use _PyCode GetScriptXIData() #134441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Include/internal/pycore_pyerrors.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ extern void _PyErr_Fetch(
PyObject **value,
PyObject **traceback);

extern PyObject* _PyErr_GetRaisedException(PyThreadState *tstate);
PyAPI_FUNC(PyObject*) _PyErr_GetRaisedException(PyThreadState *tstate);

PyAPI_FUNC(int) _PyErr_ExceptionMatches(
PyThreadState *tstate,
PyObject *exc);

extern void _PyErr_SetRaisedException(PyThreadState *tstate, PyObject *exc);
PyAPI_FUNC(void) _PyErr_SetRaisedException(PyThreadState *tstate, PyObject *exc);

extern void _PyErr_Restore(
PyThreadState *tstate,
Expand Down
55 changes: 13 additions & 42 deletions Lib/concurrent/futures/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ def __str__(self):
""".strip())


UNBOUND = 2 # error; this should not happen.


class WorkerContext(_thread.WorkerContext):

@classmethod
Expand All @@ -47,23 +44,13 @@ def resolve_task(fn, args, kwargs):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
if args or kwargs:
raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
data = textwrap.dedent(fn)
kind = 'script'
# Make sure the script compiles.
# Ideally we wouldn't throw away the resulting code
# object. However, there isn't much to be done until
# code objects are shareable and/or we do a better job
# of supporting code objects in _interpreters.exec().
compile(data, '<string>', 'exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
data = pickle.dumps((fn, args, kwargs))
kind = 'function'
return (data, kind)
task = (fn, args, kwargs)
data = pickle.dumps(task)
return data

if initializer is not None:
try:
Expand All @@ -86,24 +73,20 @@ def _capture_exc(cls, resultsid):
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
err = pickle.dumps(exc)
_interpqueues.put(resultsid, (None, err), 1, UNBOUND)
_interpqueues.put(resultsid, (None, exc))
raise # re-raise

@classmethod
def _send_script_result(cls, resultsid):
_interpqueues.put(resultsid, (None, None), 0, UNBOUND)
_interpqueues.put(resultsid, (None, None))

@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
try:
_interpqueues.put(resultsid, (res, None), 0, UNBOUND)
except _interpreters.NotShareableError:
res = pickle.dumps(res)
_interpqueues.put(resultsid, (res, None), 1, UNBOUND)
with cls._capture_exc(resultsid):
_interpqueues.put(resultsid, (res, None))

@classmethod
def _call_pickled(cls, pickled, resultsid):
Expand Down Expand Up @@ -134,8 +117,7 @@ def initialize(self):
_interpreters.incref(self.interpid)

maxsize = 0
fmt = 0
self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
self.resultsid = _interpqueues.create(maxsize)

self._exec(f'from {__name__} import WorkerContext')

Expand Down Expand Up @@ -166,17 +148,8 @@ def finalize(self):
pass

def run(self, task):
data, kind = task
if kind == 'script':
raise NotImplementedError('script kind disabled')
script = f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data, ' ')}
WorkerContext._send_script_result({self.resultsid})"""
elif kind == 'function':
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
else:
raise NotImplementedError(kind)
data = task
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'

try:
self._exec(script)
Expand All @@ -199,15 +172,13 @@ def run(self, task):
continue
else:
break
(res, excdata), pickled, unboundop = obj
(res, exc), unboundop = obj
assert unboundop is None, unboundop
if excdata is not None:
if exc is not None:
assert res is None, res
assert pickled
assert exc_wrapper is not None
exc = pickle.loads(excdata)
raise exc from exc_wrapper
return pickle.loads(res) if pickled else res
return res


class BrokenInterpreterPool(_thread.BrokenThreadPool):
Expand Down
85 changes: 55 additions & 30 deletions Lib/test/support/interpreters/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,23 @@ def create(*, unbounditems=UNBOUND):
"""
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
cid = _channels.create(unboundop)
recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
cid = _channels.create(unboundop, -1)
recv, send = RecvChannel(cid), SendChannel(cid)
send._set_unbound(unboundop, unbounditems)
return recv, send


def list_all():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
for cid, unbound in _channels.list_all()]
channels = []
for cid, unboundop, _ in _channels.list_all():
chan = _, send = RecvChannel(cid), SendChannel(cid)
if not hasattr(send, '_unboundop'):
send._set_unbound(unboundop)
else:
assert send._unbound[0] == unboundop
channels.append(chan)
return channels


class _ChannelEnd:
Expand Down Expand Up @@ -175,78 +183,95 @@ class SendChannel(_ChannelEnd):

_end = 'send'

def __new__(cls, cid, *, _unbound=None):
if _unbound is None:
try:
op = _channels.get_channel_defaults(cid)
_unbound = (op,)
except ChannelNotFoundError:
_unbound = _serialize_unbound(UNBOUND)
self = super().__new__(cls, cid)
self._unbound = _unbound
return self
# def __new__(cls, cid, *, _unbound=None):
# if _unbound is None:
# try:
# op = _channels.get_channel_defaults(cid)
# _unbound = (op,)
# except ChannelNotFoundError:
# _unbound = _serialize_unbound(UNBOUND)
# self = super().__new__(cls, cid)
# self._unbound = _unbound
# return self

def _set_unbound(self, op, items=None):
assert not hasattr(self, '_unbound')
if items is None:
items = _resolve_unbound(op)
unbound = (op, items)
self._unbound = unbound
return unbound

@property
def unbounditems(self):
try:
_, items = self._unbound
except AttributeError:
op, _ = _channels.get_queue_defaults(self._id)
_, items = self._set_unbound(op)
return items

@property
def is_closed(self):
info = self._info
return info.closed or info.closing

def send(self, obj, timeout=None, *,
unbound=None,
unbounditems=None,
):
"""Send the object (i.e. its data) to the channel's receiving end.

This blocks until the object is received.
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
_channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)

def send_nowait(self, obj, *,
unbound=None,
unbounditems=None,
):
"""Send the object to the channel's receiving end.

If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj, unboundop, blocking=False)

def send_buffer(self, obj, timeout=None, *,
unbound=None,
unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.

This blocks until the object is received.
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
_channels.send_buffer(self._id, obj, unboundop,
timeout=timeout, blocking=True)

def send_buffer_nowait(self, obj, *,
unbound=None,
unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.

If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
if unbound is None:
unboundop, = self._unbound
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbound)
unboundop, = _serialize_unbound(unbounditems)
return _channels.send_buffer(self._id, obj, unboundop, blocking=False)

def close(self):
Expand Down
Loading
Loading