Commit 459dad1

u&.(m&{) failure in multithreads; gc() error on sparse result; space accounting on remote frees

1 parent 8ac9353

8 files changed: +67 −37 lines

jsrc/ct.c (2 additions, 2 deletions)

@@ -533,7 +533,7 @@ static I jtthreadcreate(J jt,I n){
 R 1;
}

-// execute the user's task. Result is an ordinary array or a pyx. Bivalent
+// execute the user's task. Result is an ordinary array or a pyx. Bivalent (a,w,self) or (w,self,self) called from unquote or parse
static A jttaskrun(J jt,A arg1, A arg2, A arg3){A pyx;
 ARGCHK2(arg1,arg2); // the verb is not the issue
 RZ(pyx=jtcreatepyx(jt,-2,inf));
@@ -549,7 +549,7 @@ static A jttaskrun(J jt,A arg1, A arg2, A arg3){A pyx;
// or if it is UNINCORPABLE (in which case we only need to clone the nonrecursive block). After that, ra() the arguments to protect them until the task completes.
// It would be nice to be able to free the virtual before the task completes, but we don't have a way to (we could realize/fa in the worker, but why?). The virtual backer will be tied up during the task, but we
// won't have to copy the data here and then transfer it in the task
- if(dyad){ra(arg3);} // arg3 is x/self, so never virtual; just ra
+ ASSERT(ISDENSE(AT(arg1)),EVNONCE) if(dyad){ASSERT(ISDENSE(AT(arg2)),EVNONCE) ra(arg3);} // Don't allow sparse args since we can't box them; arg3 is self, so never virtual; just ra
 if(AFLAG(arg1)&AFVIRTUAL){if(AT(arg1)&TRAVERSIBLE)RZ(arg1=realize(arg1)) else if(AFLAG(arg1)&AFUNINCORPABLE)RZ(arg1=clonevirtual(arg1))} ra(arg1);
 if(AFLAG(arg2)&AFVIRTUAL){if(AT(arg2)&TRAVERSIBLE)RZ(arg2=realize(arg2)) else if(AFLAG(arg2)&AFUNINCORPABLE)RZ(arg2=clonevirtual(arg2))} ra(arg2);
 JOB *job=(JOB*)AAV1(jobA); // The job starts on the second cacheline of the A block. When we free the job we will have to back up to the A block
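
The "(a,w,self) or (w,self,self)" note describes the bivalent calling convention: one entry point serves both monad and dyad, which is why arg3 is always the verb itself and why only the dyadic path needs the second ISDENSE check. A reduced sketch of the convention (stand-in types and hypothetical macro names, not the actual dispatch in unquote/parse):

    /* A monadic call passes (w,self,self); a dyadic call passes (a,w,self).
       So arg3 is always the verb (never virtual, never sparse), and arg2 is
       a J value needing the sparse check only when the call is dyadic.     */
    typedef struct array A;               /* stand-in for the J array block */
    A *jttaskrun(void *jt, A *arg1, A *arg2, A *arg3);  /* bivalent worker  */
    #define RUN_MONAD(jt,w,self)   jttaskrun((jt),(w),(self),(self))
    #define RUN_DYAD(jt,a,w,self)  jttaskrun((jt),(a),(w),(self))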

jsrc/cu.c (3 additions, 1 deletion)

@@ -339,7 +339,9 @@ static DF1(jtsunder){F1PREFIP;PROLOG(777);
// we do that we have to remove any virtual blocks created here so that they don't raise y
 rifv(uz); // if uz is a virtual, realize it in case it is backed by y
 RZ(uz=EPILOGNORET(uz)); // free any virtual blocks we created
-if(negifipw<0)ACRESET(w,origacw) // usecount of y has been restored; restore inplaceability
+if((origacw&negifipw&(AC(w)-2))<0)ACRESET(w,origacw) // usecount of y has been restored; restore inplaceability. The use of origacw is subtle. In a multithreaded system you mustn't reset the usecount lest another thread
+ // has raised it. So, we reset AC to ACINPLACE only in the case where it was originally inplaceable, because then we can be sure the same block is not in use in another thread.
+ // Also, if AC(w) is above 1, it has escaped and must no longer be inplaced. If it isn't above 1, it must be confined to here
// do the inverse
 if(FAV(v)->id==CCOMMA){RZ(z=reshape(shape(w),uz)); // inv for , is ($w)&($,)
 }else{RZ(z=jtamendn2(jtinplace,uz,w,FAV(v)->fgh[0],ds(CAMEND))); // inv for m&{ is m}&w
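
The new guard compresses three conditions into a single sign test. A minimal sketch of the idiom, assuming (as elsewhere in jsource) that I is a signed 64-bit integer and that an inplaceable block carries a negative usecount:

    #include <stdint.h>
    /* Restore inplaceability only if ALL of these sign bits are set:
         origacw  < 0   w was inplaceable when jtsunder started
         negifipw < 0   the caller permitted inplacing of w
         acw-2    < 0   usecount is at most 1, so w has not escaped to
                        where another thread could have raised it         */
    static int may_restore(int64_t origacw, int64_t negifipw, int64_t acw){
        return (origacw & negifipw & (acw-2)) < 0;  /* AND of sign bits */
    }

The AND is negative exactly when all three operands are negative, so one comparison replaces three branches.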

jsrc/jt.h (11 additions, 9 deletions)

@@ -220,7 +220,7 @@ struct __attribute__((aligned(JTFLAGMSK+1))) JTTstruct {
 C fillv0[sizeof(Z)]; /* default fill value */
 RNG *rngdata; // separately allocated block for RNG
// seldom-used fields
-I malloctotalhwmk; // highest value since most recent 7!:1
+I filler5[1];
// end of cacheline 5

 C _cl6[0];
@@ -237,7 +237,7 @@ struct __attribute__((aligned(JTFLAGMSK+1))) JTTstruct {
// perhaps something like an lru cache of threads recently freed to? Do a linear scan of the first k entries (maybe w/short simd if the first is a miss), and if they all miss, then fall back to--snmalloc trick, or sort buffer, or something else
// Or maybe a fixed-size cache, and anything that falls out of it gets immediately flushed? I like that, because it helps prevent singleton allocations from getting lost
 DC sitop; /* pointer to top of SI stack */
-A *pmttop; // tstack top to free to when releasing the postmortem stack. Non0 indicates pm debugging session is active
+A *pmttop; // tstack top to free to when releasing the postmortem stack. Non0 indicates pm debugging session is active. Could move to JST
// end of cacheline 6

 C _cl7[0];
@@ -246,7 +246,10 @@ struct __attribute__((aligned(JTFLAGMSK+1))) JTTstruct {
 I mfreegenallo; // Amount allocated through malloc, biased; modified only by owning thread
 I malloctotal; // net total of malloc/free performed in m.c only; modified only by owning thread
 UI cstackinit; // C stack pointer at beginning of execution
-I filler7[4];
+I mfreegenalloremote; // Amount allocated through malloc but freed by other threads (frees only, so always negative)
+I malloctotalremote; // net total of malloc/free performed in m.c only but freed by other threads (frees only, so always negative)
+I malloctotalhwmk; // highest value since most recent 7!:1
+I filler7[1];
// end of cacheline 7
 C _cl8[0];

@@ -343,8 +346,10 @@ typedef struct JSTstruct {
#if MEMAUDIT & 2
 C audittstackdisabled; // set to 1 to disable auditing
#endif
-// 2-3 byte free
+// 0-1 byte free
// rest of cacheline used only in exceptional paths
+C oleop; /* com flag to capture output */
+UC cstacktype; /* cstackmin set during 0: jt init 1: passed in JSM 2: set in JDo */
 void *smpoll; /* re-used in wd */
 void *opbstr; /* com ptr to BSTR for captured output */
 I filler3[4];
@@ -411,14 +416,11 @@ typedef struct JSTstruct {
 US cachesizes[3]; // [0]: size of fastest cache [1]: size of largest cache private to each core [2]: size of largest cache shared by all cores, in multiples of 4KB
 C bx[11]; /* box drawing characters */
 UC disp[7]; // # different verb displays, followed by list thereof in order of display; could be 15 bits
-C oleop; /* com flag to capture output */
-UC cstacktype; /* cstackmin set during 0: jt init 1: passed in JSM 2: set in JDo */
-// 6 bytes free
#if PYXES || 1
 JOBQ (*jobqueue)[MAXTHREADPOOLS]; // one JOBQ block for each threadpool
-I filler7[1];
-#else
 I filler7[2];
+#else
+I filler7[3];
#endif
// end of cacheline 7
 C _cl8[0];
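
The zero-length markers (_cl6[0], _cl7[0], ...) that bracket these hunks name the intended 64-byte boundaries, which is what makes field shuffles like this one checkable. A toy illustration of the idiom (hypothetical struct, not jsource's actual audit; zero-length arrays are a GNU extension):

    #include <stddef.h>
    /* Zero-size members occupy no storage but give offsetof() a handle on
       each intended cacheline boundary, so moving fields between lines (as
       this commit does) can be verified at compile time.                  */
    struct toy {
        char cacheline0[64];
        char _cl1[0];           /* marker: cacheline 1 starts here */
        long counters[8];       /* 8*8 = 64 bytes                  */
        char _cl2[0];           /* marker: cacheline 2 starts here */
    };
    _Static_assert(offsetof(struct toy,_cl1)%64==0, "cacheline 1 misaligned");
    _Static_assert(offsetof(struct toy,_cl2)%64==0, "cacheline 2 misaligned");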

jsrc/m.c (39 additions, 20 deletions)

@@ -364,6 +364,10 @@ B jtspfree(J jt){I i;A p;
// adding the bytes for those blocks to mfreebgenallo
 jt->mfreegenallo-=SBFREEB - (jt->memballo[i] & ~MFREEBCOUNTING); // subtract diff between current mfreeb[] and what it will be set to
 jt->memballo[i] = SBFREEB + (jt->memballo[i] & MFREEBCOUNTING); // set so we trigger rescan when we have allocated another SBFREEB bytes
+
+ // transfer bytes freed in other threads back to the totals for this thread
+ I xfct=jt->malloctotalremote; jt->malloctotal+=xfct; __atomic_fetch_sub(&jt->malloctotalremote,xfct,__ATOMIC_ACQ_REL); // remote mods must be atomic
+ xfct=jt->mfreegenalloremote; jt->mfreegenallo+=xfct; __atomic_fetch_sub(&jt->mfreegenalloremote,xfct,__ATOMIC_ACQ_REL); // remote mods must be atomic
 }
}
 jt->uflags.spfreeneeded = 0; // indicate no check needed yet
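
The two transfer lines are a small lock-free drain: remote threads only ever subtract from the *remote counters atomically, and the owner periodically folds them into its private totals. A sketch of the idiom in isolation (field names from this commit, types reduced; the real code reads the counter with a plain load, which works here because remote updates are atomic read-modify-writes of a whole word):

    typedef struct { long malloctotal; long malloctotalremote; } Acct;
    /* Subtracting exactly the snapshot we folded in (rather than storing 0)
       preserves any remote free that lands between the read and the update. */
    static void drain(Acct *a){
        long xfct = __atomic_load_n(&a->malloctotalremote, __ATOMIC_ACQUIRE);
        a->malloctotal += xfct;                   /* fold into private total */
        __atomic_fetch_sub(&a->malloctotalremote, xfct, __ATOMIC_ACQ_REL);
    }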
@@ -471,17 +475,16 @@ F1(jtmmaxs){I j,m=MLEN,n;
} /* 9!:21 space limit set */


-// Get total # bytes in use. That's total allocated so far, minus the bytes in the free lists and the blocks to be repatriated.
+// Get total # bytes in use in the current thread. That's total allocated so far, minus the bytes in the free lists and the blocks to be repatriated.
// mfreeb[] is a negative count of blocks in the free list, and biased so the value goes negative
-// when garbage-collection is required. All non-pool allocations are accounted for in
-// mfreegenallo
+// when garbage-collection is required. All non-pool allocations are accounted for in mfreegenallo
// At init, each mfreeb indicates SBFREEB bytes. mfreegenallo is negative to match that total,
// indicating nothing has really been allocated; that's (PLIML-PMINL+1)*SBFREEB to begin with. When a block
// is allocated, mfreeb[] increases; when a big block is allocated, mfreegenallo increases by the
// amount of the allocation, and mfree[-PMINL+n] decreases by the amount in all the blocks that are now
// on the free list.
// At coalescing, mfreeb is set back to indicate SBFREEB bytes, and mfreegenallo is decreased by the amount of the setback.
-I jtspbytesinuse(J jt){I i,totalallo = jt->mfreegenallo&~MFREEBCOUNTING; // start with bias value
+I jtspbytesinuse(J jt){I i,totalallo = (jt->mfreegenallo&~MFREEBCOUNTING)+jt->mfreegenalloremote; // start with bias value
 if(jt->repatq)totalallo-=AC(jt->repatq); // bytes awaiting gc should not be considered inuse
 for(i=PMINL;i<=PLIML;++i){totalallo+=jt->memballo[-PMINL+i]&~MFREEBCOUNTING;} // add all the allocations
 R totalallo;
@@ -910,16 +913,19 @@ A jtgc(J jt,A w,A* old){
// calls where w is the oldest thing on the tpush stack are not uncommon. In that case we don't need to do ra/tpop/fa/repair-inplacing. We can also save the repair if we KNOW w will be freed during the tpop
 A *pushp=jt->tnextpushp; // top of tstack
 if(old==pushp){if(AC(w)>=0){ra(w); tpush(w);} // if nothing to pop: (a) if inplaceable, make no change (value must be protected up the tstack); (b) otherwise protect the value on the tstack
-}else if(*old==w){ // does the start of tstack point to w?
- // w is the first element on the tstack. If it is the ONLY element, we can stand pat; no need to make w recursive
- if(old!=pushp-1){
-  // there are other elements on tstack, we have to make w recursive because freeing one might otherwise delete contents of w. We can leave inplace status unchanged for w
-  radescend(w); A *old1=old+1; if(likely(((UI)old1&(NTSTACKBLOCK-1))!=0))tpop(old1); else{*old=0; tpop(old); tpush(w);} // make w recursive; if we can back up to all but the first stack element, do that, leaving w on stack as before; otherwise reinstall
- } // raise descendants. Descendants were raised only when w turned from nonrecursive to recursive. Sparse w also descends, but always recurs in tpush
-}else if(((UI)REPSGN(AC(w))&(UI)AZAPLOC(w))>=(UI)old && likely((((UI)old^(UI)pushp)&-NTSTACKBLOCK)==0)){ // inplaceable zaploc>=old - but that is valid only when we know pushp and old are in the same stack block
- // We can see that w is abandoned and is about to be freed. Swap it with *old and proceed
- radescend(w); *AZAPLOC(w)=*old; *old=w; AZAPLOC(w)=old; tpop(old+1); // update ZAPLOC to point to new position in stack
+}else if(likely(ISDENSE(AT(w)))){ // sparse blocks cannot simply be left in *old because the contents are farther down the stack and would have to be protected too
+ if(*old==w){ // does the start of tstack point to w?
+  // w is the first element on the tstack. If it is the ONLY element, we can stand pat; no need to make w recursive
+  if(old!=pushp-1){
+   // there are other elements on tstack, we have to make w recursive because freeing one might otherwise delete contents of w. We can leave inplace status unchanged for w
+   radescend(w); A *old1=old+1; if(likely(((UI)old1&(NTSTACKBLOCK-1))!=0))tpop(old1); else{*old=0; tpop(old); tpush(w);} // make w recursive; if we can back up to all but the first stack element, do that, leaving w on stack as before; otherwise reinstall
+  } // raise descendants. Descendants were raised only when w turned from nonrecursive to recursive. Sparse w also descends, but always recurs in tpush
+ }else if(((UI)REPSGN(AC(w))&(UI)AZAPLOC(w))>=(UI)old && likely((((UI)old^(UI)pushp)&-NTSTACKBLOCK)==0)){ // inplaceable zaploc>=old - but that is valid only when we know pushp and old are in the same stack block
+  // We can see that w is abandoned and is about to be freed. Swap it with *old and proceed
+  radescend(w); *AZAPLOC(w)=*old; *old=w; AZAPLOC(w)=old; tpop(old+1); // update ZAPLOC to point to new position in stack
+ }else goto general; // no applicable special case, do the ra/tpop sequence
}else{
+general:;
// general case, w not freed or not abandoned
 ra(w); // protect w and its descendants from tpop; also converts w to recursive usecount (unless sparse).
// if we are turning w to recursive, this is the last pass through all of w incrementing usecounts. All currently-on-stack pointers to blocks are compatible with the increment
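
The abandoned-block fast path kept above leans on a branch-free sign trick. A reduced sketch, assuming REPSGN replicates the sign bit across the word as it does elsewhere in jsource:

    #include <stdint.h>
    #define REPSGN(x) ((int64_t)(x)>>63)  /* -1 if x<0, else 0 (arith shift) */
    /* When the usecount is negative (w abandoned/inplaceable) the mask is
       all ones and the comparison genuinely tests zaploc>=old; otherwise the
       left side collapses to 0 and the test fails (stack pointers are
       nonzero), with no separate branch on the usecount.                   */
    static int zap_slot_reachable(int64_t ac, uintptr_t zaploc, uintptr_t old){
        return ((uintptr_t)REPSGN(ac) & zaploc) >= old;
    }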
@@ -1130,9 +1136,9 @@ A* jttg(J jt, A *pushp){ // Filling last slot; must allocate next page.

// back the tpush stack up to the previous allocation. We have just popped off the last element of the current allocation
// (that is, we have moved tnextpushp to the chain field at the start of the allocation)
-// we keep one allocation in hand in tstacknext to avoid hysteresis. If there is one already there
+// we keep one allocation in hand in tstacknext to avoid hysteresis. If there is one already there we free it
void freetstackallo(J jt){
- if(jt->tstacknext){FREECHK(jt->tstacknext); __atomic_fetch_sub(&jt->malloctotal,NTSTACK+NTSTACKBLOCK,__ATOMIC_ACQ_REL);} // account for malloc'd memory
+ if(jt->tstacknext){FREECHK(jt->tstacknext); jt->malloctotal-=NTSTACK+NTSTACKBLOCK;} // account for malloc'd memory
// We will set the block we are vacating as the next-to-use. We keep only 1 such; if there is one already, free it
 jt->tstacknext=jt->tstackcurr; // save the next-to-use, after removing bias
 jt->tstackcurr=(A*)jt->tstackcurr[0]; // back up to the previous block
@@ -1239,7 +1245,8 @@ __attribute__((noinline)) A jtgafallopool(J jt){
 ASSERT(av=MALLOC(PSIZE+TAILPAD),EVWSFULL);
#endif
 I blockx=(I)jt&63; jt=(J)((I)jt&-64);
-I nt=jt->malloctotal+=PSIZE+TAILPAD+ALIGNPOOLTOCACHE*CACHELINESIZE; // add to total JE mem allocated
+jt->malloctotal+=PSIZE+TAILPAD+ALIGNPOOLTOCACHE*CACHELINESIZE; // add to total JE mem allocated
+I nt=jt->malloctotalremote+jt->malloctotal; // get net total allocated from this thread & not freed
 jt->mfreegenallo+=PSIZE+TAILPAD+ALIGNPOOLTOCACHE*CACHELINESIZE; // add to total from OS
 {I ot=jt->malloctotalhwmk; ot=ot>nt?ot:nt; jt->malloctotalhwmk=ot;}
// split the allocation into blocks. Chain them together, and flag the base. We chain them in ascending order (the order doesn't matter), but
@@ -1280,7 +1287,8 @@ __attribute__((noinline)) A jtgafalloos(J jt,I blockx,I n){A z;
 if(unlikely((((jt->mfreegenallo+=n)&MFREEBCOUNTING)!=0))){
  I jtbytes=jt->bytes+=n; if(jtbytes>jt->bytesmax)jt->bytesmax=jtbytes;
 }
-I nt=jt->malloctotal+=n;
+jt->malloctotal+=n; // add to our allocations
+I nt=jt->malloctotalremote+jt->malloctotal; // get net total allocated from this thread & not freed
 {I ot=jt->malloctotalhwmk; ot=ot>nt?ot:nt; jt->malloctotalhwmk=ot;}
 A *tp=jt->tnextpushp; AZAPLOC(z)=tp; *tp++=z; jt->tnextpushp=tp; if(unlikely(((I)tp&(NTSTACKBLOCK-1))==0))RZ(z=jttgz(jt,tp,z)); // do the tpop/zaploc chaining
 MOREINIT(z); // init allocating thread# and clear the lock
@@ -1553,11 +1561,22 @@
#endif
 allocsize+=TAILPAD+ALIGNTOCACHE*CACHELINESIZE; // the actual allocation had a tail pad and boundary
#if PYXES
-jt=JTFORTHREAD1(jt,w->origin); // for space accounting, switch to the thread the block came from *** this modifies jt ***
-#endif
+J jtremote=JTFORTHREAD1(jt,w->origin);
+if(likely(jtremote==jt)){ // normal case of freeing in the allocating thread: avoid atomics
+// obsolete jt=JTFORTHREAD1(jt,w->origin); // for space accounting, switch to the thread the block came from *** this modifies jt ***
+ jt->malloctotal-=allocsize;
+ jt->mfreegenallo-=allocsize; // account for all the bytes returned to the OS
+}else{ // the block was allocated in another thread. Account for its free there
+ __atomic_fetch_sub(&jtremote->malloctotalremote,allocsize,__ATOMIC_ACQ_REL);
+ __atomic_fetch_sub(&jtremote->mfreegenalloremote,allocsize,__ATOMIC_ACQ_REL);
+}
+if(unlikely(jtremote->mfreegenallo&MFREEBCOUNTING))__atomic_fetch_sub(&jtremote->bytes,allocsize,__ATOMIC_ACQ_REL); // keep track of total allocation, needed only if enabled
+#else
 jt->malloctotal-=allocsize;
 jt->mfreegenallo-=allocsize; // account for all the bytes returned to the OS
-if(unlikely(jt->mfreegenallo&MFREEBCOUNTING))jt->bytes-=allocsize; // keep track of total allocation, needed only if enabled
+if(unlikely(jt->mfreegenallo&MFREEBCOUNTING))__atomic_fetch_sub(&jt->bytes,allocsize,__ATOMIC_ACQ_REL); // keep track of total allocation, needed only if enabled
+#endif
+
#if ALIGNTOCACHE
 FREECHK(((I**)w)[-1]); // point to initial allocation and free it
#else
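
Taken together, the PYXES free path now needs no atomics in the common case where a block dies in the thread that allocated it. A condensed sketch of the split (reduced types; likely/unlikely and the MFREEBCOUNTING bookkeeping omitted):

    typedef struct ThreadAcct {
        long malloctotal, mfreegenallo;              /* owner-private: plain ops */
        long malloctotalremote, mfreegenalloremote;  /* touched only atomically  */
    } ThreadAcct;
    static void account_free(ThreadAcct *self, ThreadAcct *owner, long bytes){
        if(owner==self){          /* common case: freed where it was allocated */
            self->malloctotal  -= bytes;
            self->mfreegenallo -= bytes;
        }else{                    /* remote free: park it in the owner's remote
                                     counters, drained at its next gc scan     */
            __atomic_fetch_sub(&owner->malloctotalremote,  bytes, __ATOMIC_ACQ_REL);
            __atomic_fetch_sub(&owner->mfreegenalloremote, bytes, __ATOMIC_ACQ_REL);
        }
    }

This is also why jtspbytesinuse above, and jtsphwmk below, now add the remote counters back in: a thread's true net allocation is its private total plus whatever other threads have parked.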

jsrc/sc.c (1 addition, 1 deletion)

@@ -184,7 +184,7 @@ DF2(jtunquote){A z;
 jt->parserstackframe.sf=fs; // as part of starting the name we set the new recursion point
// Execute the name. First check 4 flags at once to see if anything special is afoot: debug, pm, bstk, garbage collection
 if(likely(!(jt->uflags.ui4))) {
- // No special processing. Just run the entity
+ // No special processing. Just run the entity as (a,w,self) or (w,self,self)
// We preserve the XDEFMODIFIER flag in jtinplace, because the type of the exec must not have been changed by name lookup. Pass the other inplacing flags through if the call supports inplacing
 z=(*actionfn)((J)((I)jt+((FAV(fs)->flag&(flgd0cpC&FLGMONAD+FLGDYAD)?JTFLAGMSK:JTXDEFMODIFIER)&flgd0cpC)),a,w,fs); // keep MODIFIER flag always, and others too if verb supports it
 if(unlikely(z==0)){jteformat(jt,jt->parserstackframe.sf,a,w,0);} // make this a format point

jsrc/xt.c (1 addition, 1 deletion)

@@ -69,7 +69,7 @@ F1(jtsp){ASSERTMTV(w); R sc(spbytesinuse());} // 7!:0
// Return (current allo),(max since reset)
// If arg is an atom, reset hwmk to it
F1(jtsphwmk){
-I curr = jt->malloctotal; I hwmk = jt->malloctotalhwmk;
+I curr = jt->malloctotal+jt->malloctotalremote; I hwmk = jt->malloctotalhwmk;
 if(AN(w)){I new; RE(new=i0(w)); jt->malloctotalhwmk=new;}
 R v2(curr,hwmk);
}

test/g320ip.ijs (3 additions, 3 deletions)

@@ -997,9 +997,9 @@ a =. 'this';'is'

 NB. WILLOPEN through m&v u&n ~ hook

-a =. i. 1e6
-10000 > 7!:2 '((i. 5) ,:"0 (2e5)) 3&({. #@>)@:(<;.0) a'
-10000 > 7!:2 '((i. 5) ,:"0 (2e5)) ({. #@>)~&3@:(<;.0) a'
+a =: i. 1e6
+10000 > 7!:2 '((i. 5) ,:"0 (2e5)) 3&({. #@>)@:(<;.0) a'
+10000 > 7!:2 '((i. 5) ,:"0 (2e5)) ({. #@>)~&3@:(<;.0) a'


 NB. Inplacing forks

test/gsp520sd.ijs (7 additions, 0 deletions)

@@ -18,6 +18,13 @@ f&> c [ q=: 2 3$?2e6
 f&> c [ q=: 2 1 3$o.?2e6
 f&> c [ q=: 2 1 1 3$j./?2$2e6

+1000 > {{ a =. 500 ?@$ 100
+b =. 550 ?@$ 100
+sp =. $. 110 100 ?@$ 2
+stsp =. 7!:0''
+for. i. 20 do. (<a;b) { sp end.
+stsp -~ 7!:0'' }} '' NB. At one point this leaked memory
+
 'nonce error' -: ($.i.2 3) { etx 'abcdef'
 'nonce error' -: ($.i.2 3) { etx u:'abcdef'
 'nonce error' -: ($.i.2 3) { etx 10&u:'abcdef'
