abandon streaming experiment

for queue like non-seekable files, it is impossible to implement an
exportfs because one has to run the kernels devtab read() and write()
in separate processes, and that makes it impossible to maintain 9p message
order as the scheduler can come in and randomly schedule one process before
another.

so as soon as we have a transition from 9p -> syscalls, we'r screwed.

i currently see just two possibilities:

- introduce special file type like QTSEQ with strictly ordered i/o semantics
- fix all fileservers and exportfs to only do one outstanding i/o to QTSEQ files
which means maintaining a queue per fid

this doesnt propagate. so exporting slow 9p mount again will be limited
again by latency of the inner mount.

other option:

- return offset in Rread, so client can bring responses back into order. this
requires changing all fileservers and drivers to maintain such an per fid offset
and change the protocol to include it in the response, and also pass it to userspace
(new syscalls or pass it in TOS)

this only works for read pipelining, write is still screwed.

both options suck.

--
cinap
This commit is contained in:
cinap_lenrek 2016-03-17 17:48:19 +01:00
parent 5aaa7240a2
commit a2be120ea9
4 changed files with 0 additions and 610 deletions

View file

@ -30,8 +30,6 @@ struct Mntrpc
uint rpclen; /* len of buffer */
Block* b; /* reply blocks */
Mntrpc* flushed; /* message this one flushes */
void *iocomarg; /* Rpc completion callback for pipelining */
void (*iocomfun)(void*, int);
char done; /* Rpc completed */
};
@ -1017,9 +1015,6 @@ mountio(Mnt *m, Mntrpc *r)
lock(m);
r->z = &up->sleep;
r->m = m;
r->iocomarg = up->iocomarg;
r->iocomfun = up->iocomfun;
up->iocomfun = nil;
r->list = m->queue;
m->queue = r;
unlock(m);
@ -1044,10 +1039,6 @@ mountio(Mnt *m, Mntrpc *r)
if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n)
error(Emountrpc);
/* Rpc commited */
if(r->iocomfun != nil)
(*r->iocomfun)(r->iocomarg, 0);
/* Gate readers onto the mount point one at a time */
for(;;) {
lock(m);
@ -1190,11 +1181,6 @@ mountmux(Mnt *m, Mntrpc *r)
/* look for a reply to a message */
if(q->request.tag == r->reply.tag) {
*l = q->list;
/* Rpc completed */
if(q->iocomfun != nil)
(*q->iocomfun)(q->iocomarg, 1);
if(q == r) {
q->done = 1;
unlock(m);

View file

@ -1,580 +0,0 @@
#include "u.h"
#include "../port/lib.h"
#include "mem.h"
#include "dat.h"
#include "fns.h"
#include "../port/error.h"
typedef struct Stream Stream;
typedef struct Iocom Iocom;
struct Stream
{
Ref;
Lock;
int iounit;
int noseek;
Ref nrp;
Ref nwp;
Ref nwq;
Proc *rp[4];
Proc *wp[2];
Block *rlist;
vlong soff;
vlong roff;
vlong woff;
QLock rcl;
QLock wcl;
QLock rql;
QLock wql;
Rendez wz;
Queue *rq;
Queue *wq;
Chan *f;
};
struct Iocom
{
Proc *p;
QLock *q;
Stream *s;
Block *b;
};
static void
putstream(Stream *s)
{
if(decref(s))
return;
freeblist(s->rlist);
qfree(s->rq);
qfree(s->wq);
if(s->f != nil)
cclose(s->f);
free(s);
}
#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong)))
#define BDONE (1<<15)
#define BERROR (1<<14)
static Block*
sblock(Stream *s)
{
Block *b;
b = allocb(sizeof(vlong)+s->iounit);
b->flag &= ~(BDONE|BERROR);
b->wp += sizeof(vlong);
b->rp = b->wp;
return b;
}
static void
iocom(void *arg, int complete)
{
Iocom *io = arg;
Stream *s;
QLock *q;
Proc *p;
p = io->p;
if(complete && p == up){
up->iocomfun = nil;
up->iocomarg = nil;
}
q = io->q;
if(q != nil && p == up){
io->q = nil;
qunlock(q);
}
s = io->s;
if(complete && s != nil && s->noseek){
io->s = nil;
lock(s);
BOFF(io->b) = s->soff;
s->soff += s->iounit;
unlock(s);
}
}
static void
ioq(Iocom *io, QLock *q)
{
eqlock(q); /* unlocked in iocom() above */
io->p = up;
io->q = q;
io->s = nil;
io->b = nil;
up->iocomarg = io;
up->iocomfun = iocom;
}
static void
streamreader(void *arg)
{
Stream *s = arg;
Iocom io;
Chan *f;
Block *b, *l, **ll;
vlong o;
int id, n;
id = incref(&s->nrp) % nelem(s->rp);
s->rp[id] = up;
f = s->f;
b = sblock(s);
qlock(&s->rql);
if(waserror()){
qhangup(s->rq, up->errstr);
goto Done;
}
if(s->noseek == -1){
BOFF(b) = 0;
n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL);
if(n > 0){
b->wp += n;
b->flag |= BDONE;
b->next = nil;
s->rlist = b;
s->soff = s->iounit;
s->roff = 0;
s->noseek = 1;
b = sblock(s);
} else {
s->noseek = 0;
}
}
while(!qisclosed(s->rq)) {
ll = &s->rlist;
while((l = *ll) != nil){
if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){
if(s->noseek){
ll = &l->next;
continue;
}
break;
}
if((l->flag & BERROR) != 0)
error((char*)l->rp);
if(BLEN(l) == 0){
qhangup(s->rq, nil);
poperror();
goto Done;
}
s->roff += s->noseek ? s->iounit : BLEN(l);
*ll = l->next;
l->next = nil;
qbwrite(s->rq, l);
}
n = s->iounit;
o = s->roff;
l = s->rlist;
if(s->noseek) {
o = 0;
b->next = l;
s->rlist = b;
} else if(l == nil) {
b->next = nil;
s->rlist = b;
} else {
if(o < BOFF(l)){
n = BOFF(l) - o;
b->next = l;
s->rlist = b;
} else {
for(;; l = l->next){
if((l->flag & BDONE) != 0 && BLEN(l) == 0)
goto Done;
o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l));
if(l->next == nil)
break;
if(o < BOFF(l->next)){
n = BOFF(l->next) - o;
break;
}
}
b->next = l->next;
l->next = b;
}
}
BOFF(b) = o;
qunlock(&s->rql);
if(waserror()){
poperror();
goto Exit;
}
ioq(&io, &s->rcl);
io.b = b;
io.s = s;
if(waserror()){
strncpy((char*)b->wp, up->errstr, s->iounit-1);
b->wp[s->iounit-1] = 0;
n = -1;
} else {
n = devtab[f->type]->read(f, b->wp, n, o);
if(n < 0)
error(Eio);
poperror();
}
iocom(&io, 1);
poperror();
l = b;
b = sblock(s);
qlock(&s->rql);
if(n >= 0)
l->wp += n;
else
l->flag |= BERROR;
l->flag |= BDONE;
}
poperror();
Done:
qunlock(&s->rql);
freeb(b);
Exit:
s->rp[id] = nil;
putstream(s);
pexit("closed", 1);
}
static void
streamwriter(void *arg)
{
Stream *s = arg;
Iocom io;
Block *b;
Chan *f;
vlong o;
int id, n;
id = incref(&s->nwp) % nelem(s->wp);
s->wp[id] = up;
f = s->f;
while(!qisclosed(s->wq)) {
if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0)
wakeup(&s->wz); /* queue drained */
if(waserror()){
decref(&s->nwq);
break;
}
ioq(&io, &s->wcl);
b = qbread(s->wq, s->iounit);
decref(&s->nwq);
if(b == nil){
iocom(&io, 1);
break;
}
poperror();
if(waserror()){
qhangup(s->wq, up->errstr);
iocom(&io, 1);
freeb(b);
break;
}
n = BLEN(b);
o = s->woff;
s->woff += n;
if(devtab[f->type]->write(f, b->rp, n, o) != n)
error(Eio);
iocom(&io, 1);
freeb(b);
poperror();
}
s->wp[id] = nil;
wakeup(&s->wz);
putstream(s);
pexit("closed", 1);
}
static int
streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp)
{
static int perm[] = { 0400, 0200, 0600, 0 };
Fgrp *fgrp = up->fgrp;
Chan *f;
Qid q;
if(s == DEVDOTDOT){
devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp);
return 1;
}
if(s == 0)
return 0;
s--;
if(s > fgrp->maxfd)
return -1;
if((f=fgrp->fd[s]) == nil)
return 0;
sprint(up->genbuf, "%dstream", s);
mkqid(&q, s+1, 0, QTFILE);
devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp);
return 1;
}
static Chan*
streamattach(char *spec)
{
return devattach(L'', spec);
}
static Walkqid*
streamwalk(Chan *c, Chan *nc, char **name, int nname)
{
return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen);
}
static int
streamstat(Chan *c, uchar *db, int n)
{
return devstat(c, db, n, (Dirtab *)0, 0L, streamgen);
}
static Chan*
streamopen(Chan *c, int omode)
{
Stream *s;
c->aux = nil;
if(c->qid.type & QTDIR){
if(omode != 0)
error(Eisdir);
c->mode = 0;
c->flag |= COPEN;
c->offset = 0;
return c;
}
s = mallocz(sizeof(*s), 1);
if(s == nil)
error(Enomem);
incref(s);
if(waserror()){
putstream(s);
nexterror();
}
omode = openmode(omode);
s->f = fdtochan(c->qid.path - 1, omode, 0, 1);
if(s->f == nil || s->f->qid.type != QTFILE)
error(Eperm);
s->noseek = -1;
s->roff = s->f->offset;
s->woff = s->f->offset;
s->iounit = s->f->iounit;
if(s->iounit <= 0 || s->iounit > qiomaxatomic)
s->iounit = qiomaxatomic;
c->iounit = s->iounit;
c->aux = s;
c->mode = omode;
c->flag |= COPEN;
c->offset = 0;
poperror();
return c;
}
static int
isdrained(void *a)
{
Stream *s;
int i;
s = a;
if(s->wq == nil)
return 1;
if(qisclosed(s->wq) == 0)
return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref;
for(i=0; i<nelem(s->wp); i++)
if(s->wp[i] != nil)
return 0;
return 1;
}
static void
streamdrain(Chan *c)
{
Stream *s;
if((s = c->aux) == nil)
return;
eqlock(&s->wql);
if(waserror()){
qunlock(&s->wql);
nexterror();
}
while(!isdrained(s))
sleep(&s->wz, isdrained, s);
qunlock(&s->wql);
poperror();
}
static void
streamclose(Chan *c)
{
Stream *s;
int i;
if((c->flag & COPEN) == 0 || (s = c->aux) == nil)
return;
if(s->rq != nil){
qclose(s->rq);
for(i=0; i<nelem(s->rp); i++)
postnote(s->rp[i], 1, "streamclose", 0);
}
if(s->wq != nil){
qhangup(s->wq, nil);
if(!waserror()){
streamdrain(c);
poperror();
}
qclose(s->wq); /* discard the data */
for(i=0; i<nelem(s->wp); i++)
postnote(s->wp[i], 1, "streamclose", 0);
}
c->aux = nil;
putstream(s);
}
static int
canpipeline(Chan *f, int mode)
{
USED(mode);
return devtab[f->type]->dc == 'M';
}
static Queue*
streamqueue(Chan *c, int mode)
{
Stream *s;
int i, n;
s = c->aux;
if(s == nil || c->qid.type != QTFILE)
error(Eperm);
switch(mode){
case OREAD:
while(s->rq == nil){
qlock(&s->rql);
if(s->rq != nil){
qunlock(&s->rql);
break;
}
s->rq = qopen(conf.pipeqsize, 0, 0, 0);
if(s->rq == nil){
qunlock(&s->rql);
error(Enomem);
}
n = canpipeline(s->f, mode) ? nelem(s->rp) : 1;
for(i=0; i<n; i++){
incref(s);
kproc("streamreader", streamreader, s);
}
while(s->nrp.ref != n)
sched();
qunlock(&s->rql);
break;
}
return s->rq;
case OWRITE:
while(s->wq == nil){
qlock(&s->wql);
if(s->wq != nil){
qunlock(&s->wql);
break;
}
s->wq = qopen(conf.pipeqsize, 0, 0, 0);
if(s->wq == nil){
qunlock(&s->wql);
error(Enomem);
}
n = canpipeline(s->f, mode) ? nelem(s->wp) : 1;
for(i=0; i<n; i++){
incref(s);
kproc("streamwriter", streamwriter, s);
}
while(s->nwp.ref != n)
sched();
qunlock(&s->wql);
break;
}
return s->wq;
}
error(Egreg);
return nil;
}
static long
streamread(Chan *c, void *va, long n, vlong)
{
if(c->qid.type == QTDIR)
return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen);
return qread(streamqueue(c, OREAD), va, n);
}
static Block*
streambread(Chan *c, long n, ulong)
{
return qbread(streamqueue(c, OREAD), n);
}
static long
streamwrite(Chan *c, void *va, long n, vlong)
{
if(n == 0)
streamdrain(c);
return qwrite(streamqueue(c, OWRITE), va, n);
}
static long
streambwrite(Chan *c, Block *b, ulong)
{
if(BLEN(b) == 0)
streamdrain(c);
return qbwrite(streamqueue(c, OWRITE), b);
}
Dev streamdevtab = {
L'',
"stream",
devreset,
devinit,
devshutdown,
streamattach,
streamwalk,
streamstat,
streamopen,
devcreate,
streamclose,
streamread,
streambread,
streamwrite,
streambwrite,
devremove,
devwstat,
};

View file

@ -777,9 +777,6 @@ struct Proc
PMMU;
char *syscalltrace; /* syscall trace */
void *iocomarg; /* I/O completion callback for pipelining */
void (*iocomfun)(void*, int);
};
enum

View file

@ -127,19 +127,6 @@ copy(char *from, char *to, int todir)
if(buflen <= 0)
buflen = DEFB;
if(dirb->length/2 > buflen){
char nam[32];
snprint(nam, sizeof nam, "/fd/%dstream", fdf);
fds = open(nam, OREAD);
if(fds >= 0){
close(fdf);
fdf = fds;
}
snprint(nam, sizeof nam, "/fd/%dstream", fdt);
fds = open(nam, OWRITE);
}
if(copy1(fdf, fds < 0 ? fdt : fds, from, to)==0){
if(fds >= 0 && write(fds, "", 0) < 0){
fprint(2, "cp: error writing %s: %r\n", to);