Skip to content

gh-128002: use per threads tasks linked list in asyncio #128869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ struct _is {
PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS];
_PyIndexPool tlbc_indices;
#endif

// Per-interpreter list of tasks: any lingering tasks from thread
// states get added here and are removed from the corresponding
// thread state's list.
struct llist_node asyncio_tasks_head;
// Per-interpreter state for the obmalloc allocator. For the main
// interpreter and for all interpreters that don't have their
// own obmalloc state, this points to the static structure in
Expand Down
2 changes: 1 addition & 1 deletion Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ typedef enum _PyLockFlags {

// Lock a mutex with an optional timeout and additional options. See
// _PyLockFlags for details.
extern PyLockStatus
extern PyAPI_FUNC(PyLockStatus)
_PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags);

// Lock a mutex with additional options. See _PyLockFlags for details.
Expand Down
4 changes: 2 additions & 2 deletions Include/internal/pycore_pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime);
// Perform a stop-the-world pause for threads in the specified interpreter.
//
// NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
extern void _PyEval_StopTheWorld(PyInterpreterState *interp);
extern void _PyEval_StartTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp);


static inline void
Expand Down
5 changes: 5 additions & 0 deletions Include/internal/pycore_tstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ typedef struct _PyThreadStateImpl {

PyObject *asyncio_running_loop; // Strong reference

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it used in `asyncio.all_tasks`.
*/
struct llist_node asyncio_tasks_head;
struct _qsbr_thread_state *qsbr; // only used by free-threaded build
struct llist_node mem_free_queue; // delayed free queue


#ifdef Py_GIL_DISABLED
struct _gc_thread_state gc;
struct _mimalloc_thread_state mimalloc;
Expand Down
39 changes: 39 additions & 0 deletions Lib/test/test_asyncio/test_free_threading.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import threading
import unittest
import weakref
from threading import Thread
from unittest import TestCase

from test import support
from test.support import threading_helper

threading_helper.requires_working_threading(module=True)
Expand Down Expand Up @@ -58,6 +61,42 @@ def runner():
with threading_helper.start_threads(threads):
pass

def test_all_tasks_different_thread(self) -> None:
# Checks that asyncio.all_tasks(loop) reports a task whose event loop is
# being run by another thread (the loop is driven by `thread` below).
task = None
loop = asyncio.EventLoop()
# `started` is set by the coroutine once it has recorded its own task;
# `stop` is set by the test after the all_tasks() assertion.
started = threading.Event()
stop = threading.Event()
async def func():
nonlocal task
task = asyncio.current_task()
started.set()
# Blocking wait: keeps the task unfinished until the test sets `stop`.
stop.wait()
loop.call_soon_threadsafe(loop.stop)

loop.create_task(func())
thread = Thread(target=loop.run_forever)
with threading_helper.start_threads([thread]):
# Wait until the coroutine has stored its task before querying.
started.wait()
self.assertSetEqual(asyncio.all_tasks(loop), {task})
stop.set()
loop.close()

def test_task_different_thread_finalized(self) -> None:
# Creates a task inside a worker thread via asyncio.run(), then drops the
# last references to it after that thread has been joined, and checks
# that the task is actually collected.
task = None
async def func():
nonlocal task
task = asyncio.current_task()

thread = Thread(target=lambda: asyncio.run(func()))
thread.start()
thread.join()
# Keep only a weak reference so collection can be observed below.
wr = weakref.ref(task)
del thread
del task
# task finalization in different thread shouldn't crash
support.gc_collect()
self.assertIsNone(wr())

def test_run_coroutine_threadsafe(self) -> None:
results = []

Expand Down
153 changes: 100 additions & 53 deletions Modules/_asynciomodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ typedef struct TaskObj {
PyObject *task_name;
PyObject *task_context;
struct llist_node task_node;
#ifdef Py_GIL_DISABLED
// thread id of the thread where this task was created
uintptr_t task_tid;
#endif
} TaskObj;

typedef struct {
Expand All @@ -76,19 +80,8 @@ typedef struct {
#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)

#ifdef Py_GIL_DISABLED
# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
#else
# define ASYNCIO_STATE_LOCK(state) ((void)state)
# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
#endif

/* State of the _asyncio module */
typedef struct {
#ifdef Py_GIL_DISABLED
PyMutex mutex;
#endif
PyTypeObject *FutureIterType;
PyTypeObject *TaskStepMethWrapper_Type;
PyTypeObject *FutureType;
Expand Down Expand Up @@ -135,11 +128,6 @@ typedef struct {
/* Counter for autogenerated Task names */
uint64_t task_name_counter;

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it. Third party tasks implementations which don't inherit from
`asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
*/
struct llist_node asyncio_tasks_head;
} asyncio_state;

static inline asyncio_state *
Expand Down Expand Up @@ -1997,16 +1985,15 @@ static PyMethodDef TaskWakeupDef = {
static void
register_task(asyncio_state *state, TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next != NULL) {
// already registered
assert(task->task_node.prev != NULL);
goto exit;
return;
}
llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
struct llist_node *head = &tstate->asyncio_tasks_head;
llist_insert_tail(head, &task->task_node);
}

static int
Expand All @@ -2015,19 +2002,38 @@ register_eager_task(asyncio_state *state, PyObject *task)
return PySet_Add(state->eager_tasks, task);
}

static void
unregister_task(asyncio_state *state, TaskObj *task)
static inline void
unregister_task_safe(TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next == NULL) {
// not registered
assert(task->task_node.prev == NULL);
goto exit;
return;
}
llist_remove(&task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
}

/* Unlink `task` from the tasks linked list it is registered on.
 *
 * In Py_GIL_DISABLED builds the task records the id of the thread that
 * created it (task_tid).  When the caller is a different thread, the
 * removal is performed under a stop-the-world pause of the current
 * interpreter; otherwise the node is removed directly without locking.
 */
static void
unregister_task(asyncio_state *state, TaskObj *task)
{
    assert(Task_Check(state, task));
#ifdef Py_GIL_DISABLED
    if (task->task_tid != _Py_ThreadId()) {
        // Called from a thread other than the one that created the task:
        // stop the world, then check and remove the task.
        PyInterpreterState *interp = _PyThreadState_GET()->interp;
        _PyEval_StopTheWorld(interp);
        unregister_task_safe(task);
        _PyEval_StartTheWorld(interp);
        return;
    }
#endif
    // Same thread (or a build without Py_GIL_DISABLED): no locking needed.
    unregister_task_safe(task);
}

static int
Expand Down Expand Up @@ -2182,6 +2188,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
}

Py_CLEAR(self->task_fut_waiter);
#ifdef Py_GIL_DISABLED
self->task_tid = _Py_ThreadId();
#endif
self->task_must_cancel = 0;
self->task_log_destroy_pending = 1;
self->task_num_cancels_requested = 0;
Expand Down Expand Up @@ -3706,6 +3715,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
static inline int
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
{
assert(PySet_CheckExact(tasks));
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
if (done == NULL) {
return -1;
Expand All @@ -3728,6 +3738,43 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
return 0;
}

/* Append every live task known to `interp` to the list `tasks`.
 *
 * The interpreter's own tasks list is walked first, followed by the
 * tasks list of each thread state reachable from interp->threads.head.
 * In Py_GIL_DISABLED builds the world must already be stopped.
 *
 * Returns 0 on success and -1 if appending to `tasks` fails.  On failure
 * no cleanup is done here: the caller invokes this with the runtime lock
 * held, so nothing that may escape must run in this function.
 */
static inline int
add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
{
#ifdef Py_GIL_DISABLED
    assert(interp->stoptheworld.world_stopped);
#endif
    // First list to walk is the interpreter's; the thread states follow.
    struct llist_node *list = &interp->asyncio_tasks_head;
    _PyThreadStateImpl *pending = (_PyThreadStateImpl *)interp->threads.head;

    for (;;) {
        struct llist_node *cur;
        llist_for_each_safe(cur, list) {
            TaskObj *t = llist_data(cur, TaskObj, task_node);
            // The list only borrows its tasks, so a task may be in the
            // middle of being deallocated by another thread.  Taking a
            // reference with _Py_TryIncref fails in that case and the
            // task is skipped; otherwise the new reference is handed
            // over to `tasks`.
            if (_Py_TryIncref((PyObject *)t)) {
                if (_PyList_AppendTakeRef(tasks, (PyObject *)t) < 0) {
                    // no escaping calls here: the runtime lock is held.
                    return -1;
                }
            }
        }
        if (pending == NULL) {
            // every thread state's list has been visited
            break;
        }
        // move on to the next thread state's linked list
        list = &pending->asyncio_tasks_head;
        pending = (_PyThreadStateImpl *)pending->base.next;
    }
    return 0;
}

/*********************** Module **************************/

/*[clinic input]
Expand Down Expand Up @@ -3766,30 +3813,31 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
Py_DECREF(loop);
return NULL;
}
int err = 0;
ASYNCIO_STATE_LOCK(state);
struct llist_node *node;

llist_for_each_safe(node, &state->asyncio_tasks_head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to task
// as such it is possible that the task is concurrently
// deallocated while added to this list.
// To protect against concurrent deallocations,
// we first try to incref the task which would fail
// if it is concurrently getting deallocated in another thread,
// otherwise it gets added to the list.
if (_Py_TryIncref((PyObject *)task)) {
if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
Py_DECREF(tasks);
Py_DECREF(loop);
err = 1;
break;
}
}
}
ASYNCIO_STATE_UNLOCK(state);
if (err) {
PyInterpreterState *interp = PyInterpreterState_Get();
// Stop the world and traverse the per-thread linked list
// of asyncio tasks of all threads and the interpreter's
// linked list, and add them to the tasks list.
// The interpreter linked list is used for any lingering tasks
// whose thread state has been deallocated but the task is
// still alive. This can happen if task is referenced by a
// different thread, in which case the task is moved to the
// interpreter's linked list from the thread's linked list
// before deallocation.
// The stop-the-world pause is required so that no thread
// modifies its linked list while it is being iterated
// here.
// This design allows for lock free register/unregister of tasks
// of loops running concurrently in different threads (general case).
_PyEval_StopTheWorld(interp);
HEAD_LOCK(interp->runtime);
int ret = add_tasks_interp(interp, (PyListObject *)tasks);
HEAD_UNLOCK(interp->runtime);
_PyEval_StartTheWorld(interp);
if (ret < 0) {
// call any escaping calls after releasing the runtime lock
// and starting the world to avoid any deadlocks.
Py_DECREF(tasks);
Py_DECREF(loop);
return NULL;
}
PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
Expand Down Expand Up @@ -4015,7 +4063,6 @@ module_exec(PyObject *mod)
{
asyncio_state *state = get_asyncio_state(mod);

llist_init(&state->asyncio_tasks_head);

#define CREATE_TYPE(m, tp, spec, base) \
do { \
Expand Down
11 changes: 10 additions & 1 deletion Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ init_interpreter(PyInterpreterState *interp,
_Py_brc_init_state(interp);
#endif
llist_init(&interp->mem_free_queue.head);
llist_init(&interp->asyncio_tasks_head);
for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
interp->monitors.tools[i] = 0;
}
Expand Down Expand Up @@ -1519,7 +1520,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
tstate->delete_later = NULL;

llist_init(&_tstate->mem_free_queue);

llist_init(&_tstate->asyncio_tasks_head);
if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
// Start in the suspended state if there is an ongoing stop-the-world.
tstate->state = _Py_THREAD_SUSPENDED;
Expand Down Expand Up @@ -1698,6 +1699,14 @@ PyThreadState_Clear(PyThreadState *tstate)

Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);


_PyEval_StopTheWorld(tstate->interp);
// merge any lingering tasks from thread state to interpreter's
// tasks list
llist_concat(&tstate->interp->asyncio_tasks_head,
&((_PyThreadStateImpl *)tstate)->asyncio_tasks_head);
_PyEval_StartTheWorld(tstate->interp);

Py_CLEAR(tstate->dict);
Py_CLEAR(tstate->async_exc);

Expand Down
Loading