cwfs: fix out of order replies

using a shared reply queue and a pool of worker procs does
result in replies to be send out of order under some conditions.
the symptoms are mnt errors when interrupting requests (Rflush
arriving before the original requests response).

this change gives each connection its own reply queue and its
own srvo process. so now a connection consists of one reply
queue, a srvi process reading the connections file descriptor
and a srvo process reading the reply queue and writng replies
to the connections file descriptor.

the srvi processes live as long as the connection is established.
the srvo prcoesses live forever and are attached to the chan
(which gets reused).

to avoid excessive process creation, we limit the number of
connections to 30. srvchan() returns nil when all 30 network
channels are in use.
This commit is contained in:
cinap_lenrek 2013-08-08 01:07:01 +02:00
parent 28ff3a3cda
commit a28bdd3e1f
2 changed files with 50 additions and 35 deletions

View file

@ -56,17 +56,20 @@ neti(void *v)
net = v; net = v;
for(;;) { for(;;) {
if((lisfd = listen(net->anndir, net->lisdir)) < 0){ if((lisfd = listen(net->anndir, net->lisdir)) < 0){
fprint(2, "listen %s failed: %r\n", net->anndir); fprint(2, "%s: listen %s failed: %r\n", argv0, net->anndir);
break; break;
} }
/* got new call on lisfd */ /* got new call on lisfd */
if((accfd = accept(lisfd, net->lisdir)) < 0){ if((accfd = accept(lisfd, net->lisdir)) < 0){
fprint(2, "accept %d (from %s) failed: %r\n", lisfd, net->lisdir); fprint(2, "%s: accept %d (from %s) failed: %r\n", argv0, lisfd, net->lisdir);
close(lisfd); close(lisfd);
continue; continue;
} }
nci = getnetconninfo(net->lisdir, accfd); nci = getnetconninfo(net->lisdir, accfd);
srvchan(accfd, nci->raddr); if(srvchan(accfd, nci->raddr) == nil){
fprint(2, "%s: srvchan failed for: %s\n", argv0, nci->raddr);
close(accfd);
}
freenetconninfo(nci); freenetconninfo(nci);
} }
} }

View file

@ -4,9 +4,8 @@
#include <thread.h> #include <thread.h>
enum { enum {
Maxfdata = 8192, Nqueue = 100, /* reply queue size per connection (tunable) */
Nqueue = 200, /* queue size (tunable) */ Nchans = 30, /* maximum number of connections */
Nsrvo = 8, /* number of write workers */
}; };
typedef struct Srv Srv; typedef struct Srv Srv;
@ -22,8 +21,6 @@ static struct {
Chan *hd; Chan *hd;
} freechans; } freechans;
static Queue *srvoq;
void void
chanhangup(Chan *chan, char *msg) chanhangup(Chan *chan, char *msg)
{ {
@ -69,14 +66,16 @@ srvput(Srv *srv)
} }
static void static void
srvo(void *) srvo(void *aux)
{ {
Chan *chan;
Srv *srv; Srv *srv;
Msgbuf *mb; Msgbuf *mb;
char buf[ERRMAX]; char buf[ERRMAX];
chan = aux;
for(;;){ for(;;){
mb = fs_recv(srvoq, 0); mb = fs_recv(chan->reply, 0);
if(mb == nil) if(mb == nil)
continue; continue;
if(mb->data == nil){ if(mb->data == nil){
@ -84,12 +83,12 @@ srvo(void *)
mbfree(mb); mbfree(mb);
continue; continue;
} }
srv = (Srv*)mb->param; srv = chan->pdata;
while(write(srv->fd, mb->data, mb->count) != mb->count){ while(write(srv->fd, mb->data, mb->count) != mb->count){
rerrstr(buf, sizeof(buf)); rerrstr(buf, sizeof(buf));
if(strstr(buf, "interrupt")) if(strstr(buf, "interrupt"))
continue; continue;
chanhangup(srv->chan, buf); chanhangup(chan, buf);
break; break;
} }
mbfree(mb); mbfree(mb);
@ -100,13 +99,17 @@ srvo(void *)
static void static void
srvi(void *aux) srvi(void *aux)
{ {
Srv *srv = aux; Chan *chan;
Srv *srv;
Msgbuf *mb, *ms; Msgbuf *mb, *ms;
uchar *b, *p, *e; uchar *b, *p, *e;
int n, m; int n, m;
char buf[ERRMAX]; char buf[ERRMAX];
if((mb = mballoc(IOHDRSZ+Maxfdata, srv->chan, Mbeth1)) == nil) chan = aux;
srv = chan->pdata;
if((mb = mballoc(IOHDRSZ+MAXDAT, chan, Mbeth1)) == nil)
panic("srvi: mballoc failed"); panic("srvi: mballoc failed");
b = mb->data; b = mb->data;
p = b; p = b;
@ -126,12 +129,12 @@ Read:
goto Read; goto Read;
} }
if(m <= SMALLBUF){ if(m <= SMALLBUF){
if((ms = mballoc(m, srv->chan, Mbeth1)) == nil) if((ms = mballoc(m, chan, Mbeth1)) == nil)
panic("srvi: mballoc failed"); panic("srvi: mballoc failed");
memmove(ms->data, b, m); memmove(ms->data, b, m);
} else { } else {
ms = mb; ms = mb;
if((mb = mballoc(mb->count, srv->chan, Mbeth1)) == nil) if((mb = mballoc(mb->count, chan, Mbeth1)) == nil)
panic("srvi: mballoc failed"); panic("srvi: mballoc failed");
ms->count = m; ms->count = m;
} }
@ -141,8 +144,7 @@ Read:
p = b + n; p = b + n;
incref(srv); incref(srv);
ms->param = (uint)srv; fs_send(chan->send, ms);
fs_send(serveq, ms);
} }
e = b + mb->count; e = b + mb->count;
} }
@ -152,7 +154,7 @@ Error:
if(strstr(buf, "interrupt")) if(strstr(buf, "interrupt"))
goto Read; goto Read;
chanhangup(srv->chan, buf); chanhangup(chan, buf);
srvput(srv); srvput(srv);
mbfree(mb); mbfree(mb);
@ -166,18 +168,15 @@ srvchan(int fd, char *name)
Srv *srv; Srv *srv;
lock(&freechans); lock(&freechans);
if(chan = freechans.hd){ chan = freechans.hd;
srv = chan->pdata; if(chan == nil){
freechans.hd = srv->chan;
unlock(&freechans); unlock(&freechans);
} else { return nil;
unlock(&freechans);
chan = fs_chaninit(1, sizeof(*srv));
srv = chan->pdata;
} }
chan->reply = srvoq; srv = chan->pdata;
if(chan->send == nil) freechans.hd = srv->chan;
chan->send = serveq; unlock(&freechans);
chan->protocol = nil; chan->protocol = nil;
chan->msize = 0; chan->msize = 0;
chan->whotime = 0; chan->whotime = 0;
@ -187,8 +186,16 @@ srvchan(int fd, char *name)
incref(srv); incref(srv);
srv->chan = chan; srv->chan = chan;
srv->fd = fd; srv->fd = fd;
if(chan->reply == nil){
chan->reply = newqueue(Nqueue, "srvoq");
newproc(srvo, chan, "srvo");
}
if(chan->send == nil)
chan->send = serveq;
snprint(buf, sizeof(buf), "srvi %s", name); snprint(buf, sizeof(buf), "srvi %s", name);
newproc(srvi, srv, buf); newproc(srvi, chan, buf);
return chan; return chan;
} }
@ -196,12 +203,17 @@ srvchan(int fd, char *name)
void void
srvinit(void) srvinit(void)
{ {
Chan *chan;
Srv *srv;
int i; int i;
if(srvoq != nil) if(freechans.hd != nil)
return; return;
for(i=0; i<Nchans; i++){
srvoq = newqueue(Nqueue, "srvoq"); chan = fs_chaninit(1, sizeof(Srv));
for(i=0; i<Nsrvo; i++) srv = chan->pdata;
newproc(srvo, nil, "srvo"); srv->fd = -1;
srv->chan = freechans.hd;
freechans.hd = chan;
}
} }