upas/runq: support parallel queue processing, drop -a

When running a mail queue, it's useful to run it with limited
parallelism. This helps mailing lists process messages in a
reasonable time.

At the same time, we can remove the load balancing from runq,
since the kinds of systems that this matters on no longer
exist, and running multiple queues at once can be better
done through xargs.
This commit is contained in:
Ori Bernstein 2021-01-23 11:03:05 -08:00
parent 5e20e8f963
commit f321298c55
2 changed files with 199 additions and 337 deletions

View file

@ -15,7 +15,7 @@ qer, runq \- queue management for spooled files
.br
.B runq
[
.B -adsER
.B -dER
]
[
.B -f
@ -26,10 +26,6 @@ qer, runq \- queue management for spooled files
.I subdir
]
[
.B -l
.I load
]
[
.B -t
.I time
]
@ -39,7 +35,7 @@ qer, runq \- queue management for spooled files
]
[
.B -n
.I nprocs
.I njobs
]
.I root cmd
.SH DESCRIPTION
@ -84,10 +80,7 @@ starts with 'F', the second 'G', etc.
.I Runq
processes the files queued by
.IR qer .
Without the
.B -a
option,
.I runq
.I Runq
processes all requests in the directory
.IR root / subdir ,
where
@ -96,9 +89,6 @@ is the argument to
.B -q
if present, else the contents of
.BR /dev/user .
With the
.B -a
it processes all requests.
Each request is processed by executing the command
.I cmd
with the contents of the control file as its arguments,
@ -172,31 +162,12 @@ be drained incrementally. It is most useful in combination with the
.I -q
flag.
.P
The
.BR -s ,
.BR -n ,
and
.B -l
flags are only meaningful with the
.B -a
flag. They control amount of parallelism that
is used when sweeping all of the queues. The argument following the
The argument following the
.B -n
flag specifies the number of queues that are swept
in parallel; the default is 50. The argument following the
.B -l
flag specifies the total number of queues that are being swept.
By default, there is no limit. The number of active sweeps
is cumulative over all active executions of
.IR runq .
The
.B -s
flag forces each queue directory to be processed by exactly
one instance of
.IR runq .
This is useful on systems that connect to slow
external systems and prevents all the queue sweeps from
piling up trying to process a few slow systems.
flag specifies the number of queued jobs that are processed
in parallel from the queue; the default is 1.
This is useful for a large queue to be processed with a bounded
amount of parallelism.
.PP
.I Runq
is often called from

View file

@ -1,9 +1,25 @@
#include "common.h"
#include <ctype.h>
typedef struct Job Job;
struct Job {
Job *next;
int pid;
int ac;
int dfd;
char **av;
char *buf; /* backing for av */
Dir *dp; /* not owned */
Mlock *l;
Biobuf *b;
};
void doalldirs(void);
void dodir(char*);
void dofile(Dir*);
Job* dofile(Dir*);
Job* donefile(Job*, Waitmsg*);
void freejob(Job*);
void rundir(char*);
char* file(char*, char);
void warning(char*, void*);
@ -17,7 +33,6 @@ char *cmd;
char *root;
int debug;
int giveup = 2*24*60*60;
int load;
int limit;
/* the current directory */
@ -28,67 +43,48 @@ char *curdir;
char *runqlog = "runq";
int *pidlist;
char **badsys; /* array of recalcitrant systems */
int nbad;
int npid = 50;
int sflag; /* single thread per directory */
int aflag; /* all directories */
int njob = 1; /* number of concurrent jobs to invoke */
int Eflag; /* ignore E.xxxxxx dates */
int Rflag; /* no giving up, ever */
void
usage(void)
{
fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
exits("");
fprint(2, "usage: runq [-dE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
exits("usage");
}
void
main(int argc, char **argv)
{
char *qdir, *x;
char *qdir;
qdir = 0;
ARGBEGIN{
case 'l':
x = ARGF();
if(x == 0)
usage();
load = atoi(x);
if(load < 0)
load = 0;
break;
case 'E':
Eflag++;
break;
case 'R': /* no giving up -- just leave stuff in the queue */
Rflag++;
break;
case 'a':
aflag++;
break;
case 'd':
debug++;
break;
case 'r':
limit = atoi(ARGF());
break;
case 's':
sflag++;
limit = atoi(EARGF(usage()));
break;
case 't':
giveup = 60*60*atoi(ARGF());
giveup = 60*60*atoi(EARGF(usage()));
break;
case 'q':
qdir = ARGF();
if(qdir == 0)
usage();
qdir = EARGF(usage());
break;
case 'n':
npid = atoi(ARGF());
if(npid == 0)
njob = atoi(EARGF(usage()));
if(njob == 0)
usage();
break;
}ARGEND;
@ -96,27 +92,17 @@ main(int argc, char **argv)
if(argc != 2)
usage();
pidlist = malloc(npid*sizeof(*pidlist));
if(pidlist == 0)
error("can't malloc", 0);
if(aflag == 0 && qdir == 0) {
if(qdir == nil)
qdir = getuser();
if(qdir == 0)
if(qdir == nil)
error("unknown user", 0);
}
root = argv[0];
cmd = argv[1];
if(chdir(root) < 0)
error("can't cd to %s", root);
doload(1);
if(aflag)
doalldirs();
else
dodir(qdir);
doload(0);
exits(0);
}
@ -142,74 +128,6 @@ emptydir(char *name)
return 0;
}
int
forkltd(void)
{
int i;
int pid;
for(i = 0; i < npid; i++){
if(pidlist[i] <= 0)
break;
}
while(i >= npid){
pid = waitpid();
if(pid < 0){
syslog(0, runqlog, "forkltd confused");
exits(0);
}
for(i = 0; i < npid; i++)
if(pidlist[i] == pid)
break;
}
pidlist[i] = fork();
return pidlist[i];
}
/*
* run all user directories, must be bootes (or root on unix) to do this
*/
void
doalldirs(void)
{
Dir *db;
int fd;
long i, n;
fd = open(".", OREAD);
if(fd == -1){
warning("reading %s", root);
return;
}
n = dirreadall(fd, &db);
if(n > 0){
for(i=0; i<n; i++){
if(db[i].qid.type & QTDIR){
if(emptydir(db[i].name))
continue;
switch(forkltd()){
case -1:
syslog(0, runqlog, "out of procs");
doload(0);
exits(0);
case 0:
if(sysdetach() < 0)
error("%r", 0);
dodir(db[i].name);
exits(0);
default:
break;
}
}
}
free(db);
}
close(fd);
}
/*
* cd to a user directory and run it
*/
@ -234,29 +152,56 @@ dodir(char *name)
void
rundir(char *name)
{
int fd;
long i;
Job *hd, *j, **p;
int nlive, fidx, fd, found;
Waitmsg *w;
if(aflag && sflag)
fd = sysopenlocked(".", OREAD);
else
fd = open(".", OREAD);
if(fd == -1){
warning("reading %s", name);
return;
}
fidx= 0;
hd = nil;
nlive = 0;
nfiles = dirreadall(fd, &dirbuf);
if(nfiles > 0){
for(i=0; i<nfiles; i++){
if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
while(nlive > 0 || fidx< nfiles){
while(fidx< nfiles && nlive < njob){
if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
fidx++;
continue;
dofile(&dirbuf[i]);
}
if((j = dofile(&dirbuf[fidx])) != nil){
nlive++;
j->next = hd;
hd = j;
}
fidx++;
}
if(nlive == 0){
fprint(2, "nothing live\n");
break;
}
rescan:
if((w = wait()) == nil){
syslog(0, "runq", "wait error: %r");
break;
}
found = 0;
for(p = &hd; *p != nil; p = &(*p)->next){
if(w->pid == (*p)->pid){
*p = donefile(*p, w);
found++;
nlive--;
break;
}
}
free(w);
if(!found)
goto rescan;
}
assert(hd == nil);
free(dirbuf);
}
if(aflag && sflag)
sysunlockfile(fd);
else
close(fd);
}
@ -314,17 +259,16 @@ keeplockalive(char *path, int fd)
}
/*
* try a message
* Launch trying a message, returning a job
* tracks the running pid.
*/
void
Job*
dofile(Dir *dp)
{
int dtime, efd, i, etime;
Job *j;
Dir *d;
int dfd, ac, dtime, efd, pid, i, etime;
char *buf, *cp, **av;
Waitmsg *wm;
Biobuf *b;
Mlock *l = nil;
char *cp;
if(debug)
fprint(2, "dofile %s\n", dp->name);
@ -337,14 +281,14 @@ dofile(Dir *dp)
if(d == nil){
syslog(0, runqlog, "no data file for %s", dp->name);
remmatch(dp->name);
return;
return nil;
}
if(dp->length == 0){
if(time(0)-dp->mtime > 15*60){
syslog(0, runqlog, "empty ctl file for %s", dp->name);
remmatch(dp->name);
}
return;
return nil;
}
dtime = d->mtime;
free(d);
@ -358,31 +302,35 @@ dofile(Dir *dp)
if(etime - dtime < 60*60){
/* up to the first hour, try every 15 minutes */
if(time(0) - etime < 15*60)
return;
return nil;
} else {
/* after the first hour, try once an hour */
if(time(0) - etime < 60*60)
return;
return nil;
}
}
/*
* open control and data
*/
b = sysopen(file(dp->name, 'C'), "rl", 0660);
if(b == 0) {
j = malloc(sizeof(Job));
if(j == nil)
return nil;
memset(j, 0, sizeof(Job));
j->dp = dp;
j->dfd = -1;
j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
if(j->b == 0) {
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
return;
return nil;
}
dfd = open(file(dp->name, 'D'), OREAD);
if(dfd < 0){
j->dfd = open(file(dp->name, 'D'), OREAD);
if(j->dfd < 0){
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
Bterm(b);
sysunlockfile(Bfildes(b));
return;
freejob(j);
return nil;
}
/*
@ -390,48 +338,36 @@ dofile(Dir *dp)
* - read args into (malloc'd) buffer
* - malloc a vector and copy pointers to args into it
*/
buf = malloc(dp->length+1);
if(buf == 0){
j->buf = malloc(dp->length+1);
if(j->buf == nil){
warning("buffer allocation", 0);
Bterm(b);
sysunlockfile(Bfildes(b));
close(dfd);
return;
freejob(j);
return nil;
}
if(Bread(b, buf, dp->length) != dp->length){
if(Bread(j->b, j->buf, dp->length) != dp->length){
warning("reading control file %s\n", dp->name);
Bterm(b);
sysunlockfile(Bfildes(b));
close(dfd);
free(buf);
return;
freejob(j);
return nil;
}
buf[dp->length] = 0;
av = malloc(2*sizeof(char*));
if(av == 0){
j->buf[dp->length] = 0;
j->av = malloc(2*sizeof(char*));
if(j->av == 0){
warning("argv allocation", 0);
close(dfd);
free(buf);
Bterm(b);
sysunlockfile(Bfildes(b));
return;
freejob(j);
return nil;
}
for(ac = 1, cp = buf; *cp; ac++){
for(j->ac = 1, cp = j->buf; *cp; j->ac++){
while(isspace(*cp))
*cp++ = 0;
if(*cp == 0)
break;
av = realloc(av, (ac+2)*sizeof(char*));
if(av == 0){
j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
if(j->av == 0){
warning("argv allocation", 0);
close(dfd);
free(buf);
Bterm(b);
sysunlockfile(Bfildes(b));
return;
}
av[ac] = cp;
j->av[j->ac] = cp;
while(*cp && !isspace(*cp)){
if(*cp++ == '"'){
while(*cp && *cp != '"')
@ -441,18 +377,18 @@ dofile(Dir *dp)
}
}
}
av[0] = cmd;
av[ac] = 0;
j->av[0] = cmd;
j->av[j->ac] = 0;
if(!Eflag &&time(0) - dtime > giveup){
if(returnmail(av, dp->name, "Giveup") != 0)
logit("returnmail failed", dp->name, av);
if(returnmail(j->av, dp->name, "Giveup") != 0)
logit("returnmail failed", dp->name, j->av);
remmatch(dp->name);
goto done;
}
for(i = 0; i < nbad; i++){
if(strcmp(av[3], badsys[i]) == 0)
if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
goto done;
}
@ -460,33 +396,34 @@ dofile(Dir *dp)
* Ken's fs, for example, gives us 5 minutes of inactivity before
* the lock goes stale, so we have to keep reading it.
*/
l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
/*
* transfer
*/
pid = fork();
switch(pid){
j->pid = fork();
switch(j->pid){
case -1:
sysunlock(l);
sysunlockfile(Bfildes(b));
sysunlock(j->l);
sysunlockfile(Bfildes(j->b));
syslog(0, runqlog, "out of procs");
exits(0);
case 0:
if(debug) {
fprint(2, "Starting %s", cmd);
for(ac = 0; av[ac]; ac++)
fprint(2, " %s", av[ac]);
fprint(2, "Starting %s\n", cmd);
for(i = 0; j->av[i]; i++)
fprint(2, " %s", j->av[i]);
fprint(2, "\n");
}
logit("execing", dp->name, av);
logit("execing", dp->name, j->av);
close(0);
dup(dfd, 0);
close(dfd);
dup(j->dfd, 0);
close(j->dfd);
close(2);
efd = open(file(dp->name, 'E'), OWRITE);
if(efd < 0){
if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
if(debug)
syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
efd = create(file(dp->name, 'E'), OWRITE, 0666);
if(efd < 0){
if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@ -494,18 +431,28 @@ dofile(Dir *dp)
}
}
seek(efd, 0, 2);
exec(cmd, av);
exec(cmd, j->av);
error("can't exec %s", cmd);
break;
default:
for(;;){
wm = wait();
if(wm == nil)
error("wait failed: %r", "");
if(wm->pid == pid)
break;
free(wm);
return j;
}
done:
freejob(j);
return nil;
}
/*
* Handle the completion of a job.
* Wait for the pid, check its status,
* and then pop the job off the list.
* Return the next running job.
*/
Job*
donefile(Job *j, Waitmsg *wm)
{
Job *n;
if(debug)
fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
@ -514,30 +461,42 @@ dofile(Dir *dp)
fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
if(!Rflag && strstr(wm->msg, "Retry")==0){
/* return the message and remove it */
if(returnmail(av, dp->name, wm->msg) != 0)
logit("returnmail failed", dp->name, av);
remmatch(dp->name);
if(returnmail(j->av, j->dp->name, wm->msg) != 0)
logit("returnmail failed", j->dp->name, j->av);
remmatch(j->dp->name);
} else {
/* add sys to bad list and try again later */
nbad++;
badsys = realloc(badsys, nbad*sizeof(char*));
badsys[nbad-1] = strdup(av[3]);
badsys[nbad-1] = strdup(j->av[3]);
}
} else {
/* it worked remove the message */
remmatch(dp->name);
remmatch(j->dp->name);
}
n = j->next;
freejob(j);
return n;
}
free(wm);
/*
* Release resources associated with
* a job.
*/
void
freejob(Job *j)
{
if(j->b != nil){
sysunlockfile(Bfildes(j->b));
Bterm(j->b);
}
done:
if (l)
sysunlock(l);
Bterm(b);
sysunlockfile(Bfildes(b));
free(buf);
free(av);
close(dfd);
if(j->dfd != -1)
close(j->dfd);
if(j->l != nil)
sysunlock(j->l);
free(j->buf);
free(j->av);
free(j);
}
@ -691,71 +650,3 @@ logit(char *msg, char *file, char **av)
}
syslog(0, runqlog, "%s", buf);
}
char *loadfile = ".runqload";
/*
* load balancing
*/
void
doload(int start)
{
int fd;
char buf[32];
int i, n;
Mlock *l;
Dir *d;
if(load <= 0)
return;
if(chdir(root) < 0){
load = 0;
return;
}
l = syslock(loadfile);
fd = open(loadfile, ORDWR);
if(fd < 0){
fd = create(loadfile, 0666, ORDWR);
if(fd < 0){
load = 0;
sysunlock(l);
return;
}
}
/* get current load */
i = 0;
n = read(fd, buf, sizeof(buf)-1);
if(n >= 0){
buf[n] = 0;
i = atoi(buf);
}
if(i < 0)
i = 0;
/* ignore load if file hasn't been changed in 30 minutes */
d = dirfstat(fd);
if(d != nil){
if(d->mtime + 30*60 < time(0))
i = 0;
free(d);
}
/* if load already too high, give up */
if(start && i >= load){
sysunlock(l);
exits(0);
}
/* increment/decrement load */
if(start)
i++;
else
i--;
seek(fd, 0, 0);
fprint(fd, "%d\n", i);
sysunlock(l);
close(fd);
}