Skip to content

Commit 1b70da5

Browse files
kumaraditya303, ambv
authored and committed
pythongh-128002: use per threads tasks linked list in asyncio (python#128869)
Co-authored-by: Łukasz Langa <[email protected]>
1 parent 8b80c41 commit 1b70da5

File tree

7 files changed

+156
-58
lines changed

7 files changed

+156
-58
lines changed

Include/internal/pycore_interp.h

+7
Original file line number | Diff line number | Diff line change
@@ -227,6 +227,13 @@ struct _is {
227227
PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS];
228228
_PyIndexPool tlbc_indices;
229229
#endif
230+
// Per-interpreter list of tasks. Any lingering tasks from thread
231+
// states get added here and are removed from the corresponding
232+
// thread state's list.
233+
struct llist_node asyncio_tasks_head;
234+
// `asyncio_tasks_lock` is used when tasks are moved
235+
// from a thread's list to the interpreter's list.
236+
PyMutex asyncio_tasks_lock;
230237

231238
// Per-interpreter state for the obmalloc allocator. For the main
232239
// interpreter and for all interpreters that don't have their

Include/internal/pycore_lock.h

+1-1
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ typedef enum _PyLockFlags {
5252

5353
// Lock a mutex with an optional timeout and additional options. See
5454
// _PyLockFlags for details.
55-
extern PyLockStatus
55+
extern PyAPI_FUNC(PyLockStatus)
5656
_PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags);
5757

5858
// Lock a mutex with additional options. See _PyLockFlags for details.

Include/internal/pycore_pystate.h

+2-2
Original file line number | Diff line number | Diff line change
@@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime);
182182
// Perform a stop-the-world pause for threads in the specified interpreter.
183183
//
184184
// NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
185-
extern void _PyEval_StopTheWorld(PyInterpreterState *interp);
186-
extern void _PyEval_StartTheWorld(PyInterpreterState *interp);
185+
extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp);
186+
extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp);
187187

188188

189189
static inline void

Include/internal/pycore_tstate.h

+5
Original file line number | Diff line number | Diff line change
@@ -24,9 +24,14 @@ typedef struct _PyThreadStateImpl {
2424
PyObject *asyncio_running_loop; // Strong reference
2525
PyObject *asyncio_running_task; // Strong reference
2626

27+
/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
28+
or subclasses of it; this list is traversed by `asyncio.all_tasks`.
29+
*/
30+
struct llist_node asyncio_tasks_head;
2731
struct _qsbr_thread_state *qsbr; // only used by free-threaded build
2832
struct llist_node mem_free_queue; // delayed free queue
2933

34+
3035
#ifdef Py_GIL_DISABLED
3136
struct _gc_thread_state gc;
3237
struct _mimalloc_thread_state mimalloc;

Lib/test/test_asyncio/test_free_threading.py

+18-1
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,8 @@
33
import unittest
44
from threading import Thread
55
from unittest import TestCase
6-
6+
import weakref
7+
from test import support
78
from test.support import threading_helper
89

910
threading_helper.requires_working_threading(module=True)
@@ -95,6 +96,22 @@ def check():
9596
done.set()
9697
runner.join()
9798

99+
def test_task_different_thread_finalized(self) -> None:
100+
task = None
101+
async def func():
102+
nonlocal task
103+
task = asyncio.current_task()
104+
105+
thread = Thread(target=lambda: asyncio.run(func()))
106+
thread.start()
107+
thread.join()
108+
wr = weakref.ref(task)
109+
del thread
110+
del task
111+
# task finalization in a different thread shouldn't crash
112+
support.gc_collect()
113+
self.assertIsNone(wr())
114+
98115
def test_run_coroutine_threadsafe(self) -> None:
99116
results = []
100117

Modules/_asynciomodule.c

+112-53
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,10 @@ typedef struct TaskObj {
6767
PyObject *task_name;
6868
PyObject *task_context;
6969
struct llist_node task_node;
70+
#ifdef Py_GIL_DISABLED
71+
// thread id of the thread where this task was created
72+
uintptr_t task_tid;
73+
#endif
7074
} TaskObj;
7175

7276
typedef struct {
@@ -94,14 +98,6 @@ typedef struct {
9498
|| PyObject_TypeCheck(obj, state->FutureType) \
9599
|| PyObject_TypeCheck(obj, state->TaskType))
96100

97-
#ifdef Py_GIL_DISABLED
98-
# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
99-
# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
100-
#else
101-
# define ASYNCIO_STATE_LOCK(state) ((void)state)
102-
# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
103-
#endif
104-
105101
typedef struct _Py_AsyncioModuleDebugOffsets {
106102
struct _asyncio_task_object {
107103
uint64_t size;
@@ -135,9 +131,6 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug)
135131

136132
/* State of the _asyncio module */
137133
typedef struct {
138-
#ifdef Py_GIL_DISABLED
139-
PyMutex mutex;
140-
#endif
141134
PyTypeObject *FutureIterType;
142135
PyTypeObject *TaskStepMethWrapper_Type;
143136
PyTypeObject *FutureType;
@@ -184,11 +177,6 @@ typedef struct {
184177
/* Counter for autogenerated Task names */
185178
uint64_t task_name_counter;
186179

187-
/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
188-
or subclasses of it. Third party tasks implementations which don't inherit from
189-
`asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
190-
*/
191-
struct llist_node asyncio_tasks_head;
192180
} asyncio_state;
193181

194182
static inline asyncio_state *
@@ -2179,16 +2167,15 @@ static PyMethodDef TaskWakeupDef = {
21792167
static void
21802168
register_task(asyncio_state *state, TaskObj *task)
21812169
{
2182-
ASYNCIO_STATE_LOCK(state);
21832170
assert(Task_Check(state, task));
21842171
if (task->task_node.next != NULL) {
21852172
// already registered
21862173
assert(task->task_node.prev != NULL);
2187-
goto exit;
2174+
return;
21882175
}
2189-
llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
2190-
exit:
2191-
ASYNCIO_STATE_UNLOCK(state);
2176+
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
2177+
struct llist_node *head = &tstate->asyncio_tasks_head;
2178+
llist_insert_tail(head, &task->task_node);
21922179
}
21932180

21942181
static int
@@ -2197,19 +2184,38 @@ register_eager_task(asyncio_state *state, PyObject *task)
21972184
return PySet_Add(state->eager_tasks, task);
21982185
}
21992186

2200-
static void
2201-
unregister_task(asyncio_state *state, TaskObj *task)
2187+
static inline void
2188+
unregister_task_safe(TaskObj *task)
22022189
{
2203-
ASYNCIO_STATE_LOCK(state);
2204-
assert(Task_Check(state, task));
22052190
if (task->task_node.next == NULL) {
22062191
// not registered
22072192
assert(task->task_node.prev == NULL);
2208-
goto exit;
2193+
return;
22092194
}
22102195
llist_remove(&task->task_node);
2211-
exit:
2212-
ASYNCIO_STATE_UNLOCK(state);
2196+
}
2197+
2198+
static void
2199+
unregister_task(asyncio_state *state, TaskObj *task)
2200+
{
2201+
assert(Task_Check(state, task));
2202+
#ifdef Py_GIL_DISABLED
2203+
// check if we are in the same thread
2204+
// if so, we can avoid locking
2205+
if (task->task_tid == _Py_ThreadId()) {
2206+
unregister_task_safe(task);
2207+
}
2208+
else {
2209+
// we are in a different thread
2210+
// stop the world then check and remove the task
2211+
PyThreadState *tstate = _PyThreadState_GET();
2212+
_PyEval_StopTheWorld(tstate->interp);
2213+
unregister_task_safe(task);
2214+
_PyEval_StartTheWorld(tstate->interp);
2215+
}
2216+
#else
2217+
unregister_task_safe(task);
2218+
#endif
22132219
}
22142220

22152221
static int
@@ -2423,6 +2429,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
24232429
}
24242430

24252431
Py_CLEAR(self->task_fut_waiter);
2432+
#ifdef Py_GIL_DISABLED
2433+
self->task_tid = _Py_ThreadId();
2434+
#endif
24262435
self->task_must_cancel = 0;
24272436
self->task_log_destroy_pending = 1;
24282437
self->task_num_cancels_requested = 0;
@@ -3981,6 +3990,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
39813990
static inline int
39823991
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
39833992
{
3993+
assert(PySet_CheckExact(tasks));
39843994
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
39853995
if (done == NULL) {
39863996
return -1;
@@ -4003,6 +4013,57 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
40034013
return 0;
40044014
}
40054015

4016+
static inline int
4017+
add_tasks_llist(struct llist_node *head, PyListObject *tasks)
4018+
{
4019+
struct llist_node *node;
4020+
llist_for_each_safe(node, head) {
4021+
TaskObj *task = llist_data(node, TaskObj, task_node);
4022+
// The linked list holds borrowed references to task
4023+
// as such it is possible that the task is concurrently
4024+
// deallocated while added to this list.
4025+
// To protect against concurrent deallocations,
4026+
// we first try to incref the task which would fail
4027+
// if it is concurrently getting deallocated in another thread,
4028+
// otherwise it gets added to the list.
4029+
if (_Py_TryIncref((PyObject *)task)) {
4030+
if (_PyList_AppendTakeRef(tasks, (PyObject *)task) < 0) {
4031+
// Do not make any escaping calls here while the world is stopped.
4032+
return -1;
4033+
}
4034+
}
4035+
}
4036+
return 0;
4037+
}
4038+
4039+
static inline int
4040+
add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
4041+
{
4042+
#ifdef Py_GIL_DISABLED
4043+
assert(interp->stoptheworld.world_stopped);
4044+
#endif
4045+
// Start traversing from interpreter's linked list
4046+
struct llist_node *head = &interp->asyncio_tasks_head;
4047+
4048+
if (add_tasks_llist(head, tasks) < 0) {
4049+
return -1;
4050+
}
4051+
4052+
int ret = 0;
4053+
// traverse the task lists of thread states
4054+
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
4055+
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)p;
4056+
head = &ts->asyncio_tasks_head;
4057+
if (add_tasks_llist(head, tasks) < 0) {
4058+
ret = -1;
4059+
goto exit;
4060+
}
4061+
}
4062+
exit:
4063+
_Py_FOR_EACH_TSTATE_END(interp);
4064+
return ret;
4065+
}
4066+
40064067
/*********************** Module **************************/
40074068

40084069
/*[clinic input]
@@ -4041,30 +4102,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
40414102
Py_DECREF(loop);
40424103
return NULL;
40434104
}
4044-
int err = 0;
4045-
ASYNCIO_STATE_LOCK(state);
4046-
struct llist_node *node;
4047-
4048-
llist_for_each_safe(node, &state->asyncio_tasks_head) {
4049-
TaskObj *task = llist_data(node, TaskObj, task_node);
4050-
// The linked list holds borrowed references to task
4051-
// as such it is possible that the task is concurrently
4052-
// deallocated while added to this list.
4053-
// To protect against concurrent deallocations,
4054-
// we first try to incref the task which would fail
4055-
// if it is concurrently getting deallocated in another thread,
4056-
// otherwise it gets added to the list.
4057-
if (_Py_TryIncref((PyObject *)task)) {
4058-
if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
4059-
Py_DECREF(tasks);
4060-
Py_DECREF(loop);
4061-
err = 1;
4062-
break;
4063-
}
4064-
}
4065-
}
4066-
ASYNCIO_STATE_UNLOCK(state);
4067-
if (err) {
4105+
PyInterpreterState *interp = PyInterpreterState_Get();
4106+
// Stop the world and traverse the per-thread linked list
4107+
// of asyncio tasks for every thread, as well as the
4108+
// interpreter's linked list, and add them to `tasks`.
4109+
// The interpreter linked list is used for any lingering tasks
4110+
// whose thread state has been deallocated while the task was
4111+
// still alive. This can happen if a task is referenced by
4112+
// a different thread, in which case the task is moved to
4113+
// the interpreter's linked list from the thread's linked
4114+
// list before deallocation. See PyThreadState_Clear.
4115+
//
4116+
// The stop-the-world pause is required so that no thread
4117+
// modifies its linked list while being iterated here
4118+
// in parallel. This design allows for lock-free
4119+
// register_task/unregister_task for loops running in parallel
4120+
// in different threads (the general case).
4121+
_PyEval_StopTheWorld(interp);
4122+
int ret = add_tasks_interp(interp, (PyListObject *)tasks);
4123+
_PyEval_StartTheWorld(interp);
4124+
if (ret < 0) {
4125+
// Make escaping calls (such as Py_DECREF) only after restarting the world, to avoid deadlocks.
4126+
Py_DECREF(tasks);
4127+
Py_DECREF(loop);
40684128
return NULL;
40694129
}
40704130
PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
@@ -4348,7 +4408,6 @@ module_exec(PyObject *mod)
43484408
{
43494409
asyncio_state *state = get_asyncio_state(mod);
43504410

4351-
llist_init(&state->asyncio_tasks_head);
43524411

43534412
#define CREATE_TYPE(m, tp, spec, base) \
43544413
do { \

Python/pystate.c

+11-1
Original file line number | Diff line number | Diff line change
@@ -643,6 +643,8 @@ init_interpreter(PyInterpreterState *interp,
643643
_Py_brc_init_state(interp);
644644
#endif
645645
llist_init(&interp->mem_free_queue.head);
646+
llist_init(&interp->asyncio_tasks_head);
647+
interp->asyncio_tasks_lock = (PyMutex){0};
646648
for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
647649
interp->monitors.tools[i] = 0;
648650
}
@@ -1512,7 +1514,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
15121514
tstate->delete_later = NULL;
15131515

15141516
llist_init(&_tstate->mem_free_queue);
1515-
1517+
llist_init(&_tstate->asyncio_tasks_head);
15161518
if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
15171519
// Start in the suspended state if there is an ongoing stop-the-world.
15181520
tstate->state = _Py_THREAD_SUSPENDED;
@@ -1692,6 +1694,14 @@ PyThreadState_Clear(PyThreadState *tstate)
16921694
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);
16931695
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task);
16941696

1697+
1698+
PyMutex_Lock(&tstate->interp->asyncio_tasks_lock);
1699+
// merge any lingering tasks from the thread state's list into the
1700+
// interpreter's tasks list
1701+
llist_concat(&tstate->interp->asyncio_tasks_head,
1702+
&((_PyThreadStateImpl *)tstate)->asyncio_tasks_head);
1703+
PyMutex_Unlock(&tstate->interp->asyncio_tasks_lock);
1704+
16951705
Py_CLEAR(tstate->dict);
16961706
Py_CLEAR(tstate->async_exc);
16971707

0 commit comments

Comments
 (0)