From 37e9a7d377f5683cc3c01b19c07429d909c8483a Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 28 Jan 2025 11:37:00 +0000 Subject: [PATCH] fix thread safety --- Modules/_asynciomodule.c | 171 +++++++++++++++++------------- Modules/clinic/_asynciomodule.c.h | 52 ++++----- 2 files changed, 124 insertions(+), 99 deletions(-) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index d5d49658555f1a..ccaddc7d3f4516 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -549,30 +549,28 @@ future_init(FutureObj *fut, PyObject *loop) } static int -future_awaited_by_add(asyncio_state *state, PyObject *fut, PyObject *thing) +future_awaited_by_add(asyncio_state *state, FutureObj *fut, PyObject *thing) { - if (!TaskOrFuture_Check(state, fut) || !TaskOrFuture_Check(state, thing)) { - // We only want to support native asyncio Futures. - // For further insight see the comment in the Python - // implementation of "future_add_to_awaited_by()". - return 0; - } - - FutureObj *_fut = (FutureObj *)fut; + _Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED(fut); + // We only want to support native asyncio Futures. + // For further insight see the comment in the Python + // implementation of "future_add_to_awaited_by()". + assert(TaskOrFuture_Check(state, fut)); + assert(TaskOrFuture_Check(state, thing)); /* Most futures/task are only awaited by one entity, so we want to avoid always creating a set for `fut_awaited_by`. */ - if (_fut->fut_awaited_by == NULL) { - assert(!_fut->fut_awaited_by_is_set); + if (fut->fut_awaited_by == NULL) { + assert(!fut->fut_awaited_by_is_set); Py_INCREF(thing); - _fut->fut_awaited_by = thing; + fut->fut_awaited_by = thing; return 0; } - if (_fut->fut_awaited_by_is_set) { - assert(PySet_CheckExact(_fut->fut_awaited_by)); - return PySet_Add(_fut->fut_awaited_by, thing); + if (fut->fut_awaited_by_is_set) { + assert(PySet_CheckExact(fut->fut_awaited_by)); + return PySet_Add(fut->fut_awaited_by, thing); } PyObject *set = PySet_New(NULL); @@ -583,40 +581,38 @@ future_awaited_by_add(asyncio_state *state, PyObject *fut, PyObject *thing) Py_DECREF(set); return -1; } - if (PySet_Add(set, _fut->fut_awaited_by)) { + if (PySet_Add(set, fut->fut_awaited_by)) { Py_DECREF(set); return -1; } - Py_SETREF(_fut->fut_awaited_by, set); - _fut->fut_awaited_by_is_set = 1; + Py_SETREF(fut->fut_awaited_by, set); + fut->fut_awaited_by_is_set = 1; return 0; } static int -future_awaited_by_discard(asyncio_state *state, PyObject *fut, PyObject *thing) +future_awaited_by_discard(asyncio_state *state, FutureObj *fut, PyObject *thing) { - if (!TaskOrFuture_Check(state, fut) || !TaskOrFuture_Check(state, thing)) { - // We only want to support native asyncio Futures. - // For further insight see the comment in the Python - // implementation of "future_add_to_awaited_by()". - return 0; - } - - FutureObj *_fut = (FutureObj *)fut; + _Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED(fut); + // We only want to support native asyncio Futures. + // For further insight see the comment in the Python + // implementation of "future_add_to_awaited_by()". + assert(TaskOrFuture_Check(state, fut)); + assert(TaskOrFuture_Check(state, thing)); /* Following the semantics of 'set.discard()' here in not raising an error if `thing` isn't in the `awaited_by` "set". */ - if (_fut->fut_awaited_by == NULL) { + if (fut->fut_awaited_by == NULL) { return 0; } - if (_fut->fut_awaited_by == thing) { - Py_CLEAR(_fut->fut_awaited_by); + if (fut->fut_awaited_by == thing) { + Py_CLEAR(fut->fut_awaited_by); return 0; } - if (_fut->fut_awaited_by_is_set) { - assert(PySet_CheckExact(_fut->fut_awaited_by)); - int err = PySet_Discard(_fut->fut_awaited_by, thing); + if (fut->fut_awaited_by_is_set) { + assert(PySet_CheckExact(fut->fut_awaited_by)); + int err = PySet_Discard(fut->fut_awaited_by, thing); if (err < 0) { return -1; } else { @@ -626,36 +622,6 @@ future_awaited_by_discard(asyncio_state *state, PyObject *fut, PyObject *thing) return 0; } -/*[clinic input] -@critical_section -@getter -_asyncio.Future._asyncio_awaited_by -[clinic start generated code]*/ - -static PyObject * -_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self) -/*[clinic end generated code: output=932af76d385d2e2a input=64c1783df2d44d2b]*/ -{ - /* Implementation of a Python getter. */ - if (self->fut_awaited_by == NULL) { - Py_RETURN_NONE; - } - if (self->fut_awaited_by_is_set) { - /* Already a set, just wrap it into a frozen set and return. */ - assert(PySet_CheckExact(self->fut_awaited_by)); - return PyFrozenSet_New(self->fut_awaited_by); - } - - PyObject *set = PyFrozenSet_New(NULL); - if (set == NULL) { - return NULL; - } - if (PySet_Add(set, self->fut_awaited_by)) { - Py_DECREF(set); - return NULL; - } - return set; -} static PyObject * future_set_result(asyncio_state *state, FutureObj *fut, PyObject *res) @@ -1362,6 +1328,38 @@ _asyncio_Future_get_loop_impl(FutureObj *self, PyTypeObject *cls) return Py_NewRef(self->fut_loop); } +/*[clinic input] +@critical_section +@getter +_asyncio.Future._asyncio_awaited_by +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self) +/*[clinic end generated code: output=932af76d385d2e2a input=64c1783df2d44d2b]*/ +{ + /* Implementation of a Python getter. */ + if (self->fut_awaited_by == NULL) { + Py_RETURN_NONE; + } + if (self->fut_awaited_by_is_set) { + /* Already a set, just wrap it into a frozen set and return. */ + assert(PySet_CheckExact(self->fut_awaited_by)); + return PyFrozenSet_New(self->fut_awaited_by); + } + + PyObject *set = PyFrozenSet_New(NULL); + if (set == NULL) { + return NULL; + } + if (PySet_Add(set, self->fut_awaited_by)) { + Py_DECREF(set); + return NULL; + } + return set; +} + + /*[clinic input] @critical_section @getter @@ -3296,8 +3294,11 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu if (!fut->fut_blocking) { goto yield_insteadof_yf; } - - if (future_awaited_by_add(state, result, (PyObject *)task)) { + int res; + Py_BEGIN_CRITICAL_SECTION(result); + res = future_awaited_by_add(state, (FutureObj *)result, (PyObject *)task); + Py_END_CRITICAL_SECTION(); + if (res) { goto fail; } @@ -3390,8 +3391,14 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu goto yield_insteadof_yf; } - if (future_awaited_by_add(state, result, (PyObject *)task)) { - goto fail; + if (TaskOrFuture_Check(state, result)) { + int res; + Py_BEGIN_CRITICAL_SECTION(result); + res = future_awaited_by_add(state, (FutureObj *)result, (PyObject *)task); + Py_END_CRITICAL_SECTION(); + if (res) { + goto fail; + } } /* result._asyncio_future_blocking = False */ @@ -3606,8 +3613,14 @@ task_wakeup_lock_held(TaskObj *task, PyObject *o) asyncio_state *state = get_asyncio_state_by_def((PyObject *)task); - if (future_awaited_by_discard(state, o, (PyObject *)task)) { - return NULL; + if (TaskOrFuture_Check(state, o)) { + int res; + Py_BEGIN_CRITICAL_SECTION(o); + res = future_awaited_by_discard(state, (FutureObj *)o, (PyObject *)task); + Py_END_CRITICAL_SECTION(); + if (res) { + return NULL; + } } if (Future_CheckExact(state, o) || Task_CheckExact(state, o)) { @@ -4110,8 +4123,14 @@ _asyncio_future_add_to_awaited_by_impl(PyObject *module, PyObject *fut, /*[clinic end generated code: output=0ab9a1a63389e4df input=06e6eaac51f532b9]*/ { asyncio_state *state = get_asyncio_state(module); - if (future_awaited_by_add(state, fut, waiter)) { - return NULL; + if (TaskOrFuture_Check(state, fut) && TaskOrFuture_Check(state, waiter)) { + int res; + Py_BEGIN_CRITICAL_SECTION(fut); + res = future_awaited_by_add(state, (FutureObj *)fut, waiter); + Py_END_CRITICAL_SECTION(); + if (res) { + return NULL; + } } Py_RETURN_NONE; } @@ -4131,8 +4150,14 @@ _asyncio_future_discard_from_awaited_by_impl(PyObject *module, PyObject *fut, /*[clinic end generated code: output=a03b0b4323b779de input=3833f7639e88e483]*/ { asyncio_state *state = get_asyncio_state(module); - if (future_awaited_by_discard(state, fut, waiter)) { - return NULL; + if (TaskOrFuture_Check(state, fut) && TaskOrFuture_Check(state, waiter)) { + int res; + Py_BEGIN_CRITICAL_SECTION(fut); + res = future_awaited_by_add(state, (FutureObj *)fut, waiter); + Py_END_CRITICAL_SECTION(); + if (res) { + return NULL; + } } Py_RETURN_NONE; } diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index c6b7e39788be71..d25411ee9958a1 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -9,31 +9,6 @@ preserve #include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION() #include "pycore_modsupport.h" // _PyArg_UnpackKeywords() -#if !defined(_asyncio_Future__asyncio_awaited_by_DOCSTR) -# define _asyncio_Future__asyncio_awaited_by_DOCSTR NULL -#endif -#if defined(_ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF) -# undef _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF -# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, (setter)_asyncio_Future__asyncio_awaited_by_set, _asyncio_Future__asyncio_awaited_by_DOCSTR}, -#else -# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, NULL, _asyncio_Future__asyncio_awaited_by_DOCSTR}, -#endif - -static PyObject * -_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self); - -static PyObject * -_asyncio_Future__asyncio_awaited_by_get(PyObject *self, void *Py_UNUSED(context)) -{ - PyObject *return_value = NULL; - - Py_BEGIN_CRITICAL_SECTION(self); - return_value = _asyncio_Future__asyncio_awaited_by_get_impl((FutureObj *)self); - Py_END_CRITICAL_SECTION(); - - return return_value; -} - PyDoc_STRVAR(_asyncio_Future___init____doc__, "Future(*, loop=None)\n" "--\n" @@ -534,6 +509,31 @@ _asyncio_Future_get_loop(PyObject *self, PyTypeObject *cls, PyObject *const *arg return return_value; } +#if !defined(_asyncio_Future__asyncio_awaited_by_DOCSTR) +# define _asyncio_Future__asyncio_awaited_by_DOCSTR NULL +#endif +#if defined(_ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF) +# undef _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF +# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, (setter)_asyncio_Future__asyncio_awaited_by_set, _asyncio_Future__asyncio_awaited_by_DOCSTR}, +#else +# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, NULL, _asyncio_Future__asyncio_awaited_by_DOCSTR}, +#endif + +static PyObject * +_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self); + +static PyObject * +_asyncio_Future__asyncio_awaited_by_get(PyObject *self, void *Py_UNUSED(context)) +{ + PyObject *return_value = NULL; + + Py_BEGIN_CRITICAL_SECTION(self); + return_value = _asyncio_Future__asyncio_awaited_by_get_impl((FutureObj *)self); + Py_END_CRITICAL_SECTION(); + + return return_value; +} + #if !defined(_asyncio_Future__asyncio_future_blocking_DOCSTR) # define _asyncio_Future__asyncio_future_blocking_DOCSTR NULL #endif @@ -2174,4 +2174,4 @@ _asyncio_future_discard_from_awaited_by(PyObject *module, PyObject *const *args, exit: return return_value; } -/*[clinic end generated code: output=fe4ffe08404ad566 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=f14ff14c29c691ec input=a9049054013a1b77]*/