diff --git a/sys/src/9/port/devmnt.c b/sys/src/9/port/devmnt.c index 52f093a1a..e6bbd02dc 100644 --- a/sys/src/9/port/devmnt.c +++ b/sys/src/9/port/devmnt.c @@ -30,8 +30,6 @@ struct Mntrpc uint rpclen; /* len of buffer */ Block* b; /* reply blocks */ Mntrpc* flushed; /* message this one flushes */ - void *iocomarg; /* Rpc completion callback for pipelining */ - void (*iocomfun)(void*, int); char done; /* Rpc completed */ }; @@ -1017,9 +1015,6 @@ mountio(Mnt *m, Mntrpc *r) lock(m); r->z = &up->sleep; r->m = m; - r->iocomarg = up->iocomarg; - r->iocomfun = up->iocomfun; - up->iocomfun = nil; r->list = m->queue; m->queue = r; unlock(m); @@ -1044,10 +1039,6 @@ mountio(Mnt *m, Mntrpc *r) if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n) error(Emountrpc); - /* Rpc commited */ - if(r->iocomfun != nil) - (*r->iocomfun)(r->iocomarg, 0); - /* Gate readers onto the mount point one at a time */ for(;;) { lock(m); @@ -1190,11 +1181,6 @@ mountmux(Mnt *m, Mntrpc *r) /* look for a reply to a message */ if(q->request.tag == r->reply.tag) { *l = q->list; - - /* Rpc completed */ - if(q->iocomfun != nil) - (*q->iocomfun)(q->iocomarg, 1); - if(q == r) { q->done = 1; unlock(m); diff --git a/sys/src/9/port/devstream.c b/sys/src/9/port/devstream.c deleted file mode 100644 index 8c150633e..000000000 --- a/sys/src/9/port/devstream.c +++ /dev/null @@ -1,580 +0,0 @@ -#include "u.h" -#include "../port/lib.h" -#include "mem.h" -#include "dat.h" -#include "fns.h" -#include "../port/error.h" - -typedef struct Stream Stream; -typedef struct Iocom Iocom; - -struct Stream -{ - Ref; - Lock; - - int iounit; - int noseek; - - Ref nrp; - Ref nwp; - Ref nwq; - - Proc *rp[4]; - Proc *wp[2]; - - Block *rlist; - - vlong soff; - vlong roff; - vlong woff; - - QLock rcl; - QLock wcl; - QLock rql; - QLock wql; - - Rendez wz; - - Queue *rq; - Queue *wq; - Chan *f; -}; - -struct Iocom -{ - Proc *p; - QLock *q; - Stream *s; - Block *b; -}; - -static void -putstream(Stream *s) -{ - if(decref(s)) - return; - freeblist(s->rlist); - qfree(s->rq); - qfree(s->wq); - if(s->f != nil) - cclose(s->f); - free(s); -} - -#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong))) -#define BDONE (1<<15) -#define BERROR (1<<14) - -static Block* -sblock(Stream *s) -{ - Block *b; - - b = allocb(sizeof(vlong)+s->iounit); - b->flag &= ~(BDONE|BERROR); - b->wp += sizeof(vlong); - b->rp = b->wp; - return b; -} - -static void -iocom(void *arg, int complete) -{ - Iocom *io = arg; - Stream *s; - QLock *q; - Proc *p; - - p = io->p; - if(complete && p == up){ - up->iocomfun = nil; - up->iocomarg = nil; - } - - q = io->q; - if(q != nil && p == up){ - io->q = nil; - qunlock(q); - } - - s = io->s; - if(complete && s != nil && s->noseek){ - io->s = nil; - lock(s); - BOFF(io->b) = s->soff; - s->soff += s->iounit; - unlock(s); - } -} - -static void -ioq(Iocom *io, QLock *q) -{ - eqlock(q); /* unlocked in iocom() above */ - - io->p = up; - io->q = q; - io->s = nil; - io->b = nil; - - up->iocomarg = io; - up->iocomfun = iocom; -} - -static void -streamreader(void *arg) -{ - Stream *s = arg; - Iocom io; - Chan *f; - Block *b, *l, **ll; - vlong o; - int id, n; - - id = incref(&s->nrp) % nelem(s->rp); - s->rp[id] = up; - - f = s->f; - b = sblock(s); - qlock(&s->rql); - if(waserror()){ - qhangup(s->rq, up->errstr); - goto Done; - } - if(s->noseek == -1){ - BOFF(b) = 0; - n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL); - - if(n > 0){ - b->wp += n; - b->flag |= BDONE; - b->next = nil; - s->rlist = b; - s->soff = s->iounit; - s->roff = 0; - s->noseek = 1; - - b = sblock(s); - } else { - s->noseek = 0; - } - } - while(!qisclosed(s->rq)) { - ll = &s->rlist; - while((l = *ll) != nil){ - if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){ - if(s->noseek){ - ll = &l->next; - continue; - } - break; - } - if((l->flag & BERROR) != 0) - error((char*)l->rp); - if(BLEN(l) == 0){ - qhangup(s->rq, nil); - poperror(); - goto Done; - } - s->roff += s->noseek ? s->iounit : BLEN(l); - *ll = l->next; - l->next = nil; - qbwrite(s->rq, l); - } - - n = s->iounit; - o = s->roff; - l = s->rlist; - if(s->noseek) { - o = 0; - b->next = l; - s->rlist = b; - } else if(l == nil) { - b->next = nil; - s->rlist = b; - } else { - if(o < BOFF(l)){ - n = BOFF(l) - o; - b->next = l; - s->rlist = b; - } else { - for(;; l = l->next){ - if((l->flag & BDONE) != 0 && BLEN(l) == 0) - goto Done; - o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l)); - if(l->next == nil) - break; - if(o < BOFF(l->next)){ - n = BOFF(l->next) - o; - break; - } - } - b->next = l->next; - l->next = b; - } - } - BOFF(b) = o; - qunlock(&s->rql); - - if(waserror()){ - poperror(); - goto Exit; - } - ioq(&io, &s->rcl); - io.b = b; - io.s = s; - if(waserror()){ - strncpy((char*)b->wp, up->errstr, s->iounit-1); - b->wp[s->iounit-1] = 0; - n = -1; - } else { - n = devtab[f->type]->read(f, b->wp, n, o); - if(n < 0) - error(Eio); - poperror(); - } - iocom(&io, 1); - poperror(); - - l = b; - b = sblock(s); - qlock(&s->rql); - if(n >= 0) - l->wp += n; - else - l->flag |= BERROR; - l->flag |= BDONE; - } - poperror(); -Done: - qunlock(&s->rql); - freeb(b); -Exit: - s->rp[id] = nil; - putstream(s); - pexit("closed", 1); -} - -static void -streamwriter(void *arg) -{ - Stream *s = arg; - Iocom io; - Block *b; - Chan *f; - vlong o; - int id, n; - - id = incref(&s->nwp) % nelem(s->wp); - s->wp[id] = up; - - f = s->f; - while(!qisclosed(s->wq)) { - if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0) - wakeup(&s->wz); /* queue drained */ - if(waserror()){ - decref(&s->nwq); - break; - } - ioq(&io, &s->wcl); - b = qbread(s->wq, s->iounit); - decref(&s->nwq); - if(b == nil){ - iocom(&io, 1); - break; - } - poperror(); - - if(waserror()){ - qhangup(s->wq, up->errstr); - iocom(&io, 1); - freeb(b); - break; - } - n = BLEN(b); - o = s->woff; - s->woff += n; - if(devtab[f->type]->write(f, b->rp, n, o) != n) - error(Eio); - iocom(&io, 1); - freeb(b); - poperror(); - } - - s->wp[id] = nil; - wakeup(&s->wz); - - putstream(s); - pexit("closed", 1); -} - -static int -streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp) -{ - static int perm[] = { 0400, 0200, 0600, 0 }; - Fgrp *fgrp = up->fgrp; - Chan *f; - Qid q; - - if(s == DEVDOTDOT){ - devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp); - return 1; - } - if(s == 0) - return 0; - s--; - if(s > fgrp->maxfd) - return -1; - if((f=fgrp->fd[s]) == nil) - return 0; - sprint(up->genbuf, "%dstream", s); - mkqid(&q, s+1, 0, QTFILE); - devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp); - return 1; -} - -static Chan* -streamattach(char *spec) -{ - return devattach(L'¶', spec); -} - -static Walkqid* -streamwalk(Chan *c, Chan *nc, char **name, int nname) -{ - return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen); -} - -static int -streamstat(Chan *c, uchar *db, int n) -{ - return devstat(c, db, n, (Dirtab *)0, 0L, streamgen); -} - -static Chan* -streamopen(Chan *c, int omode) -{ - Stream *s; - - c->aux = nil; - if(c->qid.type & QTDIR){ - if(omode != 0) - error(Eisdir); - c->mode = 0; - c->flag |= COPEN; - c->offset = 0; - return c; - } - s = mallocz(sizeof(*s), 1); - if(s == nil) - error(Enomem); - incref(s); - if(waserror()){ - putstream(s); - nexterror(); - } - omode = openmode(omode); - s->f = fdtochan(c->qid.path - 1, omode, 0, 1); - if(s->f == nil || s->f->qid.type != QTFILE) - error(Eperm); - s->noseek = -1; - s->roff = s->f->offset; - s->woff = s->f->offset; - s->iounit = s->f->iounit; - if(s->iounit <= 0 || s->iounit > qiomaxatomic) - s->iounit = qiomaxatomic; - c->iounit = s->iounit; - c->aux = s; - c->mode = omode; - c->flag |= COPEN; - c->offset = 0; - poperror(); - return c; -} - -static int -isdrained(void *a) -{ - Stream *s; - int i; - - s = a; - if(s->wq == nil) - return 1; - - if(qisclosed(s->wq) == 0) - return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref; - - for(i=0; iwp); i++) - if(s->wp[i] != nil) - return 0; - - return 1; -} - -static void -streamdrain(Chan *c) -{ - Stream *s; - - if((s = c->aux) == nil) - return; - eqlock(&s->wql); - if(waserror()){ - qunlock(&s->wql); - nexterror(); - } - while(!isdrained(s)) - sleep(&s->wz, isdrained, s); - qunlock(&s->wql); - poperror(); -} - -static void -streamclose(Chan *c) -{ - Stream *s; - int i; - - if((c->flag & COPEN) == 0 || (s = c->aux) == nil) - return; - if(s->rq != nil){ - qclose(s->rq); - for(i=0; irp); i++) - postnote(s->rp[i], 1, "streamclose", 0); - } - if(s->wq != nil){ - qhangup(s->wq, nil); - if(!waserror()){ - streamdrain(c); - poperror(); - } - qclose(s->wq); /* discard the data */ - for(i=0; iwp); i++) - postnote(s->wp[i], 1, "streamclose", 0); - } - c->aux = nil; - putstream(s); -} - -static int -canpipeline(Chan *f, int mode) -{ - USED(mode); - - return devtab[f->type]->dc == 'M'; -} - -static Queue* -streamqueue(Chan *c, int mode) -{ - Stream *s; - int i, n; - - s = c->aux; - if(s == nil || c->qid.type != QTFILE) - error(Eperm); - - switch(mode){ - case OREAD: - while(s->rq == nil){ - qlock(&s->rql); - if(s->rq != nil){ - qunlock(&s->rql); - break; - } - s->rq = qopen(conf.pipeqsize, 0, 0, 0); - if(s->rq == nil){ - qunlock(&s->rql); - error(Enomem); - } - n = canpipeline(s->f, mode) ? nelem(s->rp) : 1; - for(i=0; inrp.ref != n) - sched(); - qunlock(&s->rql); - break; - } - return s->rq; - case OWRITE: - while(s->wq == nil){ - qlock(&s->wql); - if(s->wq != nil){ - qunlock(&s->wql); - break; - } - s->wq = qopen(conf.pipeqsize, 0, 0, 0); - if(s->wq == nil){ - qunlock(&s->wql); - error(Enomem); - } - n = canpipeline(s->f, mode) ? nelem(s->wp) : 1; - for(i=0; inwp.ref != n) - sched(); - qunlock(&s->wql); - break; - } - return s->wq; - } - error(Egreg); - return nil; -} - -static long -streamread(Chan *c, void *va, long n, vlong) -{ - if(c->qid.type == QTDIR) - return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen); - return qread(streamqueue(c, OREAD), va, n); -} - -static Block* -streambread(Chan *c, long n, ulong) -{ - return qbread(streamqueue(c, OREAD), n); -} - -static long -streamwrite(Chan *c, void *va, long n, vlong) -{ - if(n == 0) - streamdrain(c); - return qwrite(streamqueue(c, OWRITE), va, n); -} - -static long -streambwrite(Chan *c, Block *b, ulong) -{ - if(BLEN(b) == 0) - streamdrain(c); - return qbwrite(streamqueue(c, OWRITE), b); -} - -Dev streamdevtab = { - L'¶', - "stream", - - devreset, - devinit, - devshutdown, - streamattach, - streamwalk, - streamstat, - streamopen, - devcreate, - streamclose, - streamread, - streambread, - streamwrite, - streambwrite, - devremove, - devwstat, -}; diff --git a/sys/src/9/port/portdat.h b/sys/src/9/port/portdat.h index 4c55cbcfb..ae047da66 100644 --- a/sys/src/9/port/portdat.h +++ b/sys/src/9/port/portdat.h @@ -777,9 +777,6 @@ struct Proc PMMU; char *syscalltrace; /* syscall trace */ - - void *iocomarg; /* I/O completion callback for pipelining */ - void (*iocomfun)(void*, int); }; enum diff --git a/sys/src/cmd/cp.c b/sys/src/cmd/cp.c index 88eaead4f..341d32c14 100644 --- a/sys/src/cmd/cp.c +++ b/sys/src/cmd/cp.c @@ -127,19 +127,6 @@ copy(char *from, char *to, int todir) if(buflen <= 0) buflen = DEFB; - if(dirb->length/2 > buflen){ - char nam[32]; - - snprint(nam, sizeof nam, "/fd/%dstream", fdf); - fds = open(nam, OREAD); - if(fds >= 0){ - close(fdf); - fdf = fds; - } - snprint(nam, sizeof nam, "/fd/%dstream", fdt); - fds = open(nam, OWRITE); - } - if(copy1(fdf, fds < 0 ? fdt : fds, from, to)==0){ if(fds >= 0 && write(fds, "", 0) < 0){ fprint(2, "cp: error writing %s: %r\n", to);