exportfs: bring back the changes, bug was due to rendezvous group being shared by listen

exportfs used pid of slave proc as rendezvous tag. when we
changed it to use Proc* memory address, there where tag
collisions because listen didnt fork the rendezvous group (bug!).

for now, just do rfork(RFREND) in main just in case.

will fix aux/listen in a follow up changeset.

--
cinap
This commit is contained in:
cinap_lenrek 2014-02-21 05:23:21 +01:00
parent d7378c10d8
commit 40d71baf7f
3 changed files with 67 additions and 79 deletions

View file

@ -230,7 +230,7 @@ main(int argc, char **argv)
DEBUG(DFD, "exportfs: started\n"); DEBUG(DFD, "exportfs: started\n");
rfork(RFNOTEG); rfork(RFNOTEG|RFREND);
if(messagesize == 0){ if(messagesize == 0){
messagesize = iounit(netfd); messagesize = iounit(netfd);
@ -460,8 +460,8 @@ reply(Fcall *r, Fcall *t, char *err)
fatal(Enomem); fatal(Enomem);
n = convS2M(t, data, messagesize); n = convS2M(t, data, messagesize);
if(write(netfd, data, n)!=n){ if(write(netfd, data, n)!=n){
syslog(0, "exportfs", "short write: %r"); /* not fatal, might have got a note due to flush */
fatal("mount write"); fprint(2, "exportfs: short write in reply: %r\n");
} }
free(data); free(data);
} }
@ -570,8 +570,6 @@ getsbuf(void)
unlock(&sbufalloc); unlock(&sbufalloc);
w = emallocz(sizeof(*w) + messagesize); w = emallocz(sizeof(*w) + messagesize);
} }
w->pid = 0;
w->canint = 0;
w->flushtag = NOTAG; w->flushtag = NOTAG;
return w; return w;
} }
@ -579,8 +577,6 @@ getsbuf(void)
void void
putsbuf(Fsrpc *w) putsbuf(Fsrpc *w)
{ {
w->pid = 0;
w->canint = 0;
w->flushtag = NOTAG; w->flushtag = NOTAG;
lock(&sbufalloc); lock(&sbufalloc);
w->next = sbufalloc.free; w->next = sbufalloc.free;

View file

@ -15,8 +15,6 @@ typedef struct Qidtab Qidtab;
struct Fsrpc struct Fsrpc
{ {
Fsrpc *next; /* freelist */ Fsrpc *next; /* freelist */
uintptr pid; /* Pid of slave process executing the rpc */
int canint; /* Interrupt gate */
int flushtag; /* Tag on which to reply to flush */ int flushtag; /* Tag on which to reply to flush */
Fcall work; /* Plan 9 incoming Fcall */ Fcall work; /* Plan 9 incoming Fcall */
uchar buf[]; /* Data buffer */ uchar buf[]; /* Data buffer */
@ -53,9 +51,10 @@ struct File
struct Proc struct Proc
{ {
uintptr pid; Lock;
Fsrpc *busy; Fsrpc *busy;
Proc *next; Proc *next;
int pid;
}; };
struct Qidtab struct Qidtab
@ -70,7 +69,6 @@ struct Qidtab
enum enum
{ {
MAXPROC = 50,
FHASHSIZE = 64, FHASHSIZE = 64,
Fidchunk = 1000, Fidchunk = 1000,
Npsmpt = 32, Npsmpt = 32,
@ -128,7 +126,7 @@ void freefile(File*);
void slaveopen(Fsrpc*); void slaveopen(Fsrpc*);
void slaveread(Fsrpc*); void slaveread(Fsrpc*);
void slavewrite(Fsrpc*); void slavewrite(Fsrpc*);
void blockingslave(void); void blockingslave(Proc*);
void reopen(Fid *f); void reopen(Fid *f);
void noteproc(int, char*); void noteproc(int, char*);
void flushaction(void*, char*); void flushaction(void*, char*);

View file

@ -64,14 +64,20 @@ Xflush(Fsrpc *t)
for(m = Proclist; m; m = m->next){ for(m = Proclist; m; m = m->next){
w = m->busy; w = m->busy;
if(w != 0 && w->pid == m->pid && w->work.tag == t->work.oldtag) { if(w == nil || w->work.tag != t->work.oldtag)
continue;
lock(m);
w = m->busy;
if(w != nil && w->work.tag == t->work.oldtag) {
w->flushtag = t->work.tag; w->flushtag = t->work.tag;
DEBUG(DFD, "\tset flushtag %d\n", t->work.tag); DEBUG(DFD, "\tset flushtag %d\n", t->work.tag);
if(w->canint) postnote(PNPROC, m->pid, "flush");
postnote(PNPROC, w->pid, "flush"); unlock(m);
putsbuf(t); putsbuf(t);
return; return;
} }
unlock(m);
} }
reply(&t->work, &rhdr, 0); reply(&t->work, &rhdr, 0);
@ -459,10 +465,10 @@ procsetname(char *fmt, ...)
void void
slave(Fsrpc *f) slave(Fsrpc *f)
{ {
Proc *p;
uintptr pid;
Fcall rhdr;
static int nproc; static int nproc;
Proc *m, **l;
Fcall rhdr;
int pid;
if(readonly){ if(readonly){
switch(f->work.type){ switch(f->work.type){
@ -479,30 +485,41 @@ slave(Fsrpc *f)
} }
} }
for(;;) { for(;;) {
for(p = Proclist; p; p = p->next) { for(l = &Proclist; (m = *l) != nil; l = &m->next) {
if(p->busy == 0) { if(m->busy != nil)
f->pid = p->pid; continue;
p->busy = f;
do {
pid = (uintptr)rendezvous((void*)p->pid, f);
}
while(pid == ~0); /* Interrupted */
if(pid != p->pid)
fatal("rendezvous sync fail");
return;
}
}
if(nproc >= MAXPROC){ m->busy = f;
reply(&f->work, &rhdr, Enoprocs); while(rendezvous(m, f) == (void*)~0)
putsbuf(f); ;
return;
/* swept a slave proc */
if(f == nil){
*l = m->next;
free(m);
nproc--;
break;
}
f = nil;
/*
* as long as the number of slave procs
* is small, dont bother sweeping.
*/
if(nproc < 16)
break;
} }
nproc++; if(f == nil)
pid = rfork(RFPROC|RFMEM); return;
m = emallocz(sizeof(Proc));
pid = rfork(RFPROC|RFMEM|RFNOWAIT);
switch(pid) { switch(pid) {
case -1: case -1:
fatal("rfork"); reply(&f->work, &rhdr, Enoprocs);
putsbuf(f);
free(m);
return;
case 0: case 0:
if (local[0] != '\0') if (local[0] != '\0')
@ -511,44 +528,34 @@ slave(Fsrpc *f)
local, remote); local, remote);
else else
procsetname("%s -> %s", local, remote); procsetname("%s -> %s", local, remote);
blockingslave(); blockingslave(m);
fatal("slave"); _exits(0);
default: default:
p = emallocz(sizeof(Proc)); m->pid = pid;
p->busy = 0; m->next = Proclist;
p->pid = pid; Proclist = m;
p->next = Proclist; nproc++;
Proclist = p;
while(rendezvous((void*)pid, p) == (void*)~0)
;
} }
} }
} }
void void
blockingslave(void) blockingslave(Proc *m)
{ {
Fsrpc *p; Fsrpc *p;
Fcall rhdr; Fcall rhdr;
Proc *m;
uintptr pid;
notify(flushaction); notify(flushaction);
pid = getpid();
do {
m = rendezvous((void*)pid, 0);
}
while(m == (void*)~0); /* Interrupted */
for(;;) { for(;;) {
p = rendezvous((void*)pid, (void*)pid); p = rendezvous(m, nil);
if(p == (void*)~0) /* Interrupted */ if(p == (void*)~0) /* Interrupted */
continue; continue;
if(p == nil) /* Swept */
break;
DEBUG(DFD, "\tslave: %p %F p %p\n", pid, &p->work, p->pid); DEBUG(DFD, "\tslave: %d %F\n", m->pid, &p->work);
if(p->flushtag != NOTAG) if(p->flushtag != NOTAG)
goto flushme; goto flushme;
@ -568,13 +575,17 @@ blockingslave(void)
default: default:
reply(&p->work, &rhdr, "exportfs: slave type error"); reply(&p->work, &rhdr, "exportfs: slave type error");
} }
if(p->flushtag != NOTAG) {
flushme: flushme:
lock(m);
m->busy = nil;
unlock(m);
/* no more flushes can come in now */
if(p->flushtag != NOTAG) {
p->work.type = Tflush; p->work.type = Tflush;
p->work.tag = p->flushtag; p->work.tag = p->flushtag;
reply(&p->work, &rhdr, 0); reply(&p->work, &rhdr, 0);
} }
m->busy = 0;
putsbuf(p); putsbuf(p);
} }
} }
@ -654,16 +665,8 @@ slaveopen(Fsrpc *p)
path = makepath(f->f, ""); path = makepath(f->f, "");
DEBUG(DFD, "\topen: %s %d\n", path, work->mode); DEBUG(DFD, "\topen: %s %d\n", path, work->mode);
p->canint = 1;
if(p->flushtag != NOTAG){
free(path);
return;
}
/* There is a race here I ignore because there are no locks */
f->fid = open(path, work->mode); f->fid = open(path, work->mode);
free(path); free(path);
p->canint = 0;
if(f->fid < 0 || (d = dirfstat(f->fid)) == nil) { if(f->fid < 0 || (d = dirfstat(f->fid)) == nil) {
Error: Error:
errstr(err, sizeof err); errstr(err, sizeof err);
@ -703,9 +706,6 @@ slaveread(Fsrpc *p)
} }
n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count; n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count;
p->canint = 1;
if(p->flushtag != NOTAG)
return;
data = malloc(n); data = malloc(n);
if(data == 0) { if(data == 0) {
reply(work, &rhdr, Enomem); reply(work, &rhdr, Enomem);
@ -717,14 +717,12 @@ slaveread(Fsrpc *p)
r = preaddir(f, (uchar*)data, n, work->offset); r = preaddir(f, (uchar*)data, n, work->offset);
else else
r = pread(f->fid, data, n, work->offset); r = pread(f->fid, data, n, work->offset);
p->canint = 0;
if(r < 0) { if(r < 0) {
free(data); free(data);
errstr(err, sizeof err); errstr(err, sizeof err);
reply(work, &rhdr, err); reply(work, &rhdr, err);
return; return;
} }
DEBUG(DFD, "\tread: fd=%d %d bytes\n", f->fid, r); DEBUG(DFD, "\tread: fd=%d %d bytes\n", f->fid, r);
rhdr.data = data; rhdr.data = data;
@ -750,11 +748,7 @@ slavewrite(Fsrpc *p)
} }
n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count; n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count;
p->canint = 1;
if(p->flushtag != NOTAG)
return;
n = pwrite(f->fid, work->data, n, work->offset); n = pwrite(f->fid, work->data, n, work->offset);
p->canint = 0;
if(n < 0) { if(n < 0) {
errstr(err, sizeof err); errstr(err, sizeof err);
reply(work, &rhdr, err); reply(work, &rhdr, err);