Skip to content

Commit f92242f

Browse files
committed
interrupt calls wakeall--immediately, not post hoc; hopefully now robust even on windows where there is no EINTR
1 parent 4265654 commit f92242f

File tree

10 files changed

+47
-26
lines changed

10 files changed

+47
-26
lines changed

jsrc/ct.c

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,13 @@ typedef struct pyxcondmutex{
182182
S pyxorigthread; // thread number that is working on this pyx, or _1 if the value is available
183183
C errcode; // 0 if no error, or error code
184184
#if PYXES
185-
UI4 state;//one of the below pyx states. Monotonically increases
185+
UI4 state;//one of the below pyx states
186186
#endif
187187
} PYXBLOK;
188188
enum{ // pyx state is low 2 bytes of state. High 2 bytes are the wakeup sequence number
189189
PYXEMPTY=0, //the pyx is not filled in, and no one is waiting
190-
PYXWAIT=3, //at least 1 thread is waiting, and the pyx is not filled in
191-
PYXFULL=1}; //the pyx is filled in. We can OR FULL into pyx state to move to FULL state
190+
PYXWAIT=3, //at least 1 thread is waiting, and the pyx is not filled in. We can OR WAIT into pyx state to move to WAIT state
191+
PYXFULL=1}; //the pyx is filled in
192192
#if PYXES
193193

194194
// Install a value/errcode into a (recursive) pyx, and broadcast to anyone waiting on it. fa() the pyx to indicate that the thread has released the pyx
@@ -221,15 +221,15 @@ static A jtcreatepyx(J jt, I thread,D timeout){A pyx;
221221
// EVTIME if timeout
222222
A jtpyxval(J jt,A pyx){ UI4 state;PYXBLOK *blok=(PYXBLOK*)AAV0(pyx);
223223
if(PYXFULL==(state=lda((US*)&blok->state)))goto done; // if pyx already full, return result
224-
casa((US*)&blok->state,&(US){PYXEMPTY},PYXWAIT); // if pyx is EMPTY, move it to WAIT
224+
{US dummy=0;casa(state!=PYXEMPTY?&dummy:(US*)&blok->state,&(US){PYXEMPTY},PYXWAIT);} // if pyx is EMPTY, move it to WAIT. Avoid excess contention on hot pyxes
225225
UI ns=({D mwt=blok->pyxmaxwt;mwt==inf?IMAX:(I)(mwt*1e9);}); // figure out how long to wait
226226
struct jtimespec end=jtmtil(ns); // get the time when we have to give up on this pyx
227227
I err;
228-
sta(&jt->futexwt,&blok->state); // make sure systemlock knows how to wake us up. We check for system events AFTER this store, but before the wait (but wakeall has a window so it is called repeatedly anyway)
228+
sta(&jt->futexwt,&blok->state); // make sure systemlock knows how to wake us up. We check for system events AFTER this store, but before the wait
229229
while(1){ // repeat till state goes to FULL
230-
if(lda((US*)&blok->state)==PYXFULL)break; // if pyx was filled, exit and return its value
231230
// The wait may time out because of a pending system action (BREAK or system lock). If so, we accept it now...
232231
UI4 state=lda(&blok->state); C breakb; // get store sequence # before we check for system event
232+
if(PYXFULL==(state&0xffff))break; // if pyx was filled, exit and return its value
233233
if(unlikely(BETWEENC(lda(&JT(jt,systemlock)),1,2))){jtsystemlockaccept(jt,LOCKALL);} // process systemlock and keep waiting
234234
// the user may be requesting a BREAK interrupt for deadlock or other slow execution
235235
if(unlikely((breakb=lda(&JT(jt,adbreak)[0])))!=0){err=breakb==1?EVATTN:EVBREAK;goto fail;} // JBREAK: give up on the pyx and exit
@@ -238,13 +238,12 @@ A jtpyxval(J jt,A pyx){ UI4 state;PYXBLOK *blok=(PYXBLOK*)AAV0(pyx);
238238
else{err=EVTIME;goto fail;}} // otherwise, timeout, fail the pyx and exit
239239
I wr=jfutex_waitn(&blok->state,state|PYXWAIT,ns);if(unlikely(wr>0)){err=EVFACE;goto fail;} // wait on futex. If new event# or state has moved off of WAIT, don't wait
240240
}
241-
sta(&jt->futexwt,0); while(lda(&JT(jt,wakeallct)))YIELD; // wait till pending wakealls complete before we allow this block to be deleted
241+
CLRFUTEXWT; // wait till pending wakealls complete before we allow this block to be deleted
242242
done: // pyx has been filled in. jt->futexwt must be 0
243243
if(likely(blok->pyxvalue!=NULL))R blok->pyxvalue; // valid value, use it
244244
ASSERT(0,blok->errcode); // if error, return the error code
245245
fail:
246-
sta(&jt->futexwt,0); while(lda(&JT(jt,wakeallct)))YIELD;
247-
ASSERT(0,err);}
246+
CLRFUTEXWT;ASSERT(0,err);}
248247

249248
// ************************************* Locks **************************************
250249
// take a readlock on *alock. We come here only if a writelock was requested or running. We have incremented the readlock

jsrc/d.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ F1(jtdbstackz){A y,z;
205205
// explicit errors also come here, so stop here to see the point at which error was detected
206206
static void jtjsigstr(J jt,I e,I n,C*s){
207207
if(jt->jerr){jt->curname=0; R;} // if not first error, ignore: clear error-name and continue
208-
if(e==EVATTN||e==EVBREAK)wakeall(jt); // if user break, interrupt all threads that are in wait loops, so they can exit
209208
jt->jerr=(C)e; jt->jerr1=(C)e; if(jt->etxn<0)R; // remember error for testing, but if the error line is frozen, don't touch it
210209
if(e!=EVSTOP)moveparseinfotosi(jt); jt->etxn=0; // before we display, move error info from parse variables to si; but if STOP, it's already installed
211210
dhead(0,0L); // | left-header for the error line

jsrc/io.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ jfe routines (jconsole.c):
3333
je routines (io.c):
3434
jdo(line)
3535
36+
jinterrupt() - signal interrupt in all threads. Safe to call from signal handlers, and even from non-j threads, but shouldn't be called concurrently
37+
3638
jsto(type,string) - JT(jt,smoutput)() to give jfe output
3739
3840
jgets() - JT(jt,sminput)() callback to get jfe kb input
@@ -663,6 +665,15 @@ A _stdcall Jga(JS jjt, I t, I n, I r, I*s){A z;
663665
return z;
664666
}
665667

668+
void _stdcall JInterrupt(JS jt){
669+
SETJTJM(jt,jt,jm);
670+
// increment adbreak by 1, capping at 2
671+
C old=lda(jt->adbreak);
672+
while(1){
673+
if(old>=2)break;
674+
if(casa(jt->adbreak,&old,1+old))break;}
675+
wakeall(jm);}
676+
666677
void oleoutput(JS jt, I n, char* s); /* SY_WIN32 only */
667678

668679
#define capturesize 80000

jsrc/jconsole.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@
2929
static int forceprmpt=0; /* emit prompt even if isatty is false */
3030
static int breadline=0; /* 0: none 1: libedit 2: linenoise */
3131
static int norl=0; /* disable readline/linenoise */
32-
static char **adadbreak;
33-
static void sigint(int k){**adadbreak+=1;signal(SIGINT,sigint);}
34-
static void sigint2(int k){**adadbreak+=1;}
32+
static void sigint(int k){jeinterrupt();signal(SIGINT,sigint);}
33+
static void sigint2(int k){jeinterrupt();}
3534
static char input[30000];
3635

3736
#if defined(ANDROID) || defined(_WIN32)
@@ -155,7 +154,7 @@ char* Jinput_stdio(char* prompt)
155154
if(!(forceprmpt||_isatty(_fileno(stdin)))) return "2!:55''";
156155
fputs("\n",stdout);
157156
fflush(stdout);
158-
**adadbreak+=1;
157+
jeinterrupt();
159158
#else
160159
/* unix eof without readline */
161160
return "2!:55''";
@@ -276,7 +275,6 @@ int main(int argc, char* argv[])
276275

277276
jt=jeload(callbacks);
278277
if(!jt){char m[1000]; jefail(m); fputs(m,stderr); exit(1);}
279-
adadbreak=(char**)jt; // first address in jt is address of breakdata
280278
#ifndef _WIN32
281279
if(2==breadline){
282280
struct sigaction sa;

jsrc/jeload.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
static void* hjdll;
3737
static JST* jt;
3838
static JDoType jdo;
39+
static JInterruptType jinterrupt;
3940
static JFreeType jfree;
4041
static JgaType jga;
4142
static JGetLocaleType jgetlocale;
@@ -84,6 +85,11 @@ int jedo(char* sentence)
8485
return jdo(jt,(C*)sentence);
8586
}
8687

88+
void jeinterrupt()
89+
{
90+
jinterrupt(jt);
91+
}
92+
8793
A jegeta(I n, char* s){return jgeta(jt,n,(C*)s);}
8894
I jeseta(I n,char* name,I x,char* d){return jseta(jt,n,(C*)name,x,(C*)d);}
8995
void jefree(){jfree(jt);}
@@ -106,6 +112,7 @@ JST* jeload(void* callbacks)
106112
if(!jt) return 0;
107113
((JSMType)GETPROCADDRESS(hjdll,"JSM"))(jt,callbacks);
108114
jdo=(JDoType)GETPROCADDRESS(hjdll,"JDo");
115+
jinterrupt=(JInterruptType)GETPROCADDRESS(hjdll,"JInterrupt");
109116
jfree=(JFreeType)GETPROCADDRESS(hjdll,"JFree");
110117
jga=(JgaType)GETPROCADDRESS(hjdll,"Jga");
111118
jgetlocale=(JGetLocaleType)GETPROCADDRESS(hjdll,"JGetLocale");

jsrc/jeload.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ int jefirst(int,char*);
77
A jegeta(int,char*);
88
I jeseta(I n,char* name,I x,char* d);
99
int jedo(char*);
10+
void jeinterrupt();
1011
void jefree();
1112
char* jegetlocale();
1213
void jefail(char*);

jsrc/jfex.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@
2626
#include "jlib.h"
2727

2828
static JDoType jdo;
29+
static JInterruptType jinterrupt;
2930
static JFreeType jfree;
3031
static JgaType jga;
3132
static JGetLocaleType jgetlocale;
3233

3334
static J jt;
3435
static void* hjdll;
3536

36-
static char **adadbreak;
37-
static void sigint(int k){**adadbreak+=1;signal(SIGINT,sigint);}
37+
static void sigint(int k){jinterrupt(jt);}
3838
static char input[1000];
3939

4040
// J calls for input (debug suspension and 1!:1[1) and we call for input
@@ -44,7 +44,7 @@ char* _stdcall Jinput(J jt,char* prompt)
4444
if(!fgets(input, sizeof(input), stdin))
4545
{
4646
fputs("\n",stdout);
47-
**adadbreak+=1;
47+
jinterrupt(jt);
4848
}
4949
return input;
5050
}
@@ -144,10 +144,10 @@ int main(int argc, char* argv[])
144144
if(!jt) return 1; // JE init failed
145145
((JSMType)GETPROCADDRESS(hjdll,"JSM"))(jt,callbacks);
146146
jdo=(JDoType)GETPROCADDRESS(hjdll,"JDo");
147+
jinterrupt=(JInterruptType)GETPROCADDRESS(hjdll,"JInterrupt");
147148
jfree=(JFreeType)GETPROCADDRESS(hjdll,"JFree");
148149
jga=(JgaType)GETPROCADDRESS(hjdll,"Jga");
149150
jgetlocale=(JGetLocaleType)GETPROCADDRESS(hjdll,"JGetLocale");
150-
adadbreak=(char**)jt; // first address in jt is address of breakdata
151151
signal(SIGINT,sigint);
152152
while(1){jdo(jt,Jinput(jt," "));}
153153
jfree(jt);

jsrc/jlib.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ CDPROC JS _stdcall JInit(void); /* init instance */
1111
CDPROC void _stdcall JSM(JS jt, void*callbacks[]); /* set callbacks */
1212
CDPROC void _stdcall JSMX(JS jt, void*, void*, void*, void*, I);
1313
CDPROC int _stdcall JDo(JS jt,C*); /* run sentence */
14+
CDPROC void _stdcall JInterrupt(JS jt); /* signal interrupt */
1415
CDPROC C* _stdcall JGetLocale(JS jt); /* get locale */
1516
CDPROC A _stdcall Jga(JS jt, I t, I n, I r, I*s);
1617
CDPROC int _stdcall JFree(JS jt); /* free instance */
@@ -23,6 +24,7 @@ CDPROC int _stdcall JErrorTextM(JS jt, I ec, I* p);
2324

2425
typedef void* (_stdcall *JInitType) ();
2526
typedef int (_stdcall *JDoType) (JS, C*);
27+
typedef void (_stdcall *JInterruptType)(JS);
2628
typedef C* (_stdcall *JGetLocaleType)(JS);
2729
typedef void (_stdcall *JSMType) (JS, void*);
2830
typedef void (_stdcall *JSMXType) (JS, void*, void*, void*, void*, I);

jsrc/mt.c

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ C jtpthread_mutex_lock(J jt,jtpthread_mutex_t *m,I self){ //lock m; self is thre
156156
//a couple of alternatives suggest themselves: store up to k waiters (FREE/LOCK/WAIT is really 0/1/n; we could do eg 0/1/2/3/n); store the waiter count somehow outside of the value
157157
// Before waiting, handle system events if present
158158
UI4 waitval=lda(&m->v); C breakb; // get the serial number before we check. Must be atomic; this is supposed to synchronise with writes to the same location via futexwt by wakeall
159-
if(unlikely(BETWEENC(lda(&JT(jt,systemlock)),1,2))){jtsystemlockaccept(jt,LOCKALL);} // if system lock requested, accept it
159+
if(unlikely(BETWEENC(lda(&JT(jt,systemlock)),1,2))){jtsystemlockaccept(jt,LOCKALL);continue;} // if system lock requested, accept it. Then retry; a lot can happen during a systemlock, and we probably need to resample waitval anyway
160160
// the user may be requesting a BREAK interrupt for deadlock or other slow execution
161161
if(unlikely((breakb=lda(&JT(jt,adbreak)[0])))!=0){r=breakb==1?EVATTN:EVBREAK;goto fail;} // JBREAK: give up on the pyx and exit
162162
// Now wait for a change. The futex_wait is atomic, and will wait only if the state is WAIT. In that case,
@@ -170,10 +170,10 @@ C jtpthread_mutex_lock(J jt,jtpthread_mutex_t *m,I self){ //lock m; self is thre
170170
if(unlikely(i>0)){r=EVFACE; goto fail;} //handle error (unaligned unmapped interrupted...)
171171
}
172172
// come out of loop when we have the lock
173-
clrfutexwt(jt);
173+
CLRFUTEXWT;
174174
}
175175
m->ct+=m->recursive;m->owner=self;R 0; // install ownership info, good return
176-
fail:clrfutexwt(jt); R r;} // error return, with our internal errorcode
176+
fail:CLRFUTEXWT; R r;} // error return, with our internal errorcode
177177

178178

179179
// return positive error code, 0 if got lock, -1 if lock timed out
@@ -185,17 +185,18 @@ I jtpthread_mutex_timedlock(J jt,jtpthread_mutex_t *m,UI ns,I self){ //lock m, w
185185
sta(&jt->futexwt,&m->v); //ensure other threads know how to wake us up for systemlock
186186
while(xchga((US*)&m->v,WAIT)!=FREE){ //exit when _we_ successfully installed WAIT in place of FREE
187187
UI4 waitval=lda(&m->v); C breakb; // get the serial number before we check. Must be atomic; this is supposed to synchronise with writes to the same location via futexwt by wakeall
188-
if(unlikely(BETWEENC(lda(&JT(jt,systemlock)),1,2))){jtsystemlockaccept(jt,LOCKALL);}
188+
if(unlikely(BETWEENC(lda(&JT(jt,systemlock)),1,2))){jtsystemlockaccept(jt,LOCKALL);goto retime;}
189189
if(unlikely((breakb=lda(&JT(jt,adbreak)[0])))!=0){r=breakb==1?EVATTN:EVBREAK;goto fail;} // JBREAK: give up on the pyx and exit
190190
I i=jfutex_waitn(&m->v,waitval|WAIT,ns);
191191
if(unlikely(i>0)){r=EVFACE; goto fail;} //handle error (unaligned unmapped interrupted...)
192192
if(i==-1){r=-1;goto fail;} //if the kernel says we timed out, trust it rather than doing another syscall to check the time
193+
retime:
193194
if(-1ull==(ns=jtmdif(tgt))){r=-1;goto fail;} //update delta, abort if timed out
194195
}
195-
clrfutexwt(jt); // remove wakeup to this thread
196+
CLRFUTEXWT; // remove wakeup to this thread
196197
}
197198
m->ct+=m->recursive;m->owner=self;R 0; // install ownership info, good return
198-
fail:clrfutexwt(jt); R r;} // error return, with our internal errorcode or -1 if timeout
199+
fail:CLRFUTEXWT; R r;} // error return, with our internal errorcode or -1 if timeout
199200

200201
I jtpthread_mutex_trylock(jtpthread_mutex_t *m,I self){ //attempt to acquire m
201202
if(casa((US*)&m->v,&(US){FREE},LOCK)){m->ct+=m->recursive;m->owner=self;R 0;} //fastpath: attempt to acquire the lock; if free, take it
@@ -226,6 +227,6 @@ C jtjsleep(J jt,UI ns){
226227
if(i==-1){r=0;break;} //timed out
227228
retime:
228229
if(-1ull==(ns=jtmdif(tgt))){r=0;break;}}
229-
clrfutexwt(jt);
230+
CLRFUTEXWT;
230231
R r;}
231232
#endif //PYXES

jsrc/mt.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,6 @@ extern int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
7474
#error no futex support for your platform
7575
#endif //_WIN32
7676
#endif //PYXES
77+
78+
// remove wakeup to this thread; if wakeup in progress, wait till it finishes
79+
#define CLRFUTEXWT {sta(&jt->futexwt,0); while(lda(&JT(jt,wakeallct)))YIELD;}

0 commit comments

Comments
 (0)