diff --git a/sys/man/8/qer b/sys/man/8/qer index 7a2636537..675286bc4 100644 --- a/sys/man/8/qer +++ b/sys/man/8/qer @@ -15,7 +15,7 @@ qer, runq \- queue management for spooled files .br .B runq [ -.B -adsER +.B -dER ] [ .B -f @@ -26,10 +26,6 @@ qer, runq \- queue management for spooled files .I subdir ] [ -.B -l -.I load -] -[ .B -t .I time ] @@ -39,7 +35,7 @@ qer, runq \- queue management for spooled files ] [ .B -n -.I nprocs +.I njobs ] .I root cmd .SH DESCRIPTION @@ -84,10 +80,7 @@ starts with 'F', the second 'G', etc. .I Runq processes the files queued by .IR qer . -Without the -.B -a -option, -.I runq +.I Runq processes all requests in the directory .IR root / subdir , where @@ -96,9 +89,6 @@ is the argument to .B -q if present, else the contents of .BR /dev/user . -With the -.B -a -it processes all requests. Each request is processed by executing the command .I cmd with the contents of the control file as its arguments, @@ -172,31 +162,12 @@ be drained incrementally. It is most useful in combination with the .I -q flag. .P -The -.BR -s , -.BR -n , -and -.B -l -flags are only meaningful with the -.B -a -flag. They control amount of parallelism that -is used when sweeping all of the queues. The argument following the +The argument following the .B -n -flag specifies the number of queues that are swept -in parallel; the default is 50. The argument following the -.B -l -flag specifies the total number of queues that are being swept. -By default, there is no limit. The number of active sweeps -is cumulative over all active executions of -.IR runq . -The -.B -s -flag forces each queue directory to be processed by exactly -one instance of -.IR runq . -This is useful on systems that connect to slow -external systems and prevents all the queue sweeps from -piling up trying to process a few slow systems. +flag specifies the number of queued jobs that are processed +in parallel from the queue; the default is 1. +This is useful for a large queue to be processed with a bounded +amount of parallelism. .PP .I Runq is often called from diff --git a/sys/src/cmd/upas/q/runq.c b/sys/src/cmd/upas/q/runq.c index 69efe4570..16445ea36 100644 --- a/sys/src/cmd/upas/q/runq.c +++ b/sys/src/cmd/upas/q/runq.c @@ -1,9 +1,25 @@ #include "common.h" #include +typedef struct Job Job; + +struct Job { + Job *next; + int pid; + int ac; + int dfd; + char **av; + char *buf; /* backing for av */ + Dir *dp; /* not owned */ + Mlock *l; + Biobuf *b; +}; + void doalldirs(void); void dodir(char*); -void dofile(Dir*); +Job* dofile(Dir*); +Job* donefile(Job*, Waitmsg*); +void freejob(Job*); void rundir(char*); char* file(char*, char); void warning(char*, void*); @@ -17,7 +33,6 @@ char *cmd; char *root; int debug; int giveup = 2*24*60*60; -int load; int limit; /* the current directory */ @@ -28,67 +43,48 @@ char *curdir; char *runqlog = "runq"; -int *pidlist; char **badsys; /* array of recalcitrant systems */ int nbad; -int npid = 50; -int sflag; /* single thread per directory */ -int aflag; /* all directories */ +int njob = 1; /* number of concurrent jobs to invoke */ int Eflag; /* ignore E.xxxxxx dates */ int Rflag; /* no giving up, ever */ void usage(void) { - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); - exits(""); + fprint(2, "usage: runq [-dE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); + exits("usage"); } void main(int argc, char **argv) { - char *qdir, *x; + char *qdir; qdir = 0; ARGBEGIN{ - case 'l': - x = ARGF(); - if(x == 0) - usage(); - load = atoi(x); - if(load < 0) - load = 0; - break; case 'E': Eflag++; break; case 'R': /* no giving up -- just leave stuff in the queue */ Rflag++; break; - case 'a': - aflag++; - break; case 'd': debug++; break; case 'r': - limit = atoi(ARGF()); - break; - case 's': - sflag++; + limit = atoi(EARGF(usage())); break; case 't': - giveup = 60*60*atoi(ARGF()); + giveup = 60*60*atoi(EARGF(usage())); break; case 'q': - qdir = ARGF(); - if(qdir == 0) - usage(); + qdir = EARGF(usage()); break; case 'n': - npid = atoi(ARGF()); - if(npid == 0) + njob = atoi(EARGF(usage())); + if(njob == 0) usage(); break; }ARGEND; @@ -96,27 +92,17 @@ main(int argc, char **argv) if(argc != 2) usage(); - pidlist = malloc(npid*sizeof(*pidlist)); - if(pidlist == 0) - error("can't malloc", 0); - - if(aflag == 0 && qdir == 0) { + if(qdir == nil) qdir = getuser(); - if(qdir == 0) - error("unknown user", 0); - } + if(qdir == nil) + error("unknown user", 0); root = argv[0]; cmd = argv[1]; if(chdir(root) < 0) error("can't cd to %s", root); - doload(1); - if(aflag) - doalldirs(); - else - dodir(qdir); - doload(0); + dodir(qdir); exits(0); } @@ -142,74 +128,6 @@ emptydir(char *name) return 0; } -int -forkltd(void) -{ - int i; - int pid; - - for(i = 0; i < npid; i++){ - if(pidlist[i] <= 0) - break; - } - - while(i >= npid){ - pid = waitpid(); - if(pid < 0){ - syslog(0, runqlog, "forkltd confused"); - exits(0); - } - - for(i = 0; i < npid; i++) - if(pidlist[i] == pid) - break; - } - pidlist[i] = fork(); - return pidlist[i]; -} - -/* - * run all user directories, must be bootes (or root on unix) to do this - */ -void -doalldirs(void) -{ - Dir *db; - int fd; - long i, n; - - - fd = open(".", OREAD); - if(fd == -1){ - warning("reading %s", root); - return; - } - n = dirreadall(fd, &db); - if(n > 0){ - for(i=0; i 0){ - for(i=0; i 0 || fidx< nfiles){ + while(fidx< nfiles && nlive < njob){ + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ + fidx++; continue; - dofile(&dirbuf[i]); + } + if((j = dofile(&dirbuf[fidx])) != nil){ + nlive++; + j->next = hd; + hd = j; + } + fidx++; } - free(dirbuf); + if(nlive == 0){ + fprint(2, "nothing live\n"); + break; + } +rescan: + if((w = wait()) == nil){ + syslog(0, "runq", "wait error: %r"); + break; + } + found = 0; + for(p = &hd; *p != nil; p = &(*p)->next){ + if(w->pid == (*p)->pid){ + *p = donefile(*p, w); + found++; + nlive--; + break; + } + } + free(w); + if(!found) + goto rescan; } - if(aflag && sflag) - sysunlockfile(fd); - else - close(fd); + assert(hd == nil); + free(dirbuf); + close(fd); } /* @@ -314,17 +259,16 @@ keeplockalive(char *path, int fd) } /* - * try a message + * Launch trying a message, returning a job + * tracks the running pid. */ -void +Job* dofile(Dir *dp) { + int dtime, efd, i, etime; + Job *j; Dir *d; - int dfd, ac, dtime, efd, pid, i, etime; - char *buf, *cp, **av; - Waitmsg *wm; - Biobuf *b; - Mlock *l = nil; + char *cp; if(debug) fprint(2, "dofile %s\n", dp->name); @@ -337,14 +281,14 @@ dofile(Dir *dp) if(d == nil){ syslog(0, runqlog, "no data file for %s", dp->name); remmatch(dp->name); - return; + return nil; } if(dp->length == 0){ if(time(0)-dp->mtime > 15*60){ syslog(0, runqlog, "empty ctl file for %s", dp->name); remmatch(dp->name); } - return; + return nil; } dtime = d->mtime; free(d); @@ -358,31 +302,35 @@ dofile(Dir *dp) if(etime - dtime < 60*60){ /* up to the first hour, try every 15 minutes */ if(time(0) - etime < 15*60) - return; + return nil; } else { /* after the first hour, try once an hour */ if(time(0) - etime < 60*60) - return; + return nil; } - } /* * open control and data */ - b = sysopen(file(dp->name, 'C'), "rl", 0660); - if(b == 0) { + j = malloc(sizeof(Job)); + if(j == nil) + return nil; + memset(j, 0, sizeof(Job)); + j->dp = dp; + j->dfd = -1; + j->b = sysopen(file(dp->name, 'C'), "rl", 0660); + if(j->b == 0) { if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); - return; + return nil; } - dfd = open(file(dp->name, 'D'), OREAD); - if(dfd < 0){ + j->dfd = open(file(dp->name, 'D'), OREAD); + if(j->dfd < 0){ if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } /* @@ -390,48 +338,36 @@ dofile(Dir *dp) * - read args into (malloc'd) buffer * - malloc a vector and copy pointers to args into it */ - buf = malloc(dp->length+1); - if(buf == 0){ + + j->buf = malloc(dp->length+1); + if(j->buf == nil){ warning("buffer allocation", 0); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - return; + freejob(j); + return nil; } - if(Bread(b, buf, dp->length) != dp->length){ + if(Bread(j->b, j->buf, dp->length) != dp->length){ warning("reading control file %s\n", dp->name); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - free(buf); - return; + freejob(j); + return nil; } - buf[dp->length] = 0; - av = malloc(2*sizeof(char*)); - if(av == 0){ + j->buf[dp->length] = 0; + j->av = malloc(2*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } - for(ac = 1, cp = buf; *cp; ac++){ + for(j->ac = 1, cp = j->buf; *cp; j->ac++){ while(isspace(*cp)) *cp++ = 0; if(*cp == 0) break; - av = realloc(av, (ac+2)*sizeof(char*)); - if(av == 0){ + j->av = realloc(j->av, (j->ac+2)*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; } - av[ac] = cp; + j->av[j->ac] = cp; while(*cp && !isspace(*cp)){ if(*cp++ == '"'){ while(*cp && *cp != '"') @@ -441,18 +377,18 @@ dofile(Dir *dp) } } } - av[0] = cmd; - av[ac] = 0; + j->av[0] = cmd; + j->av[j->ac] = 0; if(!Eflag &&time(0) - dtime > giveup){ - if(returnmail(av, dp->name, "Giveup") != 0) - logit("returnmail failed", dp->name, av); + if(returnmail(j->av, dp->name, "Giveup") != 0) + logit("returnmail failed", dp->name, j->av); remmatch(dp->name); goto done; } for(i = 0; i < nbad; i++){ - if(strcmp(av[3], badsys[i]) == 0) + if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0) goto done; } @@ -460,33 +396,34 @@ dofile(Dir *dp) * Ken's fs, for example, gives us 5 minutes of inactivity before * the lock goes stale, so we have to keep reading it. */ - l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b)); /* * transfer */ - pid = fork(); - switch(pid){ + j->pid = fork(); + switch(j->pid){ case -1: - sysunlock(l); - sysunlockfile(Bfildes(b)); + sysunlock(j->l); + sysunlockfile(Bfildes(j->b)); syslog(0, runqlog, "out of procs"); exits(0); case 0: if(debug) { - fprint(2, "Starting %s", cmd); - for(ac = 0; av[ac]; ac++) - fprint(2, " %s", av[ac]); + fprint(2, "Starting %s\n", cmd); + for(i = 0; j->av[i]; i++) + fprint(2, " %s", j->av[i]); fprint(2, "\n"); } - logit("execing", dp->name, av); + logit("execing", dp->name, j->av); close(0); - dup(dfd, 0); - close(dfd); + dup(j->dfd, 0); + close(j->dfd); close(2); efd = open(file(dp->name, 'E'), OWRITE); if(efd < 0){ - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); + if(debug) + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); efd = create(file(dp->name, 'E'), OWRITE, 0666); if(efd < 0){ if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); @@ -494,50 +431,72 @@ dofile(Dir *dp) } } seek(efd, 0, 2); - exec(cmd, av); + exec(cmd, j->av); error("can't exec %s", cmd); break; default: - for(;;){ - wm = wait(); - if(wm == nil) - error("wait failed: %r", ""); - if(wm->pid == pid) - break; - free(wm); - } - if(debug) - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); - - if(wm->msg[0]){ - if(debug) - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); - if(!Rflag && strstr(wm->msg, "Retry")==0){ - /* return the message and remove it */ - if(returnmail(av, dp->name, wm->msg) != 0) - logit("returnmail failed", dp->name, av); - remmatch(dp->name); - } else { - /* add sys to bad list and try again later */ - nbad++; - badsys = realloc(badsys, nbad*sizeof(char*)); - badsys[nbad-1] = strdup(av[3]); - } - } else { - /* it worked remove the message */ - remmatch(dp->name); - } - free(wm); - + return j; } done: - if (l) - sysunlock(l); - Bterm(b); - sysunlockfile(Bfildes(b)); - free(buf); - free(av); - close(dfd); + freejob(j); + return nil; +} + +/* + * Handle the completion of a job. + * Wait for the pid, check its status, + * and then pop the job off the list. + * Return the next running job. + */ +Job* +donefile(Job *j, Waitmsg *wm) +{ + Job *n; + + if(debug) + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + + if(wm->msg[0]){ + if(debug) + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); + if(!Rflag && strstr(wm->msg, "Retry")==0){ + /* return the message and remove it */ + if(returnmail(j->av, j->dp->name, wm->msg) != 0) + logit("returnmail failed", j->dp->name, j->av); + remmatch(j->dp->name); + } else { + /* add sys to bad list and try again later */ + nbad++; + badsys = realloc(badsys, nbad*sizeof(char*)); + badsys[nbad-1] = strdup(j->av[3]); + } + } else { + /* it worked remove the message */ + remmatch(j->dp->name); + } + n = j->next; + freejob(j); + return n; +} + +/* + * Release resources associated with + * a job. + */ +void +freejob(Job *j) +{ + if(j->b != nil){ + sysunlockfile(Bfildes(j->b)); + Bterm(j->b); + } + if(j->dfd != -1) + close(j->dfd); + if(j->l != nil) + sysunlock(j->l); + free(j->buf); + free(j->av); + free(j); } @@ -691,71 +650,3 @@ logit(char *msg, char *file, char **av) } syslog(0, runqlog, "%s", buf); } - -char *loadfile = ".runqload"; - -/* - * load balancing - */ -void -doload(int start) -{ - int fd; - char buf[32]; - int i, n; - Mlock *l; - Dir *d; - - if(load <= 0) - return; - - if(chdir(root) < 0){ - load = 0; - return; - } - - l = syslock(loadfile); - fd = open(loadfile, ORDWR); - if(fd < 0){ - fd = create(loadfile, 0666, ORDWR); - if(fd < 0){ - load = 0; - sysunlock(l); - return; - } - } - - /* get current load */ - i = 0; - n = read(fd, buf, sizeof(buf)-1); - if(n >= 0){ - buf[n] = 0; - i = atoi(buf); - } - if(i < 0) - i = 0; - - /* ignore load if file hasn't been changed in 30 minutes */ - d = dirfstat(fd); - if(d != nil){ - if(d->mtime + 30*60 < time(0)) - i = 0; - free(d); - } - - /* if load already too high, give up */ - if(start && i >= load){ - sysunlock(l); - exits(0); - } - - /* increment/decrement load */ - if(start) - i++; - else - i--; - seek(fd, 0, 0); - fprint(fd, "%d\n", i); - sysunlock(l); - close(fd); -}