lib9p: defer closing down srv until the last request has been responded, Tversion message size

in multithreaded programs, we have to wait until all outstanding
requests have been responded before closing down the srv.

dont make write errors sysfatal(), only print them. in case if
listensrv() is used we dont want to exit the process in respond()
called by some worker thread.

make sure Tversion is only handled when there are no outstanding
requests and make sure message size is sane.
This commit is contained in:
cinap_lenrek 2013-01-30 06:26:03 +01:00
parent bc0e5ffa22
commit dbbbff8915
2 changed files with 39 additions and 8 deletions

View file

@ -234,6 +234,7 @@ struct Srv {
QLock slock; QLock slock;
Ref sref; Ref sref;
Ref rref;
}; };
void srv(Srv*); void srv(Srv*);

View file

@ -163,23 +163,34 @@ walkandclone(Req *r, char *(*walk1)(Fid*, char*, void*), char *(*clone)(Fid*, Fi
} }
static void static void
sversion(Srv*, Req *r) sversion(Srv *srv, Req *r)
{ {
if(srv->rref.ref != 2){
respond(r, Ebotch);
return;
}
if(strncmp(r->ifcall.version, "9P", 2) != 0){ if(strncmp(r->ifcall.version, "9P", 2) != 0){
r->ofcall.version = "unknown"; r->ofcall.version = "unknown";
respond(r, nil); respond(r, nil);
return; return;
} }
r->ofcall.version = "9P2000"; r->ofcall.version = "9P2000";
r->ofcall.msize = r->ifcall.msize; if(r->ifcall.msize < 256){
respond(r, "version: message size too small");
return;
}
if(r->ifcall.msize < 1024*1024)
r->ofcall.msize = r->ifcall.msize;
else
r->ofcall.msize = 1024*1024;
respond(r, nil); respond(r, nil);
} }
static void static void
rversion(Req *r, char *error) rversion(Req *r, char *error)
{ {
assert(error == nil); if(error == nil)
changemsize(r->srv, r->ofcall.msize); changemsize(r->srv, r->ofcall.msize);
} }
static void static void
@ -682,14 +693,18 @@ rwstat(Req*, char*)
{ {
} }
static void srvclose(Srv *);
static void static void
srvwork(void *v) srvwork(void *v)
{ {
Srv *srv = v; Srv *srv = v;
Req *r; Req *r;
incref(&srv->rref);
incref(&srv->sref); incref(&srv->sref);
while(r = getreq(srv)){ while(r = getreq(srv)){
incref(&srv->rref);
if(r->error){ if(r->error){
respond(r, r->error); respond(r, r->error);
continue; continue;
@ -715,9 +730,19 @@ srvwork(void *v)
} }
qunlock(&srv->slock); qunlock(&srv->slock);
} }
if(decref(&srv->sref)) decref(&srv->sref);
srvclose(srv);
}
static void
srvclose(Srv *srv)
{
if(decref(&srv->rref))
return; return;
if(chatty9p)
fprint(2, "srvclose\n");
free(srv->rbuf); free(srv->rbuf);
srv->rbuf = nil; srv->rbuf = nil;
free(srv->wbuf); free(srv->wbuf);
@ -753,6 +778,9 @@ srv(Srv *srv)
fmtinstall('D', dirfmt); fmtinstall('D', dirfmt);
fmtinstall('F', fcallfmt); fmtinstall('F', fcallfmt);
srv->sref.ref = 0;
srv->rref.ref = 0;
if(srv->fpool == nil) if(srv->fpool == nil)
srv->fpool = allocfidpool(srv->destroyfid); srv->fpool = allocfidpool(srv->destroyfid);
if(srv->rpool == nil) if(srv->rpool == nil)
@ -819,7 +847,7 @@ if(chatty9p)
qlock(&srv->wlock); qlock(&srv->wlock);
n = convS2M(&r->ofcall, srv->wbuf, srv->msize); n = convS2M(&r->ofcall, srv->wbuf, srv->msize);
if(n <= 0){ if(n <= 0){
fprint(2, "n = %d %F\n", n, &r->ofcall); fprint(2, "msize = %d n = %d %F\n", srv->msize, n, &r->ofcall);
abort(); abort();
} }
assert(n > 2); assert(n > 2);
@ -827,7 +855,7 @@ if(chatty9p)
closereq(removereq(r->pool, r->ifcall.tag)); closereq(removereq(r->pool, r->ifcall.tag));
m = write(srv->outfd, srv->wbuf, n); m = write(srv->outfd, srv->wbuf, n);
if(m != n) if(m != n)
sysfatal("lib9p srv: write %d returned %d on fd %d: %r", n, m, srv->outfd); fprint(2, "lib9p srv: write %d returned %d on fd %d: %r", n, m, srv->outfd);
qunlock(&srv->wlock); qunlock(&srv->wlock);
qlock(&r->lk); /* no one will add flushes now */ qlock(&r->lk); /* no one will add flushes now */
@ -844,6 +872,8 @@ if(chatty9p)
closereq(r); closereq(r);
else else
free(r); free(r);
srvclose(srv);
} }
void void