Skip to content

External Execution Interface #4616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
448184f
Initial External Execution Interface
nibanks Oct 16, 2024
442c928
Make .NET and Kernel mode happy
nibanks Oct 16, 2024
e0f9d13
Check function
nibanks Oct 16, 2024
adb9ad9
Incomplete example
nibanks Oct 16, 2024
882e630
Simplify
nibanks Oct 16, 2024
da9e0b2
Fix .net
nibanks Oct 16, 2024
94efc44
Remove leftovers
nibanks Oct 16, 2024
9695592
fixes
nibanks Oct 16, 2024
4554407
wip
nibanks Oct 18, 2024
40d1e84
wip
nibanks Oct 28, 2024
74835cf
docs
nibanks Oct 31, 2024
f73a471
Merge branch 'main' into external-execution
nibanks Dec 5, 2024
b743020
A bit of documentation
nibanks Dec 6, 2024
dd7abe0
c++ formatting is nicer
nibanks Dec 6, 2024
be0855e
Merge branch 'main' into external-execution
nibanks Jan 4, 2025
919ae67
Updates to latest design
nibanks Jan 4, 2025
0e4b73d
Merge branch 'main' into external-execution
nibanks Jan 7, 2025
94b4c3f
wip
nibanks Jan 8, 2025
446a9f3
wip
nibanks Jan 22, 2025
f39e7f8
Merge branch 'main' into external-execution
nibanks Mar 17, 2025
b15ba57
Merge branch 'main' into external-execution
nibanks Mar 18, 2025
4a86d10
Merge branch 'main' into external-execution
nibanks Apr 10, 2025
bd22e7d
Merge branch 'main' into external-execution
nibanks Apr 17, 2025
c5b97f8
WIP
nibanks Apr 17, 2025
d2d209c
Merge branch 'main' into external-execution
nibanks Apr 28, 2025
3264294
Merge branch 'main' into external-execution
nibanks Apr 28, 2025
9603086
wip
nibanks Apr 28, 2025
a06aa81
renames
nibanks Apr 28, 2025
8c7c05b
Fix clog
nibanks Apr 28, 2025
8948a57
Merge branch 'main' into external-execution
nibanks Apr 30, 2025
2e1bf82
Merge branch 'main' into external-execution
nibanks May 6, 2025
74b8cb9
Fix clean up code
nibanks May 6, 2025
9ccc97d
fix builds
nibanks May 6, 2025
4cab2f5
fixes
nibanks May 6, 2025
7c4b651
Merge branch 'main' into external-execution
nibanks May 6, 2025
ad32894
Single thread test app works!
nibanks May 6, 2025
2039771
unreferenced parameters
nibanks May 6, 2025
a44b999
Fix kernel builds
nibanks May 6, 2025
da26362
another try for kernel mode
nibanks May 6, 2025
e5731f4
Trying to fix rust
nibanks May 6, 2025
38feb45
another try
nibanks May 6, 2025
fd4a623
Another try
nibanks May 6, 2025
630aa78
Another try
nibanks May 6, 2025
f387952
forgot one
nibanks May 6, 2025
cfefdf8
oops
nibanks May 6, 2025
2924b64
hack for mac
nibanks May 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion docs/Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Each of these threads handle both the datapath (i.e., UDP) and QUIC layers by de
Using a single worker thread for both layers helps MsQuic can achieve lower latency and using separate threads for the two layers can help achieve higher throughput.
MsQuic aligns its processing logic with the rest of the networking stack (including hardware RSS) to ensure that all processing stays on the same NUMA node, and ideally, the same processor.

The complexity of aligning processing across various threads and processors is the primary reason for MsQuic to manage its own threading.
The complexity of aligning processing across various threads and processors is the primary reason for MsQuic to manage its own threading.
This provides developers with a performant abstraction of both functionality and threading model, which simplifies application development using MsQuic, ensuring that things "just work" efficiently for QUIC by default.

Each thread manages the execution of one or more connections.
Expand Down Expand Up @@ -116,6 +116,78 @@ graph TD
end
```

## Custom Execution

MsQuic also supports scenarios where the application layer creates the threads that MsQuic uses to execute on.
In this mode, the application creates one or more execution contexts that MsQuic will use to run all its internal logic.
The application is responsible for calling down into MsQuic to allow these execution contexts to run.

To create an execution context, the app much first create an event queue object, which is a platform specific type:

- Windows: IOCP
- Linux: epoll
- macOS: kqueue

On Windows, the following types are defined:

```c++
typedef HANDLE QUIC_EVENTQ;

typedef OVERLAPPED_ENTRY CXPLAT_CQE;

typedef
_IRQL_requires_max_(PASSIVE_LEVEL)
void
(CXPLAT_EVENT_COMPLETION)(
_In_ CXPLAT_CQE* Cqe
);
typedef CXPLAT_EVENT_COMPLETION *CXPLAT_EVENT_COMPLETION_HANDLER;

typedef struct CXPLAT_SQE {
OVERLAPPED Overlapped;
CXPLAT_EVENT_COMPLETION_HANDLER Completion;
} CXPLAT_SQE;
```

You will also notice the definiton for `QUIC_SQE` (SQE stands for submission queue entry), which defines the format that all completion events must take so they may be generically processed from the event queue (more on this below).

Once the app has the event queue, it may create the execution context with the `ExecutionCreate` function:

```c++
HANDLE IOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 1);
QUIC_EXECUTION_CONFIG ExecConfig = { 0, &IOCP };

QUIC_EXECUTION* ExecContext = nullptr;
QUIC_STATUS Status = MsQuic->ExecutionCreate(QUIC_GLOBAL_EXECUTION_CONFIG_FLAG_NONE, 0, 1, &ExecConfig, &ExecContext);
```

The above code createa a new IOCP (for Windows), sets up an execution config, indicating an ideal processor of 0 and the pointer to the IOCP, and then calls MsQuic to create 1 execution context.
An application may expand this code to create multiple execution contexts, depending on their needs.

To drive this execution context, the app will need to to periodically call `ExecutionPoll` and use the platform specific function to drain completion events from the event queue.

```c
bool AllDone = false;
while (!AllDone) {
uint32_t WaitTime = MsQuic->ExecutionPoll(ExecContext);

ULONG OverlappedCount = 0;
OVERLAPPED_ENTRY Overlapped[8];
if (GetQueuedCompletionStatusEx(IOCP, Overlapped, ARRAYSIZE(Overlapped), &OverlappedCount, WaitTime, FALSE)) {
for (ULONG i = 0; i < OverlappedCount; ++i) {
QUIC_SQE* Sqe = CONTAINING_RECORD(Overlapped[i].lpOverlapped, QUIC_SQE, Overlapped);
Sqe->Completion(&Overlapped[i]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can user quic callback event be invoked on this user thread here?
(I am thinking about if it is possible to write a single thread application, and potential deadlocks.)

Copy link
Member Author

@nibanks nibanks May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the app I added in this PR is single threaded. It works and I have the logs to prove it.

}
}
}
```

Above, you can see a simple loop that properly drives a single execution context on Windows.
`OVERLAPPED_ENTRY` objects received from `GetQueuedCompletionStatusEx` are used to get the submission queue entry and then call its completion handler.

In a real application, these completion events may come both from MsQuic and the application itself, therefore, this means **the application must use the same base format for its own submission entries**.
This is necessary to be able to share the same event queue object.

# See Also

[QUIC_STREAM_CALLBACK](api/QUIC_STREAM_CALLBACK.md)<br>
Expand Down
13 changes: 8 additions & 5 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ MsQuicConnectionClose(

CXPLAT_TEL_ASSERT(!Connection->State.HandleClosed);

if (IsWorkerThread) {
if (MsQuicLib.CustomExecutions || IsWorkerThread) {
//
// Execute this blocking API call inline if called on the worker thread.
//
Expand Down Expand Up @@ -749,7 +749,7 @@ MsQuicStreamClose(

CXPLAT_TEL_ASSERT(!Stream->Flags.HandleClosed);

if (IsWorkerThread) {
if (MsQuicLib.CustomExecutions || IsWorkerThread) {
//
// Execute this blocking API call inline if called on the worker thread.
//
Expand Down Expand Up @@ -959,7 +959,8 @@ MsQuicStreamShutdown(
QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);

if (Flags & QUIC_STREAM_SHUTDOWN_FLAG_INLINE &&
Connection->WorkerThreadID == CxPlatCurThreadID()) {
(MsQuicLib.CustomExecutions ||
Connection->WorkerThreadID == CxPlatCurThreadID())) {

CXPLAT_PASSIVE_CODE();

Expand Down Expand Up @@ -1613,7 +1614,8 @@ MsQuicSetParam(

QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);

if (Connection->WorkerThreadID == CxPlatCurThreadID()) {
if (MsQuicLib.CustomExecutions ||
Connection->WorkerThreadID == CxPlatCurThreadID()) {
//
// Execute this blocking API call inline if called on the worker thread.
//
Expand Down Expand Up @@ -1738,7 +1740,8 @@ MsQuicGetParam(

QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);

if (Connection->WorkerThreadID == CxPlatCurThreadID()) {
if (MsQuicLib.CustomExecutions ||
Connection->WorkerThreadID == CxPlatCurThreadID()) {
//
// Execute this blocking API call inline if called on the worker thread.
//
Expand Down
26 changes: 26 additions & 0 deletions src/core/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@

--*/

_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicExecutionCreate(
_In_ QUIC_GLOBAL_EXECUTION_CONFIG_FLAGS Flags, // Used for datapath type
_In_ uint32_t PollingIdleTimeoutUs,
_In_ uint32_t Count,
_In_reads_(Count) QUIC_EXECUTION_CONFIG* Configs,
_Out_writes_(Count) QUIC_EXECUTION** Executions
);

_IRQL_requires_max_(PASSIVE_LEVEL)
void
QUIC_API
MsQuicExecutionDelete(
_In_ uint32_t Count,
_In_reads_(Count) QUIC_EXECUTION** Executions
);

_IRQL_requires_max_(PASSIVE_LEVEL)
uint32_t
QUIC_API
MsQuicExecutionPoll(
_In_ QUIC_EXECUTION* Execution
);

_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QUIC_API
Expand Down
144 changes: 136 additions & 8 deletions src/core/library.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,15 @@
CXPLAT_FRE_ASSERT(MsQuicLib.PartitionCount > 0);

uint16_t* ProcessorList = NULL;
if (MsQuicLib.ExecutionConfig &&
#ifndef _KERNEL_MODE
if (MsQuicLib.WorkerPool != NULL) {
MsQuicLib.CustomPartitions = TRUE;
MsQuicLib.PartitionCount = (uint16_t)CxPlatWorkerPoolGetCount(MsQuicLib.WorkerPool);
} else if (

Check warning on line 145 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L143-L145

Added lines #L143 - L145 were not covered by tests
#else
if (
#endif
MsQuicLib.ExecutionConfig &&
MsQuicLib.ExecutionConfig->ProcessorCount &&
MsQuicLib.ExecutionConfig->ProcessorCount != MsQuicLib.PartitionCount) {
//
Expand Down Expand Up @@ -196,7 +204,14 @@
QuicPartitionInitialize(
&MsQuicLib.Partitions[i],
i,
#ifndef _KERNEL_MODE
ProcessorList ? ProcessorList[i] :
(MsQuicLib.CustomPartitions ?
(uint16_t)CxPlatWorkerPoolGetIdealProcessor(MsQuicLib.WorkerPool, i) :
i),
#else
ProcessorList ? ProcessorList[i] : i,
#endif
CXPLAT_HASH_SHA256,
ResetHashKey,
sizeof(ResetHashKey));
Expand Down Expand Up @@ -681,6 +696,7 @@
};

QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
BOOLEAN CreatedWorkerPool = FALSE;

if (AcquireLock) {
CxPlatLockAcquire(&MsQuicLib.Lock);
Expand All @@ -699,11 +715,14 @@
}

#ifndef _KERNEL_MODE
MsQuicLib.WorkerPool = CxPlatWorkerPoolCreate(MsQuicLib.ExecutionConfig);
if (!MsQuicLib.WorkerPool) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
MsQuicLibraryFreePartitions();
goto Exit;
if (MsQuicLib.WorkerPool == NULL) {
MsQuicLib.WorkerPool = CxPlatWorkerPoolCreate(MsQuicLib.ExecutionConfig);
if (!MsQuicLib.WorkerPool) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
MsQuicLibraryFreePartitions();
goto Exit;

Check warning on line 723 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L721-L723

Added lines #L721 - L723 were not covered by tests
}
CreatedWorkerPool = TRUE;
}
#endif

Expand All @@ -728,8 +747,10 @@
} else {
MsQuicLibraryFreePartitions();
#ifndef _KERNEL_MODE
CxPlatWorkerPoolDelete(MsQuicLib.WorkerPool);
MsQuicLib.WorkerPool = NULL;
if (CreatedWorkerPool) {
CxPlatWorkerPoolDelete(MsQuicLib.WorkerPool);
MsQuicLib.WorkerPool = NULL;

Check warning on line 752 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L750-L752

Added lines #L750 - L752 were not covered by tests
}
#endif
goto Exit;
}
Expand Down Expand Up @@ -1840,6 +1861,12 @@

Api->DatagramSend = MsQuicDatagramSend;

#ifndef _KERNEL_MODE
Api->ExecutionCreate = MsQuicExecutionCreate;
Api->ExecutionDelete = MsQuicExecutionDelete;
Api->ExecutionPoll = MsQuicExecutionPoll;
#endif

Api->ConnectionPoolCreate = MsQuicConnectionPoolCreate;

*QuicApi = Api;
Expand Down Expand Up @@ -2428,3 +2455,104 @@
}
return Status;
}

#ifndef _KERNEL_MODE

_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicExecutionCreate(
_In_ QUIC_GLOBAL_EXECUTION_CONFIG_FLAGS Flags, // Used for datapath type
_In_ uint32_t PollingIdleTimeoutUs,
_In_ uint32_t Count,
_In_reads_(Count) QUIC_EXECUTION_CONFIG* Configs,
_Out_writes_(Count) QUIC_EXECUTION** Executions
)
{
QuicTraceEvent(

Check warning on line 2472 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2471-L2472

Added lines #L2471 - L2472 were not covered by tests
ApiEnter,
"[ api] Enter %u (%p).",
QUIC_TRACE_API_EXECUTION_CREATE,
NULL);

QUIC_STATUS Status = QUIC_STATUS_SUCCESS;

Check warning on line 2478 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2478

Added line #L2478 was not covered by tests

UNREFERENCED_PARAMETER(Flags);
UNREFERENCED_PARAMETER(PollingIdleTimeoutUs);

if (MsQuicLib.LazyInitComplete) {

Check warning on line 2483 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2483

Added line #L2483 was not covered by tests
//
// Not allowed to change the execution config after we've already
// started running the library.
//
Status = QUIC_STATUS_INVALID_STATE;

Check warning on line 2488 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2488

Added line #L2488 was not covered by tests

} else {

Check warning on line 2490 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2490

Added line #L2490 was not covered by tests
//
// Clean up any previous worker pool and create a new one.
//
CxPlatWorkerPoolDelete(MsQuicLib.WorkerPool);
MsQuicLib.WorkerPool =

Check warning on line 2495 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2494-L2495

Added lines #L2494 - L2495 were not covered by tests
CxPlatWorkerPoolCreateExternal(Count, Configs, Executions);
if (MsQuicLib.WorkerPool == NULL) {
Status = QUIC_STATUS_OUT_OF_MEMORY;

Check warning on line 2498 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2497-L2498

Added lines #L2497 - L2498 were not covered by tests
}

MsQuicLib.CustomExecutions = TRUE;

Check warning on line 2501 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2501

Added line #L2501 was not covered by tests
}

QuicTraceEvent(

Check warning on line 2504 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2504

Added line #L2504 was not covered by tests
ApiExitStatus,
"[ api] Exit %u",
Status);

return Status;
}

Check warning on line 2510 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2509-L2510

Added lines #L2509 - L2510 were not covered by tests

_IRQL_requires_max_(PASSIVE_LEVEL)
void
QUIC_API
MsQuicExecutionDelete(
_In_ uint32_t Count,
_In_reads_(Count) QUIC_EXECUTION** Executions
)
{
QuicTraceEvent(

Check warning on line 2520 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2519-L2520

Added lines #L2519 - L2520 were not covered by tests
ApiEnter,
"[ api] Enter %u (%p).",
QUIC_TRACE_API_EXECUTION_DELETE,
NULL);

UNREFERENCED_PARAMETER(Count);
UNREFERENCED_PARAMETER(Executions);
CxPlatWorkerPoolDelete(MsQuicLib.WorkerPool);
MsQuicLib.WorkerPool = NULL;

Check warning on line 2529 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2528-L2529

Added lines #L2528 - L2529 were not covered by tests

QuicTraceEvent(

Check warning on line 2531 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2531

Added line #L2531 was not covered by tests
ApiExit,
"[ api] Exit");
}

Check warning on line 2534 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2534

Added line #L2534 was not covered by tests

_IRQL_requires_max_(PASSIVE_LEVEL)
uint32_t
QUIC_API
MsQuicExecutionPoll(
_In_ QUIC_EXECUTION* Execution
)
{
QuicTraceEvent(

Check warning on line 2543 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2542-L2543

Added lines #L2542 - L2543 were not covered by tests
ApiEnter,
"[ api] Enter %u (%p).",
QUIC_TRACE_API_EXECUTION_POLL,
NULL);

uint32_t Result = CxPlatWorkerPoolWorkerPoll(Execution);

Check warning on line 2549 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2549

Added line #L2549 was not covered by tests

QuicTraceEvent(

Check warning on line 2551 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2551

Added line #L2551 was not covered by tests
ApiExit,
"[ api] Exit");

return Result;
}

Check warning on line 2556 in src/core/library.c

View check run for this annotation

Codecov / codecov/patch

src/core/library.c#L2555-L2556

Added lines #L2555 - L2556 were not covered by tests

#endif
5 changes: 5 additions & 0 deletions src/core/library.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ typedef struct QUIC_LIBRARY {
//
BOOLEAN LazyInitComplete : 1;

//
// Indicates the app has configured their own execution contexts.
//
BOOLEAN CustomExecutions : 1;

//
// Indicates the app has configured non-default (per-processor) partitioning.
//
Expand Down
2 changes: 1 addition & 1 deletion src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ typedef enum QUIC_API_TYPE {
QUIC_API_TYPE_STRM_SEND,
QUIC_API_TYPE_STRM_RECV_COMPLETE,
QUIC_API_TYPE_STRM_RECV_SET_ENABLED,
QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS,

QUIC_API_TYPE_SET_PARAM,
QUIC_API_TYPE_GET_PARAM,

QUIC_API_TYPE_DATAGRAM_SEND,
QUIC_API_TYPE_CONN_COMPLETE_RESUMPTION_TICKET_VALIDATION,
QUIC_API_TYPE_CONN_COMPLETE_CERTIFICATE_VALIDATION,
QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS,

} QUIC_API_TYPE;

Expand Down
Loading
Loading