This commit is contained in:
cinap_lenrek 2014-02-21 01:03:31 +01:00
commit d7378c10d8
3 changed files with 76 additions and 64 deletions

View file

@ -460,8 +460,8 @@ reply(Fcall *r, Fcall *t, char *err)
fatal(Enomem); fatal(Enomem);
n = convS2M(t, data, messagesize); n = convS2M(t, data, messagesize);
if(write(netfd, data, n)!=n){ if(write(netfd, data, n)!=n){
/* not fatal, might have got a note due to flush */ syslog(0, "exportfs", "short write: %r");
fprint(2, "exportfs: short write in reply: %r\n"); fatal("mount write");
} }
free(data); free(data);
} }
@ -570,6 +570,8 @@ getsbuf(void)
unlock(&sbufalloc); unlock(&sbufalloc);
w = emallocz(sizeof(*w) + messagesize); w = emallocz(sizeof(*w) + messagesize);
} }
w->pid = 0;
w->canint = 0;
w->flushtag = NOTAG; w->flushtag = NOTAG;
return w; return w;
} }
@ -577,6 +579,8 @@ getsbuf(void)
void void
putsbuf(Fsrpc *w) putsbuf(Fsrpc *w)
{ {
w->pid = 0;
w->canint = 0;
w->flushtag = NOTAG; w->flushtag = NOTAG;
lock(&sbufalloc); lock(&sbufalloc);
w->next = sbufalloc.free; w->next = sbufalloc.free;

View file

@ -15,6 +15,8 @@ typedef struct Qidtab Qidtab;
struct Fsrpc struct Fsrpc
{ {
Fsrpc *next; /* freelist */ Fsrpc *next; /* freelist */
uintptr pid; /* Pid of slave process executing the rpc */
int canint; /* Interrupt gate */
int flushtag; /* Tag on which to reply to flush */ int flushtag; /* Tag on which to reply to flush */
Fcall work; /* Plan 9 incoming Fcall */ Fcall work; /* Plan 9 incoming Fcall */
uchar buf[]; /* Data buffer */ uchar buf[]; /* Data buffer */
@ -51,10 +53,9 @@ struct File
struct Proc struct Proc
{ {
Lock; uintptr pid;
Fsrpc *busy; Fsrpc *busy;
Proc *next; Proc *next;
int pid;
}; };
struct Qidtab struct Qidtab
@ -69,6 +70,7 @@ struct Qidtab
enum enum
{ {
MAXPROC = 50,
FHASHSIZE = 64, FHASHSIZE = 64,
Fidchunk = 1000, Fidchunk = 1000,
Npsmpt = 32, Npsmpt = 32,
@ -126,7 +128,7 @@ void freefile(File*);
void slaveopen(Fsrpc*); void slaveopen(Fsrpc*);
void slaveread(Fsrpc*); void slaveread(Fsrpc*);
void slavewrite(Fsrpc*); void slavewrite(Fsrpc*);
void blockingslave(Proc*); void blockingslave(void);
void reopen(Fid *f); void reopen(Fid *f);
void noteproc(int, char*); void noteproc(int, char*);
void flushaction(void*, char*); void flushaction(void*, char*);

View file

@ -64,20 +64,14 @@ Xflush(Fsrpc *t)
for(m = Proclist; m; m = m->next){ for(m = Proclist; m; m = m->next){
w = m->busy; w = m->busy;
if(w == nil || w->work.tag != t->work.oldtag) if(w != 0 && w->pid == m->pid && w->work.tag == t->work.oldtag) {
continue;
lock(m);
w = m->busy;
if(w != nil && w->work.tag == t->work.oldtag) {
w->flushtag = t->work.tag; w->flushtag = t->work.tag;
DEBUG(DFD, "\tset flushtag %d\n", t->work.tag); DEBUG(DFD, "\tset flushtag %d\n", t->work.tag);
postnote(PNPROC, m->pid, "flush"); if(w->canint)
unlock(m); postnote(PNPROC, w->pid, "flush");
putsbuf(t); putsbuf(t);
return; return;
} }
unlock(m);
} }
reply(&t->work, &rhdr, 0); reply(&t->work, &rhdr, 0);
@ -465,10 +459,10 @@ procsetname(char *fmt, ...)
void void
slave(Fsrpc *f) slave(Fsrpc *f)
{ {
static int nproc; Proc *p;
Proc *p, **l; uintptr pid;
Fcall rhdr; Fcall rhdr;
int pid; static int nproc;
if(readonly){ if(readonly){
switch(f->work.type){ switch(f->work.type){
@ -485,41 +479,30 @@ slave(Fsrpc *f)
} }
} }
for(;;) { for(;;) {
for(l = &Proclist; (p = *l) != nil; l = &p->next) { for(p = Proclist; p; p = p->next) {
if(p->busy != nil) if(p->busy == 0) {
continue; f->pid = p->pid;
p->busy = f;
p->busy = f; do {
while(rendezvous(p, f) == (void*)~0) pid = (uintptr)rendezvous((void*)p->pid, f);
; }
while(pid == ~0); /* Interrupted */
/* swept a slave proc */ if(pid != p->pid)
if(f == nil){ fatal("rendezvous sync fail");
*l = p->next; return;
free(p); }
nproc--;
break;
}
f = nil;
/*
* as long as the number of slave procs
* is small, dont bother sweeping.
*/
if(nproc < 16)
break;
} }
if(f == nil)
return;
p = emallocz(sizeof(Proc)); if(nproc >= MAXPROC){
pid = rfork(RFPROC|RFMEM|RFNOWAIT);
switch(pid) {
case -1:
reply(&f->work, &rhdr, Enoprocs); reply(&f->work, &rhdr, Enoprocs);
putsbuf(f); putsbuf(f);
free(p);
return; return;
}
nproc++;
pid = rfork(RFPROC|RFMEM);
switch(pid) {
case -1:
fatal("rfork");
case 0: case 0:
if (local[0] != '\0') if (local[0] != '\0')
@ -528,34 +511,44 @@ slave(Fsrpc *f)
local, remote); local, remote);
else else
procsetname("%s -> %s", local, remote); procsetname("%s -> %s", local, remote);
blockingslave(p); blockingslave();
_exits(0); fatal("slave");
default: default:
p = emallocz(sizeof(Proc));
p->busy = 0;
p->pid = pid; p->pid = pid;
p->next = Proclist; p->next = Proclist;
Proclist = p; Proclist = p;
nproc++; while(rendezvous((void*)pid, p) == (void*)~0)
;
} }
} }
} }
void void
blockingslave(Proc *m) blockingslave(void)
{ {
Fsrpc *p; Fsrpc *p;
Fcall rhdr; Fcall rhdr;
Proc *m;
uintptr pid;
notify(flushaction); notify(flushaction);
for(;;) { pid = getpid();
p = rendezvous(m, nil);
if(p == (void*)~0) /* Interrupted */
continue;
if(p == nil) /* Swept */
break;
DEBUG(DFD, "\tslave: %d %F\n", m->pid, &p->work); do {
m = rendezvous((void*)pid, 0);
}
while(m == (void*)~0); /* Interrupted */
for(;;) {
p = rendezvous((void*)pid, (void*)pid);
if(p == (void*)~0) /* Interrupted */
continue;
DEBUG(DFD, "\tslave: %p %F p %p\n", pid, &p->work, p->pid);
if(p->flushtag != NOTAG) if(p->flushtag != NOTAG)
goto flushme; goto flushme;
@ -575,17 +568,13 @@ blockingslave(Proc *m)
default: default:
reply(&p->work, &rhdr, "exportfs: slave type error"); reply(&p->work, &rhdr, "exportfs: slave type error");
} }
flushme:
lock(m);
m->busy = nil;
unlock(m);
/* no more flushes can come in now */
if(p->flushtag != NOTAG) { if(p->flushtag != NOTAG) {
flushme:
p->work.type = Tflush; p->work.type = Tflush;
p->work.tag = p->flushtag; p->work.tag = p->flushtag;
reply(&p->work, &rhdr, 0); reply(&p->work, &rhdr, 0);
} }
m->busy = 0;
putsbuf(p); putsbuf(p);
} }
} }
@ -665,8 +654,16 @@ slaveopen(Fsrpc *p)
path = makepath(f->f, ""); path = makepath(f->f, "");
DEBUG(DFD, "\topen: %s %d\n", path, work->mode); DEBUG(DFD, "\topen: %s %d\n", path, work->mode);
p->canint = 1;
if(p->flushtag != NOTAG){
free(path);
return;
}
/* There is a race here I ignore because there are no locks */
f->fid = open(path, work->mode); f->fid = open(path, work->mode);
free(path); free(path);
p->canint = 0;
if(f->fid < 0 || (d = dirfstat(f->fid)) == nil) { if(f->fid < 0 || (d = dirfstat(f->fid)) == nil) {
Error: Error:
errstr(err, sizeof err); errstr(err, sizeof err);
@ -706,6 +703,9 @@ slaveread(Fsrpc *p)
} }
n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count; n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count;
p->canint = 1;
if(p->flushtag != NOTAG)
return;
data = malloc(n); data = malloc(n);
if(data == 0) { if(data == 0) {
reply(work, &rhdr, Enomem); reply(work, &rhdr, Enomem);
@ -717,12 +717,14 @@ slaveread(Fsrpc *p)
r = preaddir(f, (uchar*)data, n, work->offset); r = preaddir(f, (uchar*)data, n, work->offset);
else else
r = pread(f->fid, data, n, work->offset); r = pread(f->fid, data, n, work->offset);
p->canint = 0;
if(r < 0) { if(r < 0) {
free(data); free(data);
errstr(err, sizeof err); errstr(err, sizeof err);
reply(work, &rhdr, err); reply(work, &rhdr, err);
return; return;
} }
DEBUG(DFD, "\tread: fd=%d %d bytes\n", f->fid, r); DEBUG(DFD, "\tread: fd=%d %d bytes\n", f->fid, r);
rhdr.data = data; rhdr.data = data;
@ -748,7 +750,11 @@ slavewrite(Fsrpc *p)
} }
n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count; n = (work->count > messagesize-IOHDRSZ) ? messagesize-IOHDRSZ : work->count;
p->canint = 1;
if(p->flushtag != NOTAG)
return;
n = pwrite(f->fid, work->data, n, work->offset); n = pwrite(f->fid, work->data, n, work->offset);
p->canint = 0;
if(n < 0) { if(n < 0) {
errstr(err, sizeof err); errstr(err, sizeof err);
reply(work, &rhdr, err); reply(work, &rhdr, err);