runq: clean up code, fix error handling.
Runq spawns a number of processes and wait()s for them in two different places. Because of the way exit handling is done, a wait can receive the exit message of the wrong child. It turns out that only one place in the code actually needs to wait for its child; in all other cases, waiting just muddles the problem. This change adds the RFNOWAIT flag to the rfork calls for all the processes we don't need to wait for, so that the one place that does need to wait will always get the correct child.
This commit is contained in:
parent
34ed7f7aa2
commit
49d7ca8d92
1 changed file with 46 additions and 52 deletions
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
typedef struct Job Job;
|
typedef struct Job Job;
|
||||||
typedef struct Wdir Wdir;
|
typedef struct Wdir Wdir;
|
||||||
|
typedef struct Wpid Wpid;
|
||||||
|
|
||||||
struct Wdir {
|
struct Wdir {
|
||||||
Dir *d;
|
Dir *d;
|
||||||
|
@ -41,6 +42,8 @@ char *root;
|
||||||
int debug;
|
int debug;
|
||||||
int giveup = 2*24*60*60;
|
int giveup = 2*24*60*60;
|
||||||
int limit;
|
int limit;
|
||||||
|
Wpid *waithd;
|
||||||
|
Wpid *waittl;
|
||||||
|
|
||||||
char *runqlog = "runq";
|
char *runqlog = "runq";
|
||||||
|
|
||||||
|
@ -196,6 +199,7 @@ rundir(char *name)
|
||||||
int nlive, fidx, fd, found;
|
int nlive, fidx, fd, found;
|
||||||
Job *hd, *j, **p;
|
Job *hd, *j, **p;
|
||||||
Waitmsg *w;
|
Waitmsg *w;
|
||||||
|
Mlock *l;
|
||||||
Wdir wd;
|
Wdir wd;
|
||||||
|
|
||||||
fd = open(".", OREAD);
|
fd = open(".", OREAD);
|
||||||
|
@ -203,28 +207,31 @@ rundir(char *name)
|
||||||
warning("reading %s", name);
|
warning("reading %s", name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if((l = syslock("./rundir")) == nil){
|
||||||
|
warning("locking %s", name);
|
||||||
|
close(fd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
fidx= 0;
|
fidx= 0;
|
||||||
hd = nil;
|
hd = nil;
|
||||||
nlive = 0;
|
nlive = 0;
|
||||||
wd.name = name;
|
wd.name = name;
|
||||||
wd.nd = dirreadall(fd, &wd.d);
|
wd.nd = dirreadall(fd, &wd.d);
|
||||||
while(nlive > 0 || fidx< wd.nd){
|
while(nlive > 0 || fidx< wd.nd){
|
||||||
while(fidx< wd.nd && nlive < njob){
|
for(; fidx< wd.nd && nlive < njob; fidx++){
|
||||||
if(strncmp(wd.d[fidx].name, "C.", 2) != 0){
|
if(strncmp(wd.d[fidx].name, "C.", 2) != 0)
|
||||||
fidx++;
|
continue;
|
||||||
|
if((j = dofile(&wd, &wd.d[fidx])) == nil){
|
||||||
|
if(debug) fprint(2, "skipping %s: %r\n", wd.d[fidx].name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if((j = dofile(&wd, &wd.d[fidx])) != nil){
|
|
||||||
nlive++;
|
nlive++;
|
||||||
j->next = hd;
|
j->next = hd;
|
||||||
hd = j;
|
hd = j;
|
||||||
}
|
}
|
||||||
fidx++;
|
/* nothing to do */
|
||||||
}
|
if(nlive == 0)
|
||||||
if(nlive == 0){
|
|
||||||
fprint(2, "nothing live\n");
|
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
rescan:
|
rescan:
|
||||||
if((w = wait()) == nil){
|
if((w = wait()) == nil){
|
||||||
syslog(0, "runq", "wait error: %r");
|
syslog(0, "runq", "wait error: %r");
|
||||||
|
@ -240,12 +247,15 @@ rescan:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
free(w);
|
free(w);
|
||||||
if(!found)
|
if(!found){
|
||||||
|
syslog(0, runqlog, "wait: pid not in job list");
|
||||||
goto rescan;
|
goto rescan;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
assert(hd == nil);
|
assert(hd == nil);
|
||||||
free(wd.d);
|
free(wd.d);
|
||||||
close(fd);
|
close(fd);
|
||||||
|
sysunlock(l);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -257,7 +267,6 @@ remmatch(Wdir *w, char *name)
|
||||||
long i;
|
long i;
|
||||||
|
|
||||||
syslog(0, runqlog, "removing %s/%s", w->name, name);
|
syslog(0, runqlog, "removing %s/%s", w->name, name);
|
||||||
|
|
||||||
for(i=0; i<w->nd; i++){
|
for(i=0; i<w->nd; i++){
|
||||||
if(strcmp(&w->d[i].name[1], &name[1]) == 0)
|
if(strcmp(&w->d[i].name[1], &name[1]) == 0)
|
||||||
remove(w->d[i].name);
|
remove(w->d[i].name);
|
||||||
|
@ -286,7 +295,7 @@ keeplockalive(char *path, int fd)
|
||||||
snprint(l->name, sizeof l->name, "%s", path);
|
snprint(l->name, sizeof l->name, "%s", path);
|
||||||
|
|
||||||
/* fork process to keep lock alive until sysunlock(l) */
|
/* fork process to keep lock alive until sysunlock(l) */
|
||||||
switch(l->pid = rfork(RFPROC)){
|
switch(l->pid = rfork(RFPROC|RFNOWAIT)){
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
case 0:
|
case 0:
|
||||||
|
@ -313,8 +322,7 @@ dofile(Wdir *w, Dir *dp)
|
||||||
Dir *d;
|
Dir *d;
|
||||||
char *cp;
|
char *cp;
|
||||||
|
|
||||||
if(debug)
|
if(debug) fprint(2, "dofile %s\n", dp->name);
|
||||||
fprint(2, "dofile %s\n", dp->name);
|
|
||||||
/*
|
/*
|
||||||
* if no data file or empty control or data file, just clean up
|
* if no data file or empty control or data file, just clean up
|
||||||
* the empty control file must be 15 minutes old, to minimize the
|
* the empty control file must be 15 minutes old, to minimize the
|
||||||
|
@ -344,14 +352,18 @@ dofile(Wdir *w, Dir *dp)
|
||||||
free(d);
|
free(d);
|
||||||
if(etime - dtime < 60*60){
|
if(etime - dtime < 60*60){
|
||||||
/* up to the first hour, try every 15 minutes */
|
/* up to the first hour, try every 15 minutes */
|
||||||
if(time(0) - etime < 15*60)
|
if(time(0) - etime < 15*60){
|
||||||
|
werrstr("early retry");
|
||||||
return nil;
|
return nil;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
/* after the first hour, try once an hour */
|
/* after the first hour, try once an hour */
|
||||||
if(time(0) - etime < 60*60)
|
if(time(0) - etime < 60*60){
|
||||||
|
werrstr("early retry");
|
||||||
return nil;
|
return nil;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* open control and data
|
* open control and data
|
||||||
|
@ -363,18 +375,11 @@ dofile(Wdir *w, Dir *dp)
|
||||||
j->dp = dp;
|
j->dp = dp;
|
||||||
j->dfd = -1;
|
j->dfd = -1;
|
||||||
j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
|
j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
|
||||||
if(j->b == 0) {
|
if(j->b == 0)
|
||||||
if(debug)
|
goto done;
|
||||||
fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
|
|
||||||
return nil;
|
|
||||||
}
|
|
||||||
j->dfd = open(file(dp->name, 'D'), OREAD);
|
j->dfd = open(file(dp->name, 'D'), OREAD);
|
||||||
if(j->dfd < 0){
|
if(j->dfd < 0)
|
||||||
if(debug)
|
goto done;
|
||||||
fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
|
|
||||||
freejob(j);
|
|
||||||
return nil;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* make arg list
|
* make arg list
|
||||||
|
@ -431,15 +436,20 @@ dofile(Wdir *w, Dir *dp)
|
||||||
}
|
}
|
||||||
|
|
||||||
for(i = 0; i < nbad; i++){
|
for(i = 0; i < nbad; i++){
|
||||||
if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
|
if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0){
|
||||||
|
werrstr("badsys: %s", j->av[3]);
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
/*
|
/*
|
||||||
* Ken's fs, for example, gives us 5 minutes of inactivity before
|
* Ken's fs, for example, gives us 5 minutes of inactivity before
|
||||||
* the lock goes stale, so we have to keep reading it.
|
* the lock goes stale, so we have to keep reading it.
|
||||||
*/
|
*/
|
||||||
j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
|
j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
|
||||||
|
if(j->l == nil){
|
||||||
|
warning("lock file", 0);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* transfer
|
* transfer
|
||||||
|
@ -498,7 +508,6 @@ donefile(Job *j, Waitmsg *wm)
|
||||||
|
|
||||||
if(debug)
|
if(debug)
|
||||||
fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
|
fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
|
||||||
|
|
||||||
if(wm->msg[0]){
|
if(wm->msg[0]){
|
||||||
if(debug)
|
if(debug)
|
||||||
fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
|
fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
|
||||||
|
@ -566,9 +575,8 @@ int
|
||||||
returnmail(char **av, Wdir *w, char *name, char *msg)
|
returnmail(char **av, Wdir *w, char *name, char *msg)
|
||||||
{
|
{
|
||||||
char buf[256], attachment[Pathlen], *sender;
|
char buf[256], attachment[Pathlen], *sender;
|
||||||
int i, fd, pfd[2];
|
int fd, pfd[2];
|
||||||
long n;
|
long n;
|
||||||
Waitmsg *wm;
|
|
||||||
String *s;
|
String *s;
|
||||||
|
|
||||||
if(av[1] == 0 || av[2] == 0){
|
if(av[1] == 0 || av[2] == 0){
|
||||||
|
@ -589,7 +597,7 @@ returnmail(char **av, Wdir *w, char *name, char *msg)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch(rfork(RFFDG|RFPROC|RFENVG)){
|
switch(rfork(RFFDG|RFPROC|RFENVG|RFNOWAIT)){
|
||||||
case -1:
|
case -1:
|
||||||
logit("runq - fork failed", w, name, av);
|
logit("runq - fork failed", w, name, av);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -625,27 +633,13 @@ returnmail(char **av, Wdir *w, char *name, char *msg)
|
||||||
break;
|
break;
|
||||||
if(write(pfd[1], buf, n) != n){
|
if(write(pfd[1], buf, n) != n){
|
||||||
close(fd);
|
close(fd);
|
||||||
goto out;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(fd);
|
close(fd);
|
||||||
}
|
}
|
||||||
close(pfd[1]);
|
close(pfd[1]);
|
||||||
out:
|
return 0;
|
||||||
wm = wait();
|
|
||||||
if(wm == nil){
|
|
||||||
syslog(0, "runq", "wait: %r");
|
|
||||||
logit("wait failed", w, name, av);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
i = 0;
|
|
||||||
if(wm->msg[0]){
|
|
||||||
i = -1;
|
|
||||||
syslog(0, "runq", "returnmail child: %s", wm->msg);
|
|
||||||
logit("returnmail child failed", w, name, av);
|
|
||||||
}
|
|
||||||
free(wm);
|
|
||||||
return i;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in a new issue