From a2be120ea93ae67447315da268fa336650cd5149 Mon Sep 17 00:00:00 2001 From: cinap_lenrek Date: Thu, 17 Mar 2016 17:48:19 +0100 Subject: [PATCH] abandon streaming experiment for queue like non-seekable files, it is impossible to implement an exportfs because one has to run the kernels devtab read() and write() in separate processes, and that makes it impossible to maintain 9p message order as the scheduler can come in and randomly schedule one process before another. so as soon as we have a transition from 9p -> syscalls, we'r screwed. i currently see just two possibilities: - introduce special file type like QTSEQ with strictly ordered i/o semantics - fix all fileservers and exportfs to only do one outstanding i/o to QTSEQ files which means maintaining a queue per fid this doesnt propagate. so exporting slow 9p mount again will be limited again by latency of the inner mount. other option: - return offset in Rread, so client can bring responses back into order. this requires changing all fileservers and drivers to maintain such an per fid offset and change the protocol to include it in the response, and also pass it to userspace (new syscalls or pass it in TOS) this only works for read pipelining, write is still screwed. both options suck. -- cinap --- sys/src/9/port/devmnt.c | 14 - sys/src/9/port/devstream.c | 580 ------------------------------------- sys/src/9/port/portdat.h | 3 - sys/src/cmd/cp.c | 13 - 4 files changed, 610 deletions(-) delete mode 100644 sys/src/9/port/devstream.c diff --git a/sys/src/9/port/devmnt.c b/sys/src/9/port/devmnt.c index 52f093a1a..e6bbd02dc 100644 --- a/sys/src/9/port/devmnt.c +++ b/sys/src/9/port/devmnt.c @@ -30,8 +30,6 @@ struct Mntrpc uint rpclen; /* len of buffer */ Block* b; /* reply blocks */ Mntrpc* flushed; /* message this one flushes */ - void *iocomarg; /* Rpc completion callback for pipelining */ - void (*iocomfun)(void*, int); char done; /* Rpc completed */ }; @@ -1017,9 +1015,6 @@ mountio(Mnt *m, Mntrpc *r) lock(m); r->z = &up->sleep; r->m = m; - r->iocomarg = up->iocomarg; - r->iocomfun = up->iocomfun; - up->iocomfun = nil; r->list = m->queue; m->queue = r; unlock(m); @@ -1044,10 +1039,6 @@ mountio(Mnt *m, Mntrpc *r) if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n) error(Emountrpc); - /* Rpc commited */ - if(r->iocomfun != nil) - (*r->iocomfun)(r->iocomarg, 0); - /* Gate readers onto the mount point one at a time */ for(;;) { lock(m); @@ -1190,11 +1181,6 @@ mountmux(Mnt *m, Mntrpc *r) /* look for a reply to a message */ if(q->request.tag == r->reply.tag) { *l = q->list; - - /* Rpc completed */ - if(q->iocomfun != nil) - (*q->iocomfun)(q->iocomarg, 1); - if(q == r) { q->done = 1; unlock(m); diff --git a/sys/src/9/port/devstream.c b/sys/src/9/port/devstream.c deleted file mode 100644 index 8c150633e..000000000 --- a/sys/src/9/port/devstream.c +++ /dev/null @@ -1,580 +0,0 @@ -#include "u.h" -#include "../port/lib.h" -#include "mem.h" -#include "dat.h" -#include "fns.h" -#include "../port/error.h" - -typedef struct Stream Stream; -typedef struct Iocom Iocom; - -struct Stream -{ - Ref; - Lock; - - int iounit; - int noseek; - - Ref nrp; - Ref nwp; - Ref nwq; - - Proc *rp[4]; - Proc *wp[2]; - - Block *rlist; - - vlong soff; - vlong roff; - vlong woff; - - QLock rcl; - QLock wcl; - QLock rql; - QLock wql; - - Rendez wz; - - Queue *rq; - Queue *wq; - Chan *f; -}; - -struct Iocom -{ - Proc *p; - QLock *q; - Stream *s; - Block *b; -}; - -static void -putstream(Stream *s) -{ - if(decref(s)) - return; - freeblist(s->rlist); - qfree(s->rq); - qfree(s->wq); - if(s->f != nil) - cclose(s->f); - free(s); -} - -#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong))) -#define BDONE (1<<15) -#define BERROR (1<<14) - -static Block* -sblock(Stream *s) -{ - Block *b; - - b = allocb(sizeof(vlong)+s->iounit); - b->flag &= ~(BDONE|BERROR); - b->wp += sizeof(vlong); - b->rp = b->wp; - return b; -} - -static void -iocom(void *arg, int complete) -{ - Iocom *io = arg; - Stream *s; - QLock *q; - Proc *p; - - p = io->p; - if(complete && p == up){ - up->iocomfun = nil; - up->iocomarg = nil; - } - - q = io->q; - if(q != nil && p == up){ - io->q = nil; - qunlock(q); - } - - s = io->s; - if(complete && s != nil && s->noseek){ - io->s = nil; - lock(s); - BOFF(io->b) = s->soff; - s->soff += s->iounit; - unlock(s); - } -} - -static void -ioq(Iocom *io, QLock *q) -{ - eqlock(q); /* unlocked in iocom() above */ - - io->p = up; - io->q = q; - io->s = nil; - io->b = nil; - - up->iocomarg = io; - up->iocomfun = iocom; -} - -static void -streamreader(void *arg) -{ - Stream *s = arg; - Iocom io; - Chan *f; - Block *b, *l, **ll; - vlong o; - int id, n; - - id = incref(&s->nrp) % nelem(s->rp); - s->rp[id] = up; - - f = s->f; - b = sblock(s); - qlock(&s->rql); - if(waserror()){ - qhangup(s->rq, up->errstr); - goto Done; - } - if(s->noseek == -1){ - BOFF(b) = 0; - n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL); - - if(n > 0){ - b->wp += n; - b->flag |= BDONE; - b->next = nil; - s->rlist = b; - s->soff = s->iounit; - s->roff = 0; - s->noseek = 1; - - b = sblock(s); - } else { - s->noseek = 0; - } - } - while(!qisclosed(s->rq)) { - ll = &s->rlist; - while((l = *ll) != nil){ - if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){ - if(s->noseek){ - ll = &l->next; - continue; - } - break; - } - if((l->flag & BERROR) != 0) - error((char*)l->rp); - if(BLEN(l) == 0){ - qhangup(s->rq, nil); - poperror(); - goto Done; - } - s->roff += s->noseek ? s->iounit : BLEN(l); - *ll = l->next; - l->next = nil; - qbwrite(s->rq, l); - } - - n = s->iounit; - o = s->roff; - l = s->rlist; - if(s->noseek) { - o = 0; - b->next = l; - s->rlist = b; - } else if(l == nil) { - b->next = nil; - s->rlist = b; - } else { - if(o < BOFF(l)){ - n = BOFF(l) - o; - b->next = l; - s->rlist = b; - } else { - for(;; l = l->next){ - if((l->flag & BDONE) != 0 && BLEN(l) == 0) - goto Done; - o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l)); - if(l->next == nil) - break; - if(o < BOFF(l->next)){ - n = BOFF(l->next) - o; - break; - } - } - b->next = l->next; - l->next = b; - } - } - BOFF(b) = o; - qunlock(&s->rql); - - if(waserror()){ - poperror(); - goto Exit; - } - ioq(&io, &s->rcl); - io.b = b; - io.s = s; - if(waserror()){ - strncpy((char*)b->wp, up->errstr, s->iounit-1); - b->wp[s->iounit-1] = 0; - n = -1; - } else { - n = devtab[f->type]->read(f, b->wp, n, o); - if(n < 0) - error(Eio); - poperror(); - } - iocom(&io, 1); - poperror(); - - l = b; - b = sblock(s); - qlock(&s->rql); - if(n >= 0) - l->wp += n; - else - l->flag |= BERROR; - l->flag |= BDONE; - } - poperror(); -Done: - qunlock(&s->rql); - freeb(b); -Exit: - s->rp[id] = nil; - putstream(s); - pexit("closed", 1); -} - -static void -streamwriter(void *arg) -{ - Stream *s = arg; - Iocom io; - Block *b; - Chan *f; - vlong o; - int id, n; - - id = incref(&s->nwp) % nelem(s->wp); - s->wp[id] = up; - - f = s->f; - while(!qisclosed(s->wq)) { - if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0) - wakeup(&s->wz); /* queue drained */ - if(waserror()){ - decref(&s->nwq); - break; - } - ioq(&io, &s->wcl); - b = qbread(s->wq, s->iounit); - decref(&s->nwq); - if(b == nil){ - iocom(&io, 1); - break; - } - poperror(); - - if(waserror()){ - qhangup(s->wq, up->errstr); - iocom(&io, 1); - freeb(b); - break; - } - n = BLEN(b); - o = s->woff; - s->woff += n; - if(devtab[f->type]->write(f, b->rp, n, o) != n) - error(Eio); - iocom(&io, 1); - freeb(b); - poperror(); - } - - s->wp[id] = nil; - wakeup(&s->wz); - - putstream(s); - pexit("closed", 1); -} - -static int -streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp) -{ - static int perm[] = { 0400, 0200, 0600, 0 }; - Fgrp *fgrp = up->fgrp; - Chan *f; - Qid q; - - if(s == DEVDOTDOT){ - devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp); - return 1; - } - if(s == 0) - return 0; - s--; - if(s > fgrp->maxfd) - return -1; - if((f=fgrp->fd[s]) == nil) - return 0; - sprint(up->genbuf, "%dstream", s); - mkqid(&q, s+1, 0, QTFILE); - devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp); - return 1; -} - -static Chan* -streamattach(char *spec) -{ - return devattach(L'¶', spec); -} - -static Walkqid* -streamwalk(Chan *c, Chan *nc, char **name, int nname) -{ - return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen); -} - -static int -streamstat(Chan *c, uchar *db, int n) -{ - return devstat(c, db, n, (Dirtab *)0, 0L, streamgen); -} - -static Chan* -streamopen(Chan *c, int omode) -{ - Stream *s; - - c->aux = nil; - if(c->qid.type & QTDIR){ - if(omode != 0) - error(Eisdir); - c->mode = 0; - c->flag |= COPEN; - c->offset = 0; - return c; - } - s = mallocz(sizeof(*s), 1); - if(s == nil) - error(Enomem); - incref(s); - if(waserror()){ - putstream(s); - nexterror(); - } - omode = openmode(omode); - s->f = fdtochan(c->qid.path - 1, omode, 0, 1); - if(s->f == nil || s->f->qid.type != QTFILE) - error(Eperm); - s->noseek = -1; - s->roff = s->f->offset; - s->woff = s->f->offset; - s->iounit = s->f->iounit; - if(s->iounit <= 0 || s->iounit > qiomaxatomic) - s->iounit = qiomaxatomic; - c->iounit = s->iounit; - c->aux = s; - c->mode = omode; - c->flag |= COPEN; - c->offset = 0; - poperror(); - return c; -} - -static int -isdrained(void *a) -{ - Stream *s; - int i; - - s = a; - if(s->wq == nil) - return 1; - - if(qisclosed(s->wq) == 0) - return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref; - - for(i=0; iwp); i++) - if(s->wp[i] != nil) - return 0; - - return 1; -} - -static void -streamdrain(Chan *c) -{ - Stream *s; - - if((s = c->aux) == nil) - return; - eqlock(&s->wql); - if(waserror()){ - qunlock(&s->wql); - nexterror(); - } - while(!isdrained(s)) - sleep(&s->wz, isdrained, s); - qunlock(&s->wql); - poperror(); -} - -static void -streamclose(Chan *c) -{ - Stream *s; - int i; - - if((c->flag & COPEN) == 0 || (s = c->aux) == nil) - return; - if(s->rq != nil){ - qclose(s->rq); - for(i=0; irp); i++) - postnote(s->rp[i], 1, "streamclose", 0); - } - if(s->wq != nil){ - qhangup(s->wq, nil); - if(!waserror()){ - streamdrain(c); - poperror(); - } - qclose(s->wq); /* discard the data */ - for(i=0; iwp); i++) - postnote(s->wp[i], 1, "streamclose", 0); - } - c->aux = nil; - putstream(s); -} - -static int -canpipeline(Chan *f, int mode) -{ - USED(mode); - - return devtab[f->type]->dc == 'M'; -} - -static Queue* -streamqueue(Chan *c, int mode) -{ - Stream *s; - int i, n; - - s = c->aux; - if(s == nil || c->qid.type != QTFILE) - error(Eperm); - - switch(mode){ - case OREAD: - while(s->rq == nil){ - qlock(&s->rql); - if(s->rq != nil){ - qunlock(&s->rql); - break; - } - s->rq = qopen(conf.pipeqsize, 0, 0, 0); - if(s->rq == nil){ - qunlock(&s->rql); - error(Enomem); - } - n = canpipeline(s->f, mode) ? nelem(s->rp) : 1; - for(i=0; inrp.ref != n) - sched(); - qunlock(&s->rql); - break; - } - return s->rq; - case OWRITE: - while(s->wq == nil){ - qlock(&s->wql); - if(s->wq != nil){ - qunlock(&s->wql); - break; - } - s->wq = qopen(conf.pipeqsize, 0, 0, 0); - if(s->wq == nil){ - qunlock(&s->wql); - error(Enomem); - } - n = canpipeline(s->f, mode) ? nelem(s->wp) : 1; - for(i=0; inwp.ref != n) - sched(); - qunlock(&s->wql); - break; - } - return s->wq; - } - error(Egreg); - return nil; -} - -static long -streamread(Chan *c, void *va, long n, vlong) -{ - if(c->qid.type == QTDIR) - return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen); - return qread(streamqueue(c, OREAD), va, n); -} - -static Block* -streambread(Chan *c, long n, ulong) -{ - return qbread(streamqueue(c, OREAD), n); -} - -static long -streamwrite(Chan *c, void *va, long n, vlong) -{ - if(n == 0) - streamdrain(c); - return qwrite(streamqueue(c, OWRITE), va, n); -} - -static long -streambwrite(Chan *c, Block *b, ulong) -{ - if(BLEN(b) == 0) - streamdrain(c); - return qbwrite(streamqueue(c, OWRITE), b); -} - -Dev streamdevtab = { - L'¶', - "stream", - - devreset, - devinit, - devshutdown, - streamattach, - streamwalk, - streamstat, - streamopen, - devcreate, - streamclose, - streamread, - streambread, - streamwrite, - streambwrite, - devremove, - devwstat, -}; diff --git a/sys/src/9/port/portdat.h b/sys/src/9/port/portdat.h index 4c55cbcfb..ae047da66 100644 --- a/sys/src/9/port/portdat.h +++ b/sys/src/9/port/portdat.h @@ -777,9 +777,6 @@ struct Proc PMMU; char *syscalltrace; /* syscall trace */ - - void *iocomarg; /* I/O completion callback for pipelining */ - void (*iocomfun)(void*, int); }; enum diff --git a/sys/src/cmd/cp.c b/sys/src/cmd/cp.c index 88eaead4f..341d32c14 100644 --- a/sys/src/cmd/cp.c +++ b/sys/src/cmd/cp.c @@ -127,19 +127,6 @@ copy(char *from, char *to, int todir) if(buflen <= 0) buflen = DEFB; - if(dirb->length/2 > buflen){ - char nam[32]; - - snprint(nam, sizeof nam, "/fd/%dstream", fdf); - fds = open(nam, OREAD); - if(fds >= 0){ - close(fdf); - fdf = fds; - } - snprint(nam, sizeof nam, "/fd/%dstream", fdt); - fds = open(nam, OWRITE); - } - if(copy1(fdf, fds < 0 ? fdt : fds, from, to)==0){ if(fds >= 0 && write(fds, "", 0) < 0){ fprint(2, "cp: error writing %s: %r\n", to);