diff --git a/sys/man/3/stream b/sys/man/3/stream new file mode 100644 index 000000000..fb20cb51f --- /dev/null +++ b/sys/man/3/stream @@ -0,0 +1,40 @@ +.TH STREAM 3 +.SH NAME +stream \- fast sequential file access +.SH SYNOPSIS +.nf +.B bind #¶ /fd + +.B /fd/0stream +.B /fd/1stream +\&... +.fi +.SH DESCRIPTION +The +.I stream +device serves a one-level directory containing files of the form +.IR N stream +where +.I N +is a file descriptor of the current process. +.PP +An +.IR open (2) +returns a stream file descriptor connected to the original file +refered to by the file descriptor +.IR N . +When a stream was opend for reading, the device will start +continuously reading the file in the background until it reaches +the end of the file. A +.IR read (2) +on the stream consumes the prefetched data in sequential order. +.PP +When a stream is opend for writing, writes to the stream will +return immidiately without waiting for the data to be written +to the file. A zero-length write can be used to wait for the +buffered data to drain and return any previous write errors. +.SH SEE ALSO +.IR dup (2), +.IR pipe (3) +.SH SOURCE +.B /sys/src/9/port/devstream.c diff --git a/sys/src/9/port/devmnt.c b/sys/src/9/port/devmnt.c index 6e220f70a..2b6610afc 100644 --- a/sys/src/9/port/devmnt.c +++ b/sys/src/9/port/devmnt.c @@ -30,6 +30,8 @@ struct Mntrpc uint rpclen; /* len of buffer */ Block* b; /* reply blocks */ Mntrpc* flushed; /* message this one flushes */ + void *iocomarg; /* Rpc completion callback for pipelining */ + void (*iocomfun)(void*, int); char done; /* Rpc completed */ }; @@ -789,6 +791,9 @@ mountio(Mnt *m, Mntrpc *r) lock(m); r->z = &up->sleep; r->m = m; + r->iocomarg = up->iocomarg; + r->iocomfun = up->iocomfun; + up->iocomfun = nil; r->list = m->queue; m->queue = r; unlock(m); @@ -806,6 +811,10 @@ mountio(Mnt *m, Mntrpc *r) if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n) error(Emountrpc); + /* Rpc commited */ + if(r->iocomfun != nil) + (*r->iocomfun)(r->iocomarg, 0); + /* Gate readers onto the mount point one at a time */ for(;;) { lock(m); @@ -948,6 +957,11 @@ mountmux(Mnt *m, Mntrpc *r) /* look for a reply to a message */ if(q->request.tag == r->reply.tag) { *l = q->list; + + /* Rpc completed */ + if(q->iocomfun != nil) + (*q->iocomfun)(q->iocomarg, 1); + if(q == r) { q->done = 1; unlock(m); diff --git a/sys/src/9/port/devstream.c b/sys/src/9/port/devstream.c new file mode 100644 index 000000000..ee175f74e --- /dev/null +++ b/sys/src/9/port/devstream.c @@ -0,0 +1,580 @@ +#include "u.h" +#include "../port/lib.h" +#include "mem.h" +#include "dat.h" +#include "fns.h" +#include "../port/error.h" + +typedef struct Stream Stream; +typedef struct Iocom Iocom; + +struct Stream +{ + Ref; + Lock; + + int iounit; + int noseek; + + Ref nrp; + Ref nwp; + Ref nwq; + + Proc *rp[4]; + Proc *wp[2]; + + Block *rlist; + + vlong soff; + vlong roff; + vlong woff; + + QLock rcl; + QLock wcl; + QLock rql; + QLock wql; + + Rendez wz; + + Queue *rq; + Queue *wq; + Chan *f; +}; + +struct Iocom +{ + Proc *p; + QLock *q; + Stream *s; + Block *b; +}; + +static void +putstream(Stream *s) +{ + if(decref(s)) + return; + freeblist(s->rlist); + qfree(s->rq); + qfree(s->wq); + if(s->f != nil) + cclose(s->f); + free(s); +} + +#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong))) +#define BDONE (1<<15) +#define BERROR (1<<14) + +static Block* +sblock(Stream *s) +{ + Block *b; + + b = allocb(sizeof(vlong)+s->iounit); + b->flag &= ~(BDONE|BERROR); + b->wp += sizeof(vlong); + b->rp = b->wp; + return b; +} + +static void +iocom(void *arg, int complete) +{ + Iocom *io = arg; + Stream *s; + QLock *q; + Proc *p; + + p = io->p; + if(complete && p == up){ + up->iocomfun = nil; + up->iocomarg = nil; + } + + q = io->q; + if(q != nil && p == up){ + io->q = nil; + qunlock(q); + } + + s = io->s; + if(complete && s != nil && s->noseek){ + io->s = nil; + lock(s); + BOFF(io->b) = s->soff; + s->soff += s->iounit; + unlock(s); + } +} + +static void +ioq(Iocom *io, QLock *q) +{ + eqlock(q); /* unlocked in iocom() above */ + + io->p = up; + io->q = q; + io->s = nil; + io->b = nil; + + up->iocomarg = io; + up->iocomfun = iocom; +} + +static void +streamreader(void *arg) +{ + Stream *s = arg; + Iocom io; + Chan *f; + Block *b, *l, **ll; + vlong o; + int id, n; + + id = incref(&s->nrp) % nelem(s->rp); + s->rp[id] = up; + + f = s->f; + b = sblock(s); + qlock(&s->rql); + if(waserror()){ + qhangup(s->rq, up->errstr); + goto Done; + } + if(s->noseek == -1){ + BOFF(b) = 0; + n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL); + + if(n > 0){ + b->wp += n; + b->flag |= BDONE; + b->next = nil; + s->rlist = b; + s->soff = s->iounit; + s->roff = 0; + s->noseek = 1; + + b = sblock(s); + } else { + s->noseek = 0; + } + } + while(!qisclosed(s->rq)) { + ll = &s->rlist; + while((l = *ll) != nil){ + if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){ + if(s->noseek){ + ll = &l->next; + continue; + } + break; + } + if((l->flag & BERROR) != 0) + error((char*)l->rp); + if(BLEN(l) == 0){ + qhangup(s->rq, nil); + poperror(); + goto Done; + } + s->roff += s->noseek ? s->iounit : BLEN(l); + *ll = l->next; + l->next = nil; + qbwrite(s->rq, l); + } + + n = s->iounit; + o = s->roff; + l = s->rlist; + if(s->noseek) { + o = 0; + b->next = l; + s->rlist = b; + } else if(l == nil) { + b->next = nil; + s->rlist = b; + } else { + if(o < BOFF(l)){ + n = BOFF(l) - o; + b->next = l; + s->rlist = b; + } else { + for(;; l = l->next){ + if((l->flag & BDONE) != 0 && BLEN(l) == 0) + goto Done; + o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l)); + if(l->next == nil) + break; + if(o < BOFF(l->next)){ + n = BOFF(l->next) - o; + break; + } + } + b->next = l->next; + l->next = b; + } + } + BOFF(b) = o; + qunlock(&s->rql); + + if(waserror()){ + poperror(); + goto Exit; + } + ioq(&io, &s->rcl); + io.b = b; + io.s = s; + if(waserror()){ + strncpy((char*)b->wp, up->errstr, s->iounit-1); + b->wp[s->iounit-1] = 0; + n = -1; + } else { + n = devtab[f->type]->read(f, b->wp, n, o); + if(n < 0) + error(Eio); + poperror(); + } + iocom(&io, 1); + poperror(); + + l = b; + b = sblock(s); + qlock(&s->rql); + if(n >= 0) + l->wp += n; + else + l->flag |= BERROR; + l->flag |= BDONE; + } + poperror(); +Done: + qunlock(&s->rql); + freeb(b); +Exit: + s->rp[id] = nil; + putstream(s); + pexit("closed", 1); +} + +static void +streamwriter(void *arg) +{ + Stream *s = arg; + Iocom io; + Block *b; + Chan *f; + vlong o; + int id, n; + + id = incref(&s->nwp) % nelem(s->wp); + s->wp[id] = up; + + f = s->f; + while(!qisclosed(s->wq)) { + if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0) + wakeup(&s->wz); /* queue drained */ + if(waserror()){ + decref(&s->nwq); + break; + } + ioq(&io, &s->wcl); + b = qbread(s->wq, s->iounit); + decref(&s->nwq); + if(b == nil){ + iocom(&io, 1); + break; + } + poperror(); + + if(waserror()){ + qhangup(s->wq, up->errstr); + iocom(&io, 1); + freeb(b); + break; + } + n = BLEN(b); + o = s->woff; + s->woff += n; + if(devtab[f->type]->write(f, b->rp, n, o) != n) + error(Eio); + iocom(&io, 1); + freeb(b); + poperror(); + } + + s->wp[id] = nil; + wakeup(&s->wz); + + putstream(s); + pexit("closed", 1); +} + +static int +streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp) +{ + static int perm[] = { 0400, 0200, 0600, 0 }; + Fgrp *fgrp = up->fgrp; + Chan *f; + Qid q; + + if(s == DEVDOTDOT){ + devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp); + return 1; + } + if(s == 0) + return 0; + s--; + if(s > fgrp->maxfd) + return -1; + if((f=fgrp->fd[s]) == nil) + return 0; + sprint(up->genbuf, "%dstream", s); + mkqid(&q, s+1, 0, QTFILE); + devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp); + return 1; +} + +static Chan* +streamattach(char *spec) +{ + return devattach(L'¶', spec); +} + +static Walkqid* +streamwalk(Chan *c, Chan *nc, char **name, int nname) +{ + return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen); +} + +static int +streamstat(Chan *c, uchar *db, int n) +{ + return devstat(c, db, n, (Dirtab *)0, 0L, streamgen); +} + +static Chan* +streamopen(Chan *c, int omode) +{ + Stream *s; + + c->aux = nil; + if(c->qid.type & QTDIR){ + if(omode != 0) + error(Eisdir); + c->mode = 0; + c->flag |= COPEN; + c->offset = 0; + return c; + } + s = mallocz(sizeof(*s), 1); + if(s == nil) + error(Enomem); + incref(s); + if(waserror()){ + putstream(s); + nexterror(); + } + omode = openmode(omode); + s->f = fdtochan(c->qid.path - 1, omode, 0, 1); + if(s->f == nil || s->f->qid.type != QTFILE) + error(Eperm); + s->noseek = -1; + s->roff = s->f->offset; + s->woff = s->f->offset; + s->iounit = s->f->iounit; + if(s->iounit <= 0 || s->iounit > qiomaxatomic) + s->iounit = qiomaxatomic; + c->iounit = s->iounit; + c->aux = s; + c->mode = omode; + c->flag |= COPEN; + c->offset = 0; + poperror(); + return c; +} + +static int +isdrained(void *a) +{ + Stream *s; + int i; + + s = a; + if(s->wq == nil) + return 1; + + if(qisclosed(s->wq) == 0) + return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref; + + for(i=0; iwp); i++) + if(s->wp[i] != nil) + return 0; + + return 1; +} + +static void +streamdrain(Chan *c) +{ + Stream *s; + + if((s = c->aux) == nil) + return; + eqlock(&s->wql); + if(waserror()){ + qunlock(&s->wql); + nexterror(); + } + while(!streamdrained(s)) + sleep(&s->wz, isdrained, s); + qunlock(&s->wql); + poperror(); +} + +static void +streamclose(Chan *c) +{ + Stream *s; + int i; + + if((c->flag & COPEN) == 0 || (s = c->aux) == nil) + return; + if(s->rq != nil){ + qclose(s->rq); + for(i=0; irp); i++) + postnote(s->rp[i], 1, "streamclose", 0); + } + if(s->wq != nil){ + qhangup(s->wq, nil); + if(!waserror()){ + streamdrain(c); + poperror(); + } + qclose(s->wq); /* discard the data */ + for(i=0; iwp); i++) + postnote(s->wp[i], 1, "streamclose", 0); + } + c->aux = nil; + putstream(s); +} + +static int +canpipeline(Chan *f, int mode) +{ + USED(mode); + + return devtab[f->type]->dc == 'M'; +} + +static Queue* +streamqueue(Chan *c, int mode) +{ + Stream *s; + int i, n; + + s = c->aux; + if(s == nil || c->qid.type != QTFILE) + error(Eperm); + + switch(mode){ + case OREAD: + while(s->rq == nil){ + qlock(&s->rql); + if(s->rq != nil){ + qunlock(&s->rql); + break; + } + s->rq = qopen(conf.pipeqsize, 0, 0, 0); + if(s->rq == nil){ + qunlock(&s->rql); + error(Enomem); + } + n = canpipeline(s->f, mode) ? nelem(s->rp) : 1; + for(i=0; inrp.ref != n) + sched(); + qunlock(&s->rql); + break; + } + return s->rq; + case OWRITE: + while(s->wq == nil){ + qlock(&s->wql); + if(s->wq != nil){ + qunlock(&s->wql); + break; + } + s->wq = qopen(conf.pipeqsize, 0, 0, 0); + if(s->wq == nil){ + qunlock(&s->wql); + error(Enomem); + } + n = canpipeline(s->f, mode) ? nelem(s->wp) : 1; + for(i=0; inwp.ref != n) + sched(); + qunlock(&s->wql); + break; + } + return s->wq; + } + error(Egreg); + return nil; +} + +static long +streamread(Chan *c, void *va, long n, vlong) +{ + if(c->qid.type == QTDIR) + return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen); + return qread(streamqueue(c, OREAD), va, n); +} + +static Block* +streambread(Chan *c, long n, ulong) +{ + return qbread(streamqueue(c, OREAD), n); +} + +static long +streamwrite(Chan *c, void *va, long n, vlong) +{ + if(n == 0) + streamdrain(c); + return qwrite(streamqueue(c, OWRITE), va, n); +} + +static long +streambwrite(Chan *c, Block *b, ulong) +{ + if(BLEN(b) == 0) + streamdrain(c); + return qbwrite(streamqueue(c, OWRITE), b); +} + +Dev streamdevtab = { + L'¶', + "stream", + + devreset, + devinit, + devshutdown, + streamattach, + streamwalk, + streamstat, + streamopen, + devcreate, + streamclose, + streamread, + streambread, + streamwrite, + streambwrite, + devremove, + devwstat, +}; diff --git a/sys/src/9/port/portdat.h b/sys/src/9/port/portdat.h index f7501fedc..106f016f8 100644 --- a/sys/src/9/port/portdat.h +++ b/sys/src/9/port/portdat.h @@ -755,7 +755,11 @@ struct Proc * machine specific MMU */ PMMU; + char *syscalltrace; /* syscall trace */ + + void *iocomarg; /* I/O completion callback for pipelining */ + void (*iocomfun)(void*, int); }; enum