Skip to content

Commit 6746904

Browse files
committed
add EOF marker functionality
since it's hard or impossible to close fifo pipes in shell scripts, it's handy to have a way to signal jobflow when the end of input is reached so it can terminate once everything is processed. the shell script can now run jobflow with -eof=XXX to specify a magic eof marker.
1 parent 1fff39d commit 6746904

File tree

1 file changed

+22
-2
lines changed

1 file changed

+22
-2
lines changed

jobflow.c

+22-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
*/
1717

1818

19-
#define VERSION "1.2.2"
19+
#define VERSION "1.2.3"
2020

2121

2222
#undef _POSIX_C_SOURCE
@@ -86,6 +86,7 @@ typedef struct {
8686
unsigned numthreads;
8787
unsigned threads_running;
8888
char* statefile;
89+
char* eof_marker;
8990
unsigned long long skip;
9091
sblist* job_infos;
9192
sblist* subst_entries;
@@ -319,6 +320,7 @@ static int syntax(void) {
319320
"available options:\n\n"
320321
"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
321322
"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
323+
"-eof=XXX\n"
322324
"-exec ./mycommand {}\n"
323325
"\n"
324326
"-skip=XXX\n"
@@ -327,6 +329,10 @@ static int syntax(void) {
327329
" XXX=number of parallel processes to spawn\n"
328330
"-resume\n"
329331
" resume from last jobnumber stored in statefile\n"
332+
"-eof=XXX\n"
333+
" use XXX as the EOF marker on stdin\n"
334+
" if the marker is encountered, behave as if stdin was closed\n"
335+
" not compatible with pipe/bulk mode\n"
330336
"-statefile=XXX\n"
331337
" XXX=filename\n"
332338
" saves last launched jobnumber into a file\n"
@@ -393,6 +399,9 @@ static int parse_args(int argc, char** argv) {
393399
op_temp = op_get(op, SPL("statefile"));
394400
prog_state.statefile = op_temp;
395401

402+
op_temp = op_get(op, SPL("eof"));
403+
prog_state.eof_marker = op_temp;
404+
396405
op_temp = op_get(op, SPL("skip"));
397406
prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
398407
if(op_hasflag(op, SPL("resume"))) {
@@ -585,6 +594,13 @@ static size_t count_linefeeds(const char *buf, size_t len) {
585594
}
586595
return cnt;
587596
}
597+
598+
static int match_eof(char* inbuf, size_t len) {
599+
if(!prog_state.eof_marker) return 0;
600+
size_t l = strlen(prog_state.eof_marker);
601+
return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
602+
}
603+
588604
#define MAX_SUBSTS 16
589605
static int dispatch_line(char* inbuf, size_t len, char** argv) {
590606
char subst_buf[MAX_SUBSTS][4096];
@@ -751,13 +767,17 @@ int main(int argc, char** argv) {
751767

752768
if(!p) break;
753769
ptrdiff_t diff = (p - in) + 1;
770+
if(match_eof(in, diff)) {
771+
exitcode = 0;
772+
goto out;
773+
}
754774
if(!dispatch_line(in, diff, argv))
755775
goto out;
756776
left -= diff;
757777
in += diff;
758778
}
759779
if(!n) {
760-
if(left) dispatch_line(in, left, argv);
780+
if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
761781
break;
762782
}
763783
if(left > chunksize) {

0 commit comments

Comments
 (0)