upas/runq: bring back -a

Turns out -a is useful in crontab, so bring
back a simplified version of it. This only
iterates through directories one at a time.
This commit is contained in:
Ori Bernstein 2021-01-23 16:05:21 -08:00
parent 41f85d46f8
commit 51319cc5b5
2 changed files with 124 additions and 75 deletions

View file

@ -15,7 +15,7 @@ qer, runq \- queue management for spooled files
.br .br
.B runq .B runq
[ [
.B -dER .B -adER
] ]
[ [
.B -f .B -f
@ -50,13 +50,7 @@ separated by spaces.
The data file contains the standard input to The data file contains the standard input to
.IR qer . .IR qer .
The files are created in the directory The files are created in the directory
.IR root / subdir , .IR root / subdi
where
.I subdir
is the argument to
.B -q
if present, else the contents of
.BR /dev/user .
The names of the control and data files differ only The names of the control and data files differ only
in the first character which is `C' and `D' respectively. in the first character which is `C' and `D' respectively.
.IR Mktemp (2) .IR Mktemp (2)
@ -77,6 +71,18 @@ of the copies differ from the name of the data file
only in the first character. The first one only in the first character. The first one
starts with 'F', the second 'G', etc. starts with 'F', the second 'G', etc.
.P .P
Qer takes the following arguments:
.TP
.B -q subdir
Specifies the queue subdirectory to use. If
unspecified, the contents of
.B /dev/user
are used.
.TP
.B -f file
Specifies the files to copy into the queue
directory, in the manner described above.
.P
.I Runq .I Runq
processes the files queued by processes the files queued by
.IR qer . .IR qer .
@ -124,32 +130,34 @@ a data file younger than one hour will not be processed if its
error file exists and was last modified within the preceding 10 minutes. error file exists and was last modified within the preceding 10 minutes.
A data file older than one hour will not be processed if its error A data file older than one hour will not be processed if its error
file exists and was last modified within the preceding hour. file exists and was last modified within the preceding hour.
The .PP
The following flags are accepted by runq:
.TP
.B -a
Causes runq to process all user directories in sequence, instead
of only the directory of the current user.
.TP
.B -E .B -E
flag causes all files to be reprocessed regardless of Causes all files to be reprocessed regardless of
the file times. the file times.
.P .TP
The
.B -R .B -R
flag instructs Instructs
.I runq .I runq
never to give up on a failed queue job, instead leaving never to give up on a failed queue job, instead leaving
it in the queue to be retried. it in the queue to be retried.
.P .TP
The
.B -d .B -d
option causes debugging output on standard error Causes debugging output on standard error
describing the progress through the queues. describing the progress through the queues.
.P .TP
The
.B -t .B -t
flags specifies the number of hours Specifies the number of hours
that retries will continue after a send that retries will continue after a send
failure. The default is 48 hours. failure. The default is 48 hours.
.P .TP
The
.BR -r .BR -r
flag limits the number of files that are processed in a single pass of a queue. Limits the number of files that are processed in a single pass of a queue.
.I Runq .I Runq
accumulates the entire directory containing a queue before processing any accumulates the entire directory containing a queue before processing any
files. When a queue contains many files and the system does not files. When a queue contains many files and the system does not
@ -161,10 +169,9 @@ to process the directory in chunks, allowing the queue to
be drained incrementally. It is most useful in combination with the be drained incrementally. It is most useful in combination with the
.I -q .I -q
flag. flag.
.P .TP
The argument following the
.B -n .B -n
flag specifies the number of queued jobs that are processed Specifies the number of queued jobs that are processed
in parallel from the queue; the default is 1. in parallel from the queue; the default is 1.
This is useful for a large queue to be processed with a bounded This is useful for a large queue to be processed with a bounded
amount of parallelism. amount of parallelism.

View file

@ -2,6 +2,13 @@
#include <ctype.h> #include <ctype.h>
typedef struct Job Job; typedef struct Job Job;
typedef struct Wdir Wdir;
struct Wdir {
Dir *d;
int nd;
char *name;
};
struct Job { struct Job {
Job *next; Job *next;
@ -10,6 +17,7 @@ struct Job {
int dfd; int dfd;
char **av; char **av;
char *buf; /* backing for av */ char *buf; /* backing for av */
Wdir *wdir; /* work dir */
Dir *dp; /* not owned */ Dir *dp; /* not owned */
Mlock *l; Mlock *l;
Biobuf *b; Biobuf *b;
@ -17,30 +25,23 @@ struct Job {
void doalldirs(void); void doalldirs(void);
void dodir(char*); void dodir(char*);
Job* dofile(Dir*); Job* dofile(Wdir*, Dir*);
Job* donefile(Job*, Waitmsg*); Job* donefile(Job*, Waitmsg*);
void freejob(Job*); void freejob(Job*);
void rundir(char*); void rundir(char*);
char* file(char*, char); char* file(char*, char);
void warning(char*, void*); void warning(char*, void*);
void error(char*, void*); void error(char*, void*);
int returnmail(char**, char*, char*); int returnmail(char**, Wdir*, char*, char*);
void logit(char*, char*, char**); void logit(char*, Wdir*, char*, char**);
void doload(int); void doload(int);
#define HUNK 32
char *cmd; char *cmd;
char *root; char *root;
int debug; int debug;
int giveup = 2*24*60*60; int giveup = 2*24*60*60;
int limit; int limit;
/* the current directory */
Dir *dirbuf;
long ndirbuf = 0;
int nfiles;
char *curdir;
char *runqlog = "runq"; char *runqlog = "runq";
char **badsys; /* array of recalcitrant systems */ char **badsys; /* array of recalcitrant systems */
@ -48,6 +49,7 @@ int nbad;
int njob = 1; /* number of concurrent jobs to invoke */ int njob = 1; /* number of concurrent jobs to invoke */
int Eflag; /* ignore E.xxxxxx dates */ int Eflag; /* ignore E.xxxxxx dates */
int Rflag; /* no giving up, ever */ int Rflag; /* no giving up, ever */
int aflag; /* do all dirs */
void void
usage(void) usage(void)
@ -82,27 +84,37 @@ main(int argc, char **argv)
case 'q': case 'q':
qdir = EARGF(usage()); qdir = EARGF(usage());
break; break;
case 'a':
aflag++;
break;
case 'n': case 'n':
njob = atoi(EARGF(usage())); njob = atoi(EARGF(usage()));
if(njob == 0) if(njob == 0)
usage(); usage();
break; break;
default:
usage();
break;
}ARGEND; }ARGEND;
if(argc != 2) if(argc != 2)
usage(); usage();
if(qdir == nil) if(!aflag && qdir == nil){
qdir = getuser(); qdir = getuser();
if(qdir == nil) if(qdir == nil)
error("unknown user", 0); error("unknown user", 0);
}
root = argv[0]; root = argv[0];
cmd = argv[1]; cmd = argv[1];
if(chdir(root) < 0) if(chdir(root) < 0)
error("can't cd to %s", root); error("can't cd to %s", root);
dodir(qdir); if(aflag)
doalldirs();
else
dodir(qdir);
exits(0); exits(0);
} }
@ -128,14 +140,42 @@ emptydir(char *name)
return 0; return 0;
} }
/*
* run all user directories, must be bootes (or root on unix) to do this
*/
void
doalldirs(void)
{
Dir *db;
int fd;
long i, n;
if((fd = open(".", OREAD)) == -1)
warning("opening %s", root);
return;
}
if((n = dirreadall(fd, &db)) == -1){
warning("reading %s: ", root);
return;
}
for(i=0; i<n; i++){
if((db[i].qid.type & QTDIR) == 0)
continue;
if(emptydir(db[i].name))
continue;
dodir(db[i].name);
}
free(db);
close(fd);
}
/* /*
* cd to a user directory and run it * cd to a user directory and run it
*/ */
void void
dodir(char *name) dodir(char *name)
{ {
curdir = name;
if(chdir(name) < 0){ if(chdir(name) < 0){
warning("cd to %s", name); warning("cd to %s", name);
return; return;
@ -152,9 +192,10 @@ dodir(char *name)
void void
rundir(char *name) rundir(char *name)
{ {
Job *hd, *j, **p;
int nlive, fidx, fd, found; int nlive, fidx, fd, found;
Job *hd, *j, **p;
Waitmsg *w; Waitmsg *w;
Wdir wd;
fd = open(".", OREAD); fd = open(".", OREAD);
if(fd == -1){ if(fd == -1){
@ -164,14 +205,15 @@ rundir(char *name)
fidx= 0; fidx= 0;
hd = nil; hd = nil;
nlive = 0; nlive = 0;
nfiles = dirreadall(fd, &dirbuf); wd.name = name;
while(nlive > 0 || fidx< nfiles){ wd.nd = dirreadall(fd, &wd.d);
while(fidx< nfiles && nlive < njob){ while(nlive > 0 || fidx< wd.nd){
if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ while(fidx< wd.nd && nlive < njob){
if(strncmp(wd.d[fidx].name, "C.", 2) != 0){
fidx++; fidx++;
continue; continue;
} }
if((j = dofile(&dirbuf[fidx])) != nil){ if((j = dofile(&wd, &wd.d[fidx])) != nil){
nlive++; nlive++;
j->next = hd; j->next = hd;
hd = j; hd = j;
@ -201,7 +243,7 @@ rescan:
goto rescan; goto rescan;
} }
assert(hd == nil); assert(hd == nil);
free(dirbuf); free(wd.d);
close(fd); close(fd);
} }
@ -209,15 +251,15 @@ rescan:
* free files matching name in the current directory * free files matching name in the current directory
*/ */
void void
remmatch(char *name) remmatch(Wdir *w, char *name)
{ {
long i; long i;
syslog(0, runqlog, "removing %s/%s", curdir, name); syslog(0, runqlog, "removing %s/%s", w->name, name);
for(i=0; i<nfiles; i++){ for(i=0; i<w->nd; i++){
if(strcmp(&dirbuf[i].name[1], &name[1]) == 0) if(strcmp(&w->d[i].name[1], &name[1]) == 0)
remove(dirbuf[i].name); remove(w->d[i].name);
} }
/* error file (may have) appeared after we read the directory */ /* error file (may have) appeared after we read the directory */
@ -263,7 +305,7 @@ keeplockalive(char *path, int fd)
* tracks the running pid. * tracks the running pid.
*/ */
Job* Job*
dofile(Dir *dp) dofile(Wdir *w, Dir *dp)
{ {
int dtime, efd, i, etime; int dtime, efd, i, etime;
Job *j; Job *j;
@ -280,13 +322,13 @@ dofile(Dir *dp)
d = dirstat(file(dp->name, 'D')); d = dirstat(file(dp->name, 'D'));
if(d == nil){ if(d == nil){
syslog(0, runqlog, "no data file for %s", dp->name); syslog(0, runqlog, "no data file for %s", dp->name);
remmatch(dp->name); remmatch(w, dp->name);
return nil; return nil;
} }
if(dp->length == 0){ if(dp->length == 0){
if(time(0)-dp->mtime > 15*60){ if(time(0)-dp->mtime > 15*60){
syslog(0, runqlog, "empty ctl file for %s", dp->name); syslog(0, runqlog, "empty ctl file for %s", dp->name);
remmatch(dp->name); remmatch(w, dp->name);
} }
return nil; return nil;
} }
@ -338,7 +380,7 @@ dofile(Dir *dp)
* - read args into (malloc'd) buffer * - read args into (malloc'd) buffer
* - malloc a vector and copy pointers to args into it * - malloc a vector and copy pointers to args into it
*/ */
j->wdir = w;
j->buf = malloc(dp->length+1); j->buf = malloc(dp->length+1);
if(j->buf == nil){ if(j->buf == nil){
warning("buffer allocation", 0); warning("buffer allocation", 0);
@ -381,9 +423,9 @@ dofile(Dir *dp)
j->av[j->ac] = 0; j->av[j->ac] = 0;
if(!Eflag &&time(0) - dtime > giveup){ if(!Eflag &&time(0) - dtime > giveup){
if(returnmail(j->av, dp->name, "Giveup") != 0) if(returnmail(j->av, w, dp->name, "Giveup") != 0)
logit("returnmail failed", dp->name, j->av); logit("returnmail failed", w, dp->name, j->av);
remmatch(dp->name); remmatch(w, dp->name);
goto done; goto done;
} }
@ -415,7 +457,7 @@ dofile(Dir *dp)
fprint(2, " %s", j->av[i]); fprint(2, " %s", j->av[i]);
fprint(2, "\n"); fprint(2, "\n");
} }
logit("execing", dp->name, j->av); logit("execing", w, dp->name, j->av);
close(0); close(0);
dup(j->dfd, 0); dup(j->dfd, 0);
close(j->dfd); close(j->dfd);
@ -461,9 +503,9 @@ donefile(Job *j, Waitmsg *wm)
fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
if(!Rflag && strstr(wm->msg, "Retry")==0){ if(!Rflag && strstr(wm->msg, "Retry")==0){
/* return the message and remove it */ /* return the message and remove it */
if(returnmail(j->av, j->dp->name, wm->msg) != 0) if(returnmail(j->av, j->wdir, j->dp->name, wm->msg) != 0)
logit("returnmail failed", j->dp->name, j->av); logit("returnmail failed", j->wdir, j->dp->name, j->av);
remmatch(j->dp->name); remmatch(j->wdir, j->dp->name);
} else { } else {
/* add sys to bad list and try again later */ /* add sys to bad list and try again later */
nbad++; nbad++;
@ -472,7 +514,7 @@ donefile(Job *j, Waitmsg *wm)
} }
} else { } else {
/* it worked remove the message */ /* it worked remove the message */
remmatch(j->dp->name); remmatch(j->wdir, j->dp->name);
} }
n = j->next; n = j->next;
freejob(j); freejob(j);
@ -520,7 +562,7 @@ file(char *name, char type)
* return 0 if successful * return 0 if successful
*/ */
int int
returnmail(char **av, char *name, char *msg) returnmail(char **av, Wdir *w, char *name, char *msg)
{ {
char buf[256], attachment[Pathlen], *sender; char buf[256], attachment[Pathlen], *sender;
int i, fd, pfd[2]; int i, fd, pfd[2];
@ -529,7 +571,7 @@ returnmail(char **av, char *name, char *msg)
String *s; String *s;
if(av[1] == 0 || av[2] == 0){ if(av[1] == 0 || av[2] == 0){
logit("runq - dumping bad file", name, av); logit("runq - dumping bad file", w, name, av);
return 0; return 0;
} }
@ -537,21 +579,21 @@ returnmail(char **av, char *name, char *msg)
sender = s_to_c(s); sender = s_to_c(s);
if(!returnable(sender) || strcmp(sender, "postmaster") == 0) { if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
logit("runq - dumping p to p mail", name, av); logit("runq - dumping p to p mail", w, name, av);
return 0; return 0;
} }
if(pipe(pfd) < 0){ if(pipe(pfd) < 0){
logit("runq - pipe failed", name, av); logit("runq - pipe failed", w, name, av);
return -1; return -1;
} }
switch(rfork(RFFDG|RFPROC|RFENVG)){ switch(rfork(RFFDG|RFPROC|RFENVG)){
case -1: case -1:
logit("runq - fork failed", name, av); logit("runq - fork failed", w, name, av);
return -1; return -1;
case 0: case 0:
logit("returning", name, av); logit("returning", w, name, av);
close(pfd[1]); close(pfd[1]);
close(0); close(0);
dup(pfd[0], 0); dup(pfd[0], 0);
@ -592,14 +634,14 @@ out:
wm = wait(); wm = wait();
if(wm == nil){ if(wm == nil){
syslog(0, "runq", "wait: %r"); syslog(0, "runq", "wait: %r");
logit("wait failed", name, av); logit("wait failed", w, name, av);
return -1; return -1;
} }
i = 0; i = 0;
if(wm->msg[0]){ if(wm->msg[0]){
i = -1; i = -1;
syslog(0, "runq", "returnmail child: %s", wm->msg); syslog(0, "runq", "returnmail child: %s", wm->msg);
logit("returnmail child failed", name, av); logit("returnmail child failed", w, name, av);
} }
free(wm); free(wm);
return i; return i;
@ -635,12 +677,12 @@ error(char *f, void *a)
} }
void void
logit(char *msg, char *file, char **av) logit(char *msg, Wdir *w, char *file, char **av)
{ {
int n, m; int n, m;
char buf[256]; char buf[256];
n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg); n = snprint(buf, sizeof(buf), "%s/%s: %s", w->name, file, msg);
for(; *av; av++){ for(; *av; av++){
m = strlen(*av); m = strlen(*av);
if(n + m + 4 > sizeof(buf)) if(n + m + 4 > sizeof(buf))