devstream: fast sequential file access with 9p pipelining experiment

This commit is contained in:
cinap_lenrek 2015-07-19 03:31:17 +02:00
parent 0bdfa3699d
commit 71cda09d1e
4 changed files with 638 additions and 0 deletions

40
sys/man/3/stream Normal file
View file

@ -0,0 +1,40 @@
.TH STREAM 3
.SH NAME
stream \- fast sequential file access
.SH SYNOPSIS
.nf
.B bind #¶ /fd
.B /fd/0stream
.B /fd/1stream
\&...
.fi
.SH DESCRIPTION
The
.I stream
device serves a one-level directory containing files of the form
.IR N stream
where
.I N
is a file descriptor of the current process.
.PP
An
.IR open (2)
returns a stream file descriptor connected to the original file
refered to by the file descriptor
.IR N .
When a stream was opend for reading, the device will start
continuously reading the file in the background until it reaches
the end of the file. A
.IR read (2)
on the stream consumes the prefetched data in sequential order.
.PP
When a stream is opend for writing, writes to the stream will
return immidiately without waiting for the data to be written
to the file. A zero-length write can be used to wait for the
buffered data to drain and return any previous write errors.
.SH SEE ALSO
.IR dup (2),
.IR pipe (3)
.SH SOURCE
.B /sys/src/9/port/devstream.c

View file

@ -30,6 +30,8 @@ struct Mntrpc
uint rpclen; /* len of buffer */
Block* b; /* reply blocks */
Mntrpc* flushed; /* message this one flushes */
void *iocomarg; /* Rpc completion callback for pipelining */
void (*iocomfun)(void*, int);
char done; /* Rpc completed */
};
@ -789,6 +791,9 @@ mountio(Mnt *m, Mntrpc *r)
lock(m);
r->z = &up->sleep;
r->m = m;
r->iocomarg = up->iocomarg;
r->iocomfun = up->iocomfun;
up->iocomfun = nil;
r->list = m->queue;
m->queue = r;
unlock(m);
@ -806,6 +811,10 @@ mountio(Mnt *m, Mntrpc *r)
if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n)
error(Emountrpc);
/* Rpc commited */
if(r->iocomfun != nil)
(*r->iocomfun)(r->iocomarg, 0);
/* Gate readers onto the mount point one at a time */
for(;;) {
lock(m);
@ -948,6 +957,11 @@ mountmux(Mnt *m, Mntrpc *r)
/* look for a reply to a message */
if(q->request.tag == r->reply.tag) {
*l = q->list;
/* Rpc completed */
if(q->iocomfun != nil)
(*q->iocomfun)(q->iocomarg, 1);
if(q == r) {
q->done = 1;
unlock(m);

580
sys/src/9/port/devstream.c Normal file
View file

@ -0,0 +1,580 @@
#include "u.h"
#include "../port/lib.h"
#include "mem.h"
#include "dat.h"
#include "fns.h"
#include "../port/error.h"
typedef struct Stream Stream;
typedef struct Iocom Iocom;
struct Stream
{
Ref;
Lock;
int iounit;
int noseek;
Ref nrp;
Ref nwp;
Ref nwq;
Proc *rp[4];
Proc *wp[2];
Block *rlist;
vlong soff;
vlong roff;
vlong woff;
QLock rcl;
QLock wcl;
QLock rql;
QLock wql;
Rendez wz;
Queue *rq;
Queue *wq;
Chan *f;
};
struct Iocom
{
Proc *p;
QLock *q;
Stream *s;
Block *b;
};
static void
putstream(Stream *s)
{
if(decref(s))
return;
freeblist(s->rlist);
qfree(s->rq);
qfree(s->wq);
if(s->f != nil)
cclose(s->f);
free(s);
}
#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong)))
#define BDONE (1<<15)
#define BERROR (1<<14)
static Block*
sblock(Stream *s)
{
Block *b;
b = allocb(sizeof(vlong)+s->iounit);
b->flag &= ~(BDONE|BERROR);
b->wp += sizeof(vlong);
b->rp = b->wp;
return b;
}
static void
iocom(void *arg, int complete)
{
Iocom *io = arg;
Stream *s;
QLock *q;
Proc *p;
p = io->p;
if(complete && p == up){
up->iocomfun = nil;
up->iocomarg = nil;
}
q = io->q;
if(q != nil && p == up){
io->q = nil;
qunlock(q);
}
s = io->s;
if(complete && s != nil && s->noseek){
io->s = nil;
lock(s);
BOFF(io->b) = s->soff;
s->soff += s->iounit;
unlock(s);
}
}
static void
ioq(Iocom *io, QLock *q)
{
eqlock(q); /* unlocked in iocom() above */
io->p = up;
io->q = q;
io->s = nil;
io->b = nil;
up->iocomarg = io;
up->iocomfun = iocom;
}
static void
streamreader(void *arg)
{
Stream *s = arg;
Iocom io;
Chan *f;
Block *b, *l, **ll;
vlong o;
int id, n;
id = incref(&s->nrp) % nelem(s->rp);
s->rp[id] = up;
f = s->f;
b = sblock(s);
qlock(&s->rql);
if(waserror()){
qhangup(s->rq, up->errstr);
goto Done;
}
if(s->noseek == -1){
BOFF(b) = 0;
n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL);
if(n > 0){
b->wp += n;
b->flag |= BDONE;
b->next = nil;
s->rlist = b;
s->soff = s->iounit;
s->roff = 0;
s->noseek = 1;
b = sblock(s);
} else {
s->noseek = 0;
}
}
while(!qisclosed(s->rq)) {
ll = &s->rlist;
while((l = *ll) != nil){
if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){
if(s->noseek){
ll = &l->next;
continue;
}
break;
}
if((l->flag & BERROR) != 0)
error((char*)l->rp);
if(BLEN(l) == 0){
qhangup(s->rq, nil);
poperror();
goto Done;
}
s->roff += s->noseek ? s->iounit : BLEN(l);
*ll = l->next;
l->next = nil;
qbwrite(s->rq, l);
}
n = s->iounit;
o = s->roff;
l = s->rlist;
if(s->noseek) {
o = 0;
b->next = l;
s->rlist = b;
} else if(l == nil) {
b->next = nil;
s->rlist = b;
} else {
if(o < BOFF(l)){
n = BOFF(l) - o;
b->next = l;
s->rlist = b;
} else {
for(;; l = l->next){
if((l->flag & BDONE) != 0 && BLEN(l) == 0)
goto Done;
o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l));
if(l->next == nil)
break;
if(o < BOFF(l->next)){
n = BOFF(l->next) - o;
break;
}
}
b->next = l->next;
l->next = b;
}
}
BOFF(b) = o;
qunlock(&s->rql);
if(waserror()){
poperror();
goto Exit;
}
ioq(&io, &s->rcl);
io.b = b;
io.s = s;
if(waserror()){
strncpy((char*)b->wp, up->errstr, s->iounit-1);
b->wp[s->iounit-1] = 0;
n = -1;
} else {
n = devtab[f->type]->read(f, b->wp, n, o);
if(n < 0)
error(Eio);
poperror();
}
iocom(&io, 1);
poperror();
l = b;
b = sblock(s);
qlock(&s->rql);
if(n >= 0)
l->wp += n;
else
l->flag |= BERROR;
l->flag |= BDONE;
}
poperror();
Done:
qunlock(&s->rql);
freeb(b);
Exit:
s->rp[id] = nil;
putstream(s);
pexit("closed", 1);
}
static void
streamwriter(void *arg)
{
Stream *s = arg;
Iocom io;
Block *b;
Chan *f;
vlong o;
int id, n;
id = incref(&s->nwp) % nelem(s->wp);
s->wp[id] = up;
f = s->f;
while(!qisclosed(s->wq)) {
if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0)
wakeup(&s->wz); /* queue drained */
if(waserror()){
decref(&s->nwq);
break;
}
ioq(&io, &s->wcl);
b = qbread(s->wq, s->iounit);
decref(&s->nwq);
if(b == nil){
iocom(&io, 1);
break;
}
poperror();
if(waserror()){
qhangup(s->wq, up->errstr);
iocom(&io, 1);
freeb(b);
break;
}
n = BLEN(b);
o = s->woff;
s->woff += n;
if(devtab[f->type]->write(f, b->rp, n, o) != n)
error(Eio);
iocom(&io, 1);
freeb(b);
poperror();
}
s->wp[id] = nil;
wakeup(&s->wz);
putstream(s);
pexit("closed", 1);
}
static int
streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp)
{
static int perm[] = { 0400, 0200, 0600, 0 };
Fgrp *fgrp = up->fgrp;
Chan *f;
Qid q;
if(s == DEVDOTDOT){
devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp);
return 1;
}
if(s == 0)
return 0;
s--;
if(s > fgrp->maxfd)
return -1;
if((f=fgrp->fd[s]) == nil)
return 0;
sprint(up->genbuf, "%dstream", s);
mkqid(&q, s+1, 0, QTFILE);
devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp);
return 1;
}
static Chan*
streamattach(char *spec)
{
return devattach(L'', spec);
}
static Walkqid*
streamwalk(Chan *c, Chan *nc, char **name, int nname)
{
return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen);
}
static int
streamstat(Chan *c, uchar *db, int n)
{
return devstat(c, db, n, (Dirtab *)0, 0L, streamgen);
}
static Chan*
streamopen(Chan *c, int omode)
{
Stream *s;
c->aux = nil;
if(c->qid.type & QTDIR){
if(omode != 0)
error(Eisdir);
c->mode = 0;
c->flag |= COPEN;
c->offset = 0;
return c;
}
s = mallocz(sizeof(*s), 1);
if(s == nil)
error(Enomem);
incref(s);
if(waserror()){
putstream(s);
nexterror();
}
omode = openmode(omode);
s->f = fdtochan(c->qid.path - 1, omode, 0, 1);
if(s->f == nil || s->f->qid.type != QTFILE)
error(Eperm);
s->noseek = -1;
s->roff = s->f->offset;
s->woff = s->f->offset;
s->iounit = s->f->iounit;
if(s->iounit <= 0 || s->iounit > qiomaxatomic)
s->iounit = qiomaxatomic;
c->iounit = s->iounit;
c->aux = s;
c->mode = omode;
c->flag |= COPEN;
c->offset = 0;
poperror();
return c;
}
static int
isdrained(void *a)
{
Stream *s;
int i;
s = a;
if(s->wq == nil)
return 1;
if(qisclosed(s->wq) == 0)
return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref;
for(i=0; i<nelem(s->wp); i++)
if(s->wp[i] != nil)
return 0;
return 1;
}
static void
streamdrain(Chan *c)
{
Stream *s;
if((s = c->aux) == nil)
return;
eqlock(&s->wql);
if(waserror()){
qunlock(&s->wql);
nexterror();
}
while(!streamdrained(s))
sleep(&s->wz, isdrained, s);
qunlock(&s->wql);
poperror();
}
static void
streamclose(Chan *c)
{
Stream *s;
int i;
if((c->flag & COPEN) == 0 || (s = c->aux) == nil)
return;
if(s->rq != nil){
qclose(s->rq);
for(i=0; i<nelem(s->rp); i++)
postnote(s->rp[i], 1, "streamclose", 0);
}
if(s->wq != nil){
qhangup(s->wq, nil);
if(!waserror()){
streamdrain(c);
poperror();
}
qclose(s->wq); /* discard the data */
for(i=0; i<nelem(s->wp); i++)
postnote(s->wp[i], 1, "streamclose", 0);
}
c->aux = nil;
putstream(s);
}
static int
canpipeline(Chan *f, int mode)
{
USED(mode);
return devtab[f->type]->dc == 'M';
}
static Queue*
streamqueue(Chan *c, int mode)
{
Stream *s;
int i, n;
s = c->aux;
if(s == nil || c->qid.type != QTFILE)
error(Eperm);
switch(mode){
case OREAD:
while(s->rq == nil){
qlock(&s->rql);
if(s->rq != nil){
qunlock(&s->rql);
break;
}
s->rq = qopen(conf.pipeqsize, 0, 0, 0);
if(s->rq == nil){
qunlock(&s->rql);
error(Enomem);
}
n = canpipeline(s->f, mode) ? nelem(s->rp) : 1;
for(i=0; i<n; i++){
incref(s);
kproc("streamreader", streamreader, s);
}
while(s->nrp.ref != n)
sched();
qunlock(&s->rql);
break;
}
return s->rq;
case OWRITE:
while(s->wq == nil){
qlock(&s->wql);
if(s->wq != nil){
qunlock(&s->wql);
break;
}
s->wq = qopen(conf.pipeqsize, 0, 0, 0);
if(s->wq == nil){
qunlock(&s->wql);
error(Enomem);
}
n = canpipeline(s->f, mode) ? nelem(s->wp) : 1;
for(i=0; i<n; i++){
incref(s);
kproc("streamwriter", streamwriter, s);
}
while(s->nwp.ref != n)
sched();
qunlock(&s->wql);
break;
}
return s->wq;
}
error(Egreg);
return nil;
}
static long
streamread(Chan *c, void *va, long n, vlong)
{
if(c->qid.type == QTDIR)
return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen);
return qread(streamqueue(c, OREAD), va, n);
}
static Block*
streambread(Chan *c, long n, ulong)
{
return qbread(streamqueue(c, OREAD), n);
}
static long
streamwrite(Chan *c, void *va, long n, vlong)
{
if(n == 0)
streamdrain(c);
return qwrite(streamqueue(c, OWRITE), va, n);
}
static long
streambwrite(Chan *c, Block *b, ulong)
{
if(BLEN(b) == 0)
streamdrain(c);
return qbwrite(streamqueue(c, OWRITE), b);
}
Dev streamdevtab = {
L'',
"stream",
devreset,
devinit,
devshutdown,
streamattach,
streamwalk,
streamstat,
streamopen,
devcreate,
streamclose,
streamread,
streambread,
streamwrite,
streambwrite,
devremove,
devwstat,
};

View file

@ -755,7 +755,11 @@ struct Proc
* machine specific MMU
*/
PMMU;
char *syscalltrace; /* syscall trace */
void *iocomarg; /* I/O completion callback for pipelining */
void (*iocomfun)(void*, int);
};
enum