Skip to content

Commit 38c328e

Browse files
committed
batchopndx matches batchop
1 parent 2dfd0a9 commit 38c328e

File tree

3 files changed

+76
-56
lines changed

3 files changed

+76
-56
lines changed

jsrc/j.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -713,6 +713,7 @@ struct jtimespec jmtfclk(void); //'fast clock'; maybe less inaccurate; intended
713713
#if SY_64
714714
#define BW 64 /* # bits in a word */
715715
#define LGSZI 3 // lg(#bytes in an I)
716+
#define LGSZE 4 // lg(#bytes in an E)
716717
#else
717718
#define BW 32
718719
#define LGSZI 2

jsrc/vfrom.c

Lines changed: 58 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -3329,12 +3329,6 @@ struct __attribute__((aligned(CACHELINESIZE))) bopctx {
33293329
US nofsts; // index to the offset pointer after the last offset/val has been processed
33303330
} opinfo; // info for each op
33313331

3332-
typedef struct { // scaf perhaps separate these to allow indexed fetch
3333-
__m256d *qktbase; // offset in bytes between start of ring row and corresponding value in Qkt
3334-
UI8 rowmask; // mask of all the resultblocks that have been modified in this line
3335-
} releaseinfo;
3336-
3337-
33383332
// the processing loop for one core. We take groups of rows, in order
33393333
static unsigned char jtbatchopx(J jt,struct bopctx* const ctx,UI4 ti){
33403334
// transfer everything out of ctx into local names
@@ -3350,12 +3344,13 @@ static unsigned char jtbatchopx(J jt,struct bopctx* const ctx,UI4 ti){
33503344

33513345
// values in the stripe for each op, in format needed
33523346

3353-
releaseinfo rinfo[RINGROWS]; // info on all rows that have been released. Must be cleared after each release, i. e. 0 when starting new 8block
3347+
__m256d *releaseqktringbase[RINGROWS]; // base of Qkt region mapped to ring. Must be on RBLOCK boundary; low bits are corresponding ring row
3348+
UI8 releaserowmask[RINGROWS]; // mask of all the resultblocks that have been modified in this line Must be cleared after each release, i. e. 0 when starting new 8block
33543349

33553350
I opno=ti; // op# to create the index for. First one is out thread#
33563351
do{
33573352
if(opno<nops){A ma; // if the first reservation is too high, we have more threads than ops. skip it then
3358-
// calculate column index for the op - the offset into mask/cols when we process a given stripe
3353+
// calculate column index for the op - the offset into mask/cols as we process each stripe
33593354
I ncvals=AN(opcolvals[opno]); __m256i *cmask=(__m256i *)BAV(opcolmasks[opno]); // # non0s left in op column, pointer to mask
33603355
GATV0(ma,INT4,ncvals+1,1) // allocate space for index (incl 1 sentinel), on cacheline boundary
33613356
I4 *mav=I4AV1(ma); (*colndxs)[opno]=mav; // Get address of index, publish the address to other threads. Lots of false sharing on this store!
@@ -3369,17 +3364,17 @@ static unsigned char jtbatchopx(J jt,struct bopctx* const ctx,UI4 ti){
33693364

33703365
// calculate stripe index for the op
33713366
I bsumtodate=0; C *mask=CAV(oprowmasks[opno]); // total 1s found, running pointer to mask
3372-
DO(nstripes, I start=(*stripestartend1)[opno][0], end=(*stripestartend1)[opno][1]-BW, bsum=0; while(start<end){bsum+=*(I*)&mask[start]; start+=BW;} bsum+=*(I*)&mask[start]<<(start==end?0:BW/2);
3373-
bsum+=bsum>>32; bsum+=bsum>>16; bsum+=bsum>>8; (*opstripebsum)[i][opno]=bsumtodate+=(C)bsum;)
3367+
DO(nstripes, I start=(*stripestartend1)[opno][0], end=(*stripestartend1)[opno][1]-SZI, bsum=0; while(start<end){bsum+=*(I*)&mask[start]; start+=SZI;} bsum+=*(I*)&mask[start]<<(start==end?0:BW/2);
3368+
bsum+=bsum>>32; bsum+=bsum>>16; bsum+=bsum>>8; (*opstripebsum)[i][opno]=bsumtodate+=(C)bsum;) // add; if last word incomplete, it must have exactly 32 bits
33743369

33753370
}
33763371
opno=__atomic_fetch_add(&ctx->colndxct,1,__ATOMIC_ACQ_REL); // reserve next row. Every thread will finish with one failing reservation
33773372
}while(opno<nops);
33783373

33793374
// initialize internal areas while we wait for indexes to settle.
33803375
mvc(sizeof(ring),ring,MEMSET00LEN,MEMSET00); // clear the ring to all 0.0
3381-
mvc(sizeof(rinfo),rinfo,MEMSET00LEN,MEMSET00); // clear the release info to 0
3382-
I b8start=0; // start of ring area where next 8block is built AND end+1 pointer of data released to ring (could be US)
3376+
mvc(sizeof(releaserowmask),releaserowmask,MEMSET00LEN,MEMSET00); // clear the release info to 0
3377+
DO(RINGROWS, releaseqktringbase[i]=(__m256d*)i;)
33833378

33843379
DO(nops, opstat[i].acolvals=EAV(opcolvals[i]);) // get pointer to values in each column
33853380
__m256d sgnbit=_mm256_castsi256_pd(_mm256_set1_epi64x(Iimin)); // 0x8..0
@@ -3390,9 +3385,23 @@ static unsigned char jtbatchopx(J jt,struct bopctx* const ctx,UI4 ti){
33903385

33913386
I stripex=ti; // initial stripe reservation, from thread#
33923387
// state needed to release one row of ring (viz relstart).
3393-
UI releaseblockmask=0; I relstart=0, releasenormct=8, releasedelayct=0, releasect; __m256d *releaseqkbase; // mask of blocks in row, index of row being released, normal burst length, amount of processing before next burst, actual burst length, Qkt addr of burst
3388+
#define RELEASEBLOCKCT 8 // number of RBLOCKS to handle at a time. This should be as big as we can make it without filling write buffers. 2 blocks=1 cacheline
3389+
UI releaseblockmask=0; I releasect; __m256d *releaseqktringbasecurr; // mask of blocks in row, index of row being released, normal burst length, amount of processing before next burst, actual burst length, Qkt addr of burst
3390+
#define CYCBETWEENRELEASE0 500 // estimated clocks to receive DRAM data. We try to burst only this often so that write buffers can drain
3391+
#define CYCPERINSERT 12 // number of cycles per insertion
3392+
#define RELEASEDELAYCT0 RELEASEBLOCKCT*(CYCBETWEENRELEASE0/CYCPERINSERT)*sizeof(US) // unbiased delay, measured in #offsets processed
3393+
I releasedelayct0; // biased delay given ring status
3394+
// portmanteau register holding ring status
3395+
I ringdctrlb8=0; // delay count, relstart, b8start
3396+
#define RINGDCTX 48 // bit position of delayct, which counts USs and goes positive when it is OK to release RELEASEBLOCKCT blocks. Ends at sign bit
3397+
#define RINGDCTMASK 0xff000000000000
3398+
#define RINGRELSTARTX 8 // bit position of relstart, the next/current row of the ring to relesse
3399+
#define RINGRELSTARTMASK ((RINGROWS-1)<<RINGRELSTARTX)
3400+
#define RINGRELSTARTWRAP (~(RINGROWS<<RINGRELSTARTX)) // mask to wrap relstart when incremented
3401+
#define RINGB8STARTX 0 // start of ring area where next 8block is built AND end+1 pointer of data released to ring
3402+
#define RINGB8STARTMASK ((RINGROWS-1)<<RINGB8STARTX)
33943403
// when relstart==b8start, ring is empty. releaseblockmask is always 0 then. The delay count (held in ringdctrlb8) is set negative to delay the next batch; it increments as blocks are put into the ring. The normal burst size is RELEASEBLOCKCT; the delay before the next burst (releasedelayct0) is shortened
3395-
// if the ring fills. releasect counts the actual burst kength, which is releasenormct unless the ring is full or processing is over, in which case it is set to high-value to flush the ring
3404+
// if the ring fills. releasect counts the actual burst length, which is RELEASEBLOCKCT unless the ring is full or processing is over, in which case it is set to high-value to flush the ring
33963405
while(stripex<nstripes){ // ... for each reservation...
33973406
I stripe=stripegrade[stripex]; // get the actual stripe# to process
33983407

@@ -3416,21 +3425,21 @@ if(ssize>MAXNON0||ssize<=0)SEGFAULT;
34163425
__m256i m32=_mm256_loadu_si256(smask+j32); // read 32 bits. May overfetch
34173426
((C*)&opstat[io].rbmask)[j32]=(UI)(UI4)_mm256_movemask_ps(_mm256_castsi256_ps(_mm256_cmpgt_epi32(m32,_mm256_setzero_si256()))); // create touched mask for each 4-value section (i. e. 1 resultblock), save in rbmask
34183427
I bits=(UI)(UI4)_mm256_movemask_epi8(_mm256_cmpgt_epi8(m32,_mm256_setzero_si256())); // extract the 32 bits
3419-
while(bits){lastoffset=nextstripeofst[nofst]=(j32*32+CTTZI(bits))*sizeof(E); bits&=-bits; ++nofst; if(nofst==ssize)goto finmask;} // turn each 1-bit into a byte offset; stop if we have hit # offsets
3428+
while(bits){lastoffset=nextstripeofst[nofst]=(j32*32+CTTZI(bits))*sizeof(E); bits&=bits-1; ++nofst; if(nofst==ssize)goto finmask;} // turn each 1-bit into a byte offset; stop if we have hit # offsets
34203429
}
34213430
finmask:;
34223431
opstat[io].rbmask&=(UI)~0>>(BW-(((*stripestartend1)[stripe][1]-sstart)>>LGRESBLKE)); // mask off blocks past the valid region
34233432
// read the row values for this stripe and convert them to 0213 form
34243433
I j4; C *sval; // byte offset of 4-E reads to date; pointer to start of values for this section
34253434
opstat[io].arow0213=nextstripe0213; // remember where the offsets start
3426-
for(j4=0,sval=(C*)(EAV(oprowvals[io])+stripeval0x);j4<(ssize<<LGRESBLKE);j4+=RESBLKE*sizeof(E)){
3435+
for(j4=0,sval=(C*)(EAV(oprowvals[io])+stripeval0x);(UI)j4<(ssize*sizeof(E));j4+=RESBLKE*sizeof(E)){
34273436
__m256d h0l0h1l1=_mm256_loadu_pd((D*)(sval+j4)), h2l2h3l3=_mm256_loadu_pd((D*)(sval+j4+sizeof(E)*RESBLKE/2)); // read the next 4 values. This wipes out lots of D2$; perhaps should stream into temp area
34283437
__m256d h0h2h1h3=_mm256_shuffle_pd(h0l0h1l1,h2l2h3l3,0b0000), l0l2l1l3=_mm256_shuffle_pd(h0l0h1l1,h2l2h3l3,0b1111); // convert to 0213 order
34293438
_mm256_store_pd((D*)((C*)nextstripe0213+j4),h0h2h1h3); _mm256_store_pd((D*)((C*)nextstripe0213+j4+sizeof(E)*RESBLKE/2),l0l2l1l3); // store in 0213 order
34303439
}
34313440
// if last block of 4 values is not all valid, we must repeat the last valid offset to the end of the block, and put the last value into the last slot
34323441
lastoffset+=lastoffset<<(sizeof(US)*BB); lastoffset+=lastoffset<<(2*sizeof(US)*BB); *(I*)&nextstripeofst[nofst]=lastoffset; // append 4 copies of last offset; 0-3 are needed
3433-
I lastvalidlane=(0b01100011>>(ssize&(NPAR-1)))&(NPAR-1); // lane# holding last valid value: 0 1 2 3 -> 3 0 2 1 (since values are 0213)
3442+
I lastvalidlane=(0b01100011>>(2*(ssize&(NPAR-1))))&(NPAR-1); // lane# holding last valid value: 0 1 2 3 -> 3 0 2 1 (since values are 0213)
34343443
D (*lastvals)[2][4]=(D (*)[2][4])((C*)nextstripe0213+j4-RESBLKE*sizeof(E)); // address of resblk containing last valid pointer
34353444
(*lastvals)[0][3]=(*lastvals)[0][lastvalidlane]; (*lastvals)[1][3]=(*lastvals)[1][lastvalidlane]; // transfer value to last value in block, which is written last
34363445

@@ -3449,12 +3458,12 @@ finmask:;
34493458
I4 nextrowinop; // next row to process in this op
34503459
while((nextrowinop=currop->colndxahead)-b8qktrow<B8ROWS){ // if next row is in 8block
34513460
// next row can be processed in this 8block. Load the column info and update the readahead
3452-
I ringx=(b8start+(nextrowinop-b8qktrow))&(RINGROWS-1); // ring row to fill
3461+
I ringx=(I)releaseqktringbase[(ringdctrlb8+(nextrowinop-b8qktrow))&RINGB8STARTMASK]; // ring row to fill
34533462
E *r0=ring[ringx]; // base the offsets will be applied against
34543463
US *aof=currop->arowoffsets; __m256d *ava=currop->arow0213; I alen=currop->nofsts; // loop boundaries for processing the row of the stripe
34553464
__m256d colh=_mm256_set1_pd(currop->colvalahead.hi), coll=_mm256_set1_pd(currop->colvalahead.lo); // copy column value into all lanes
34563465
currop->colndxahead=currop->acolndxs[++currop->rowindex]; currop->colvalahead=currop->acolvals[currop->rowindex]; // read ahead for next row
3457-
rinfo[ringx].rowmask|=currop->rbmask; // make note of the resultblocks that will be modified by this row
3466+
releaserowmask[ringx]|=currop->rbmask; // make note of the resultblocks that will be modified by this row
34583467

34593468
// Calculate one row of the op
34603469
I andx=0; // counts in steps of RESBLKE*sizeof(one offset). With this stride we can use andx to point to offsets and andx*sizeof(E)/sizeof(one offset) (=8) to point to values
@@ -3489,11 +3498,13 @@ finmask:;
34893498
}while((andx+=(RESBLKE*sizeof(aof[0])))<alen); // end after last block
34903499

34913500
// send a few values to Qkt
3492-
releasedelayct+=andx; // add the number of blocks we processed. When the total goes nonnegative we can release again
3493-
if(releaseblockmask>(UI)REPSGN(releasedelayct)){ // if there are released values... (mask>0 and delayct nonneg)
3494-
releasect=releasenormct; // set releasect to the number of blocks to take. We will stop when the row is empty in any case
3501+
ringdctrlb8+=andx<<RINGDCTX; // add the number of blocks we processed. When the total goes nonnegative we can release again
3502+
if(releaseblockmask>(UI)REPSGN(ringdctrlb8)){ // if there are released values... (mask>0 and delayct nonneg)
3503+
releasect=RELEASEBLOCKCT; // set releasect to the number of blocks to take. We will stop when the row is empty in any case
3504+
ringdctrlb8=(ringdctrlb8&~RINGDCTMASK)+releasedelayct0; // reset delay till next block release
34953505
releaserow:; // entered from below to drain the ring, either on buffer-full or at end-of-operation. releasect is set to a high value in that case
3496-
__m256 *releaseringbase=(__m256*)ring[relstart]; // get address in ring
3506+
__m256d *releaseringbase=(__m256d*)ring[(I)releaseqktringbasecurr&(RINGROWS-1)]; // get base address in ring
3507+
__m256d *releaseqkbase=(__m256d*)((I)releaseqktringbasecurr&-RINGROWS); // get base address in Qkt
34973508
do{
34983509
// calculate a result block
34993510
I blockbyteofst=CTTZI(releaseblockmask)*sizeof(E)*RESBLKE; // get offset to next modified block in this row
@@ -3515,12 +3526,14 @@ releaserow:; // entered from below to drain the ring, either on buffer-full or
35153526
if((releaseblockmask&=(releaseblockmask-1))==0)goto rowfin; // advance to next block, exit if none
35163527
}while(--releasect); // could use PEXT & block mask to avoid need for releasect here
35173528
if(0){rowfin:; // come here when a row has been fully sent to Qkt
3518-
rinfo[relstart].rowmask=0; // when we finish a row, we must leave it with an empty mask
3519-
relstart=(relstart+1)&(RINGROWS-1); // advance to next row
3520-
if(relstart!=b8start){ // if the release area is not empty after removing the finished row...
3521-
releaseblockmask=rinfo[relstart].rowmask; releaseringbase=(__m256*)ring[relstart]; releaseqkbase=rinfo[relstart].qktbase; // move next row to the release variables. blockmask=0 means no work
3529+
I relstart=(ringdctrlb8&RINGRELSTARTMASK)>>RINGRELSTARTX; // extract ending release row#
3530+
releaserowmask[relstart]=0; // when we finish a row, we must leave it with an empty mask
3531+
releaseqktringbase[relstart]=(__m256d*)((I)releaseqktringbase[relstart]&(RINGROWS-1)); // clear Qkt, leaving ring row#
3532+
relstart=(relstart+1)&(RINGROWS-1); ringdctrlb8=(ringdctrlb8+(1<<RINGRELSTARTX))&RINGRELSTARTWRAP; // advance to next released row
3533+
if(((ringdctrlb8-relstart)&(RINGROWS-1))!=0){ // if the release area is not empty after removing the finished row...
3534+
releaseblockmask=releaserowmask[relstart]; releaseqktringbasecurr=releaseqktringbase[relstart]; // move next row to the release variables. blockmask=0 means no work
35223535
// Here we loop back to handle exception cases: (1) ring full; (2) operation finished. We set releasect to high-value
3523-
if(unlikely(((b8start-relstart)&(RINGROWS-1)))>=(RINGROWS-B8ROWS))goto releaserow; // if ring is still full, wait for it to drain
3536+
if(unlikely(((ringdctrlb8-relstart)&(RINGROWS-1))>=(RINGROWS-B8ROWS)))goto releaserow; // if ring is still full, wait for it to drain
35243537
if(unlikely(stripex>=nstripes))goto releaserow; // if the problem is over, flush the entire ring
35253538
}
35263539
if(unlikely(releasect>100))if(stripex>=nstripes)goto finis; else goto caughtup; // if we are coming out of a loopback, go to the right place: end, or just after we released into the full ring
@@ -3536,12 +3549,24 @@ releaserow:; // entered from below to drain the ring, either on buffer-full or
35363549

35373550
// 8block finished. close up the rows and release them to the output stage. For each nonempty row we write out the address of the Qkt data, and copy the
35383551
// mask. We also have to make sure the mask is cleared to 0 in all rows that go unreleased
3539-
I sx=b8start; I qktrcol=(I)&qktcol[b8qktrow*qktncols]; // store index, address of the modified row of Qkt corresponding to b8start
3540-
DONOUNROLL(B8ROWS, rinfo[sx].qktbase=(__m256d*)qktrcol; UI8 msk=rinfo[b8start].rowmask; rinfo[b8start].rowmask=0; rinfo[sx].rowmask=msk; sx=(sx+(msk>0))&(RINGROWS-1); b8start=(b8start+1)&(RINGROWS-1); qktrcol+=qktncols*sizeof(E);)
3541-
b8start=sx; // release the nonempty rows
3542-
if(releaseblockmask==0){releaseblockmask=rinfo[relstart].rowmask; releaseqkbase=rinfo[relstart].qktbase;} // if release queue was empty, move first row to the release variables. blockmask=0 means no work
3543-
else if(unlikely(((b8start-relstart)&(RINGROWS-1)))>=(RINGROWS-B8ROWS)){releasect=(UI4)~0>>1; goto releaserow;} // if ring is full, wait for it to drain
3552+
I origb8=ringdctrlb8&RINGB8STARTMASK, b8start=origb8, sx=b8start; I qktrcol=(I)&qktcol[b8qktrow*qktncols]; // store index, address of the modified row of Qkt corresponding to b8start
3553+
UI8 skiprows=0; // mask of empty rows that must be added to the end
3554+
DONOUNROLL(B8ROWS,
3555+
skiprows=(skiprows&-RINGROWS)+(I)releaseqktringbase[b8start]; // remember new row in case we skip it
3556+
releaseqktringbase[sx]=(__m256d*)(qktrcol+(I)releaseqktringbase[b8start]); UI8 msk=releaserowmask[b8start];
3557+
releaserowmask[sx]=msk; sx=(sx+(msk>0))&(RINGROWS-1); b8start=(b8start+1)&(RINGROWS-1); qktrcol+=qktncols*sizeof(E);
3558+
skiprows<<=((msk>0)^1)<<3; // if we skip the row, shift it up to safety (8 bits)
3559+
)
3560+
// rows [sx,b8start) were skipped: store the skipped row, clearing the mask and Qkt base. This returns the rows to the pool
3561+
while(b8start!=sx){skiprows>>=8; b8start=(b8start-1)&(RINGROWS-1); releaserowmask[b8start]=0; releaseqktringbase[b8start]=(__m256d*)(skiprows&(RINGROWS-1));}
3562+
// b8start has been advanced over all the nonskipped rows, which releases them.
3563+
ringdctrlb8=(ringdctrlb8&~RINGB8STARTMASK)+b8start; // install b8start in portmanteau
3564+
if(releaseblockmask==0){releaseblockmask=releaserowmask[origb8]; releaseqktringbasecurr=releaseqktringbase[origb8]; ringdctrlb8&=~RINGDCTMASK;} // if release queue was empty, move first row to the release variables. blockmask=0 means no work.
3565+
// delayct has been incrementing continuously, perhaps overflowing - clear it to start releasing
3566+
else if(unlikely(((ringdctrlb8-(ringdctrlb8>>RINGRELSTARTX))&(RINGROWS-1)))>=(RINGROWS-B8ROWS)){releasect=(UI4)~0>>1; goto releaserow;} // if ring is full, wait for it to drain
35443567
caughtup:; // here when we have removed the ring-full situation
3568+
// throttle the release depending on ring-full status. We figure this only when we add rows because it's not important to keep it exactly right
3569+
releasedelayct0=(-((((RINGROWS-((ringdctrlb8-(ringdctrlb8>>RINGRELSTARTX)-1)&(RINGROWS-1)))>>3)+1)*RELEASEDELAYCT0)>>3)<<RINGDCTX; // decrease delay (which is negative) with each eight-row section filled
35453570
} // end 'while aops'
35463571
stripex=__atomic_fetch_add(&ctx->resvx,1,__ATOMIC_ACQ_REL); // reserve next row. Every thread will finish with one failing reservation
35473572
}

0 commit comments

Comments
 (0)