Skip to content

Commit 707463a

Browse files
authored
Merge pull request #1575 from pyrmont/feature.ev-interrupt2
Expand scope of code that works with `ev/deadline` again
2 parents 73334f3 + eac37ab commit 707463a

File tree

3 files changed

+117
-12
lines changed

3 files changed

+117
-12
lines changed

src/core/ev.c

Lines changed: 102 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ typedef struct {
112112
JanetHandle write_pipe;
113113
} JanetEVThreadInit;
114114

115+
/* Structure used to initialize threads that run timeouts */
116+
typedef struct {
117+
double sec;
118+
JanetVM *vm;
119+
JanetFiber *fiber;
120+
} JanetThreadedTimeout;
121+
115122
#define JANET_MAX_Q_CAPACITY 0x7FFFFFF
116123

117124
static void janet_q_init(JanetQueue *q) {
@@ -623,6 +630,7 @@ void janet_addtimeout(double sec) {
623630
to.curr_fiber = NULL;
624631
to.sched_id = fiber->sched_id;
625632
to.is_error = 1;
633+
to.has_worker = 0;
626634
add_timeout(to);
627635
}
628636

@@ -635,9 +643,54 @@ void janet_addtimeout_nil(double sec) {
635643
to.curr_fiber = NULL;
636644
to.sched_id = fiber->sched_id;
637645
to.is_error = 0;
646+
to.has_worker = 0;
638647
add_timeout(to);
639648
}
640649

650+
#ifdef JANET_WINDOWS
651+
static VOID CALLBACK janet_timeout_stop(ULONG_PTR ptr) {
652+
UNREFERENCED_PARAMETER(ptr);
653+
ExitThread(0);
654+
}
655+
#endif
656+
657+
static void janet_timeout_cb(JanetEVGenericMessage msg) {
658+
(void) msg;
659+
janet_interpreter_interrupt_handled(&janet_vm);
660+
}
661+
662+
#ifdef JANET_WINDOWS
663+
static DWORD WINAPI janet_timeout_body(LPVOID ptr) {
664+
JanetThreadedTimeout tto = *(JanetThreadedTimeout *)ptr;
665+
janet_free(ptr);
666+
SleepEx((DWORD)(tto.sec * 1000), TRUE);
667+
if (janet_fiber_can_resume(tto.fiber)) {
668+
janet_interpreter_interrupt(tto.vm);
669+
JanetEVGenericMessage msg = {0};
670+
janet_ev_post_event(tto.vm, janet_timeout_cb, msg);
671+
}
672+
return 0;
673+
}
674+
#else
675+
static void *janet_timeout_body(void *ptr) {
676+
JanetThreadedTimeout tto = *(JanetThreadedTimeout *)ptr;
677+
janet_free(ptr);
678+
struct timespec ts;
679+
ts.tv_sec = (time_t) tto.sec;
680+
ts.tv_nsec = (tto.sec <= UINT32_MAX)
681+
? (long)((tto.sec - ((uint32_t)tto.sec)) * 1000000000)
682+
: 0;
683+
nanosleep(&ts, &ts);
684+
if (janet_fiber_can_resume(tto.fiber)) {
685+
janet_interpreter_interrupt(tto.vm);
686+
JanetEVGenericMessage msg = {0};
687+
janet_ev_post_event(tto.vm, janet_timeout_cb, msg);
688+
}
689+
return NULL;
690+
}
691+
#endif
692+
693+
641694
void janet_ev_inc_refcount(void) {
642695
janet_atomic_inc(&janet_vm.listener_count);
643696
}
@@ -1431,6 +1484,17 @@ JanetFiber *janet_loop1(void) {
14311484
while ((has_timeout = peek_timeout(&to))) {
14321485
if (to.curr_fiber != NULL) {
14331486
if (!janet_fiber_can_resume(to.curr_fiber)) {
1487+
if (to.has_worker) {
1488+
#ifdef JANET_WINDOWS
1489+
QueueUserAPC(janet_timeout_stop, to.worker, 0);
1490+
WaitForSingleObject(to.worker, INFINITE);
1491+
CloseHandle(to.worker);
1492+
#else
1493+
pthread_cancel(to.worker);
1494+
void *res;
1495+
pthread_join(to.worker, &res);
1496+
#endif
1497+
}
14341498
janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber));
14351499
pop_timeout(0);
14361500
continue;
@@ -3103,26 +3167,53 @@ JANET_CORE_FN(cfun_ev_sleep,
31033167
}
31043168

31053169
JANET_CORE_FN(cfun_ev_deadline,
3106-
"(ev/deadline sec &opt tocancel tocheck)",
3107-
"Schedules the event loop to try to cancel the `tocancel` "
3108-
"task as with `ev/cancel`. After `sec` seconds, the event "
3109-
"loop will attempt cancellation of `tocancel` if the "
3110-
"`tocheck` fiber is resumable. `sec` is a number that can "
3111-
"have a fractional part. `tocancel` defaults to "
3112-
"`(fiber/root)`, but if specified, must be a task (root "
3113-
"fiber). `tocheck` defaults to `(fiber/current)`, but if "
3114-
"specified, should be a fiber. Returns `tocancel` "
3115-
"immediately.") {
3116-
janet_arity(argc, 1, 3);
3170+
"(ev/deadline sec &opt tocancel tocheck intr?)",
3171+
"Schedules the event loop to try to cancel the `tocancel` task as with `ev/cancel`. "
3172+
"After `sec` seconds, the event loop will attempt cancellation of `tocancel` if the "
3173+
"`tocheck` fiber is resumable. `sec` is a number that can have a fractional part. "
3174+
"`tocancel` defaults to `(fiber/root)`, but if specified, must be a task (root "
3175+
"fiber). `tocheck` defaults to `(fiber/current)`, but if specified, must be a fiber. "
3176+
"Returns `tocancel` immediately. If `interrupt?` is set to true, will create a "
3177+
"background thread to try to interrupt the VM if the timeout expires.") {
3178+
janet_arity(argc, 1, 4);
31173179
double sec = janet_getnumber(argv, 0);
3180+
sec = (sec < 0) ? 0 : sec;
31183181
JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm.root_fiber);
31193182
JanetFiber *tocheck = janet_optfiber(argv, argc, 2, janet_vm.fiber);
3183+
int use_interrupt = janet_optboolean(argv, argc, 3, 0);
31203184
JanetTimeout to;
31213185
to.when = ts_delta(ts_now(), sec);
31223186
to.fiber = tocancel;
31233187
to.curr_fiber = tocheck;
31243188
to.is_error = 0;
31253189
to.sched_id = to.fiber->sched_id;
3190+
if (use_interrupt) {
3191+
JanetThreadedTimeout *tto = janet_malloc(sizeof(JanetThreadedTimeout));
3192+
if (NULL == tto) {
3193+
JANET_OUT_OF_MEMORY;
3194+
}
3195+
tto->sec = sec;
3196+
tto->vm = &janet_vm;
3197+
tto->fiber = tocheck;
3198+
#ifdef JANET_WINDOWS
3199+
HANDLE worker = CreateThread(NULL, 0, janet_timeout_body, tto, 0, NULL);
3200+
if (NULL == worker) {
3201+
janet_free(tto);
3202+
janet_panic("failed to create thread");
3203+
}
3204+
#else
3205+
pthread_t worker;
3206+
int err = pthread_create(&worker, NULL, janet_timeout_body, tto);
3207+
if (err) {
3208+
janet_free(tto);
3209+
janet_panicf("%s", janet_strerror(err));
3210+
}
3211+
#endif
3212+
to.has_worker = 1;
3213+
to.worker = worker;
3214+
} else {
3215+
to.has_worker = 0;
3216+
}
31263217
add_timeout(to);
31273218
return janet_wrap_fiber(tocancel);
31283219
}

src/core/state.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
#include <stdint.h>
2828

2929
#ifdef JANET_EV
30-
#ifndef JANET_WINDOWS
30+
#ifdef JANET_WINDOWS
31+
#include <windows.h>
32+
#else
3133
#include <pthread.h>
3234
#endif
3335
#endif
@@ -53,13 +55,21 @@ typedef struct {
5355
void *data;
5456
} JanetQueue;
5557

58+
#ifdef JANET_EV
5659
typedef struct {
5760
JanetTimestamp when;
5861
JanetFiber *fiber;
5962
JanetFiber *curr_fiber;
6063
uint32_t sched_id;
6164
int is_error;
65+
int has_worker;
66+
#ifdef JANET_WINDOWS
67+
HANDLE worker;
68+
#else
69+
pthread_t worker;
70+
#endif
6271
} JanetTimeout;
72+
#endif
6373

6474
/* Registry table for C functions - contains metadata that can
6575
* be looked up by cfunction pointer. All strings here are pointing to

test/suite-ev.janet

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,4 +550,8 @@
550550
(ev/sleep 0.15)
551551
(assert (not terminated-normally) "early termination failure 3"))
552552

553+
(let [f (coro (forever :foo))]
554+
(ev/deadline 0.01 nil f true)
555+
(assert-error "deadline expired" (resume f)))
556+
553557
(end-suite)

0 commit comments

Comments
 (0)