From 6617c63a374f7211b41252d3957e8a89061b8a49 Mon Sep 17 00:00:00 2001 From: cinap_lenrek Date: Sun, 26 Jul 2015 05:43:26 +0200 Subject: [PATCH] kernel: pipelined read ahead for the mount cache this changes devmnt adding mntrahread() function and some helpers for it to do pipelined sequential read ahead for the mount cache. basically, cread() calls mntrahread() with Mntrah structure and it figures out if we were reading sequentially and if that's the case issues reads of c->iounit size in advance. the read ahead state (Mntrah) is kept in the mount cache so we can handle (read ahead) cache invalidation in the presence of writes. --- sys/src/9/port/cache.c | 165 ++++++++++++----- sys/src/9/port/devmnt.c | 376 +++++++++++++++++++++++++++++++-------- sys/src/9/port/portdat.h | 26 +++ sys/src/9/port/portfns.h | 2 + sys/src/9/port/qio.c | 30 ++++ 5 files changed, 483 insertions(+), 116 deletions(-) diff --git a/sys/src/9/port/cache.c b/sys/src/9/port/cache.c index e1f449df4..306228731 100644 --- a/sys/src/9/port/cache.c +++ b/sys/src/9/port/cache.c @@ -15,20 +15,30 @@ enum NBITMAP = (PGROUND(MAXCACHE)/BY2PG + MAPBITS-1) / MAPBITS, }; +/* devmnt.c: parallel read ahead implementation */ +extern void mntrahinit(Mntrah *rah); +extern long mntrahread(Mntrah *rah, Chan *c, uchar *buf, long len, vlong off); + typedef struct Mntcache Mntcache; struct Mntcache { - Qid qid; - int dev; - int type; + Qid qid; + int dev; + int type; QLock; + Proc *locked; + ulong nlocked; + Mntcache *hash; Mntcache *prev; Mntcache *next; /* page bitmap of valid pages */ ulong bitmap[NBITMAP]; + + /* read ahead state */ + Mntrah rah; }; typedef struct Cache Cache; @@ -124,13 +134,66 @@ clookup(Chan *c, int skipvers) return nil; } +/* + * recursive Mntcache locking. Mntcache.rah is protected by the + * same lock and we want to call cupdate() from mntrahread() + * while holding the lock. 
+ */ +static int +cancachelock(Mntcache *m) +{ + if(m->locked == up || canqlock(m)){ + m->locked = up; + m->nlocked++; + return 1; + } + return 0; +} +static void +cachelock(Mntcache *m) +{ + if(m->locked != up){ + qlock(m); + assert(m->nlocked == 0); + m->locked = up; + } + m->nlocked++; + +} +static void +cacheunlock(Mntcache *m) +{ + assert(m->locked == up); + if(--m->nlocked == 0){ + m->locked = nil; + qunlock(m); + } +} + +/* return locked Mntcache if still valid else reset mcp */ +static Mntcache* +ccache(Chan *c) +{ + Mntcache *m; + + m = c->mcp; + if(m != nil) { + cachelock(m); + if(eqchantdqid(c, m->type, m->dev, m->qid, 0) && c->qid.type == m->qid.type) + return m; + c->mcp = nil; + cacheunlock(m); + } + return nil; +} + void copen(Chan *c) { Mntcache *m, *f, **l; - /* directories aren't cacheable and append-only files confuse us */ - if(c->qid.type&(QTDIR|QTAPPEND)){ + /* directories aren't cacheable */ + if(c->qid.type&QTDIR){ c->mcp = nil; return; } @@ -156,9 +219,9 @@ copen(Chan *c) l = &f->hash; } - if(!canqlock(m)){ + if(!cancachelock(m)){ unlock(&cache); - qlock(m); + cachelock(m); lock(&cache); f = clookup(c, 0); if(f != nil) { @@ -169,7 +232,7 @@ copen(Chan *c) */ ctail(f); unlock(&cache); - qunlock(m); + cacheunlock(m); c->mcp = f; return; } @@ -182,29 +245,18 @@ copen(Chan *c) l = &cache.hash[c->qid.path%NHASH]; m->hash = *l; *l = m; + unlock(&cache); + + m->rah.vers = m->qid.vers; + mntrahinit(&m->rah); cnodata(m); - qunlock(m); + + cacheunlock(m); + c->mcp = m; } -/* return locked Mntcache if still valid else reset mcp */ -static Mntcache* -ccache(Chan *c) -{ - Mntcache *m; - - m = c->mcp; - if(m != nil) { - qlock(m); - if(eqchantdqid(c, m->type, m->dev, m->qid, 0) && c->qid.type == m->qid.type) - return m; - c->mcp = nil; - qunlock(m); - } - return nil; -} - enum { VABITS = 8*sizeof(uintptr) - 2*PGSHIFT, VAMASK = (((uintptr)1 << VABITS)-1) << PGSHIFT, @@ -243,17 +295,24 @@ cread(Chan *c, uchar *buf, int len, vlong off) KMap *k; Page *p; 
Mntcache *m; - int l, total; + int l, tot; ulong offset, pn, po, pe; - if(off >= MAXCACHE || len <= 0) + if(len <= 0) return 0; m = ccache(c); if(m == nil) return 0; - total = 0; + if(waserror()){ + cacheunlock(m); + nexterror(); + } + + tot = 0; + if(off >= MAXCACHE) + goto Prefetch; offset = off; if(offset+len > MAXCACHE) @@ -277,16 +336,16 @@ cread(Chan *c, uchar *buf, int len, vlong off) if(waserror()) { kunmap(k); putpage(p); - qunlock(m); nexterror(); } memmove(buf, (uchar*)VA(k) + offset, l); - poperror(); kunmap(k); - putpage(p); + poperror(); - total += l; + tot += l; + buf += l; + len -= l; offset += l; offset &= (BY2PG-1); @@ -294,12 +353,21 @@ cread(Chan *c, uchar *buf, int len, vlong off) break; pn++; - buf += l; - len -= l; } - qunlock(m); - return total; +Prefetch: + if(len > 0){ + if(m->rah.vers != m->qid.vers){ + mntrahinit(&m->rah); + m->rah.vers = m->qid.vers; + } + off += tot; + tot += mntrahread(&m->rah, c, buf, len, off); + } + cacheunlock(m); + poperror(); + + return tot; } /* invalidate pages in page bitmap */ @@ -322,7 +390,7 @@ cachedata(Mntcache *m, uchar *buf, int len, vlong off) ulong offset, pn, po, pe; if(off >= MAXCACHE || len <= 0){ - qunlock(m); + cacheunlock(m); return; } @@ -370,7 +438,7 @@ cachedata(Mntcache *m, uchar *buf, int len, vlong off) kunmap(k); putpage(p); invalidate(m, offset + pn*BY2PG, len); - qunlock(m); + cacheunlock(m); nexterror(); } memmove((uchar*)VA(k) + offset, buf, l); @@ -383,7 +451,7 @@ cachedata(Mntcache *m, uchar *buf, int len, vlong off) buf += l; len -= l; } - qunlock(m); + cacheunlock(m); } void @@ -407,5 +475,22 @@ cwrite(Chan* c, uchar *buf, int len, vlong off) return; m->qid.vers++; c->qid.vers++; + if(c->qid.type&QTAPPEND){ + cacheunlock(m); + return; + } cachedata(m, buf, len, off); } + +void +cclunk(Chan *c) +{ + Mntcache *m; + + m = ccache(c); + if(m == nil) + return; + mntrahinit(&m->rah); + cacheunlock(m); + c->mcp = nil; +} diff --git a/sys/src/9/port/devmnt.c b/sys/src/9/port/devmnt.c 
index 91d6ad2f9..18bea77bd 100644 --- a/sys/src/9/port/devmnt.c +++ b/sys/src/9/port/devmnt.c @@ -42,7 +42,7 @@ enum NMASK = (64*1024)>>TAGSHIFT, }; -struct Mntalloc +static struct Mntalloc { Lock; Mnt* list; /* Mount devices in use */ @@ -54,21 +54,21 @@ struct Mntalloc ulong tagmask[NMASK]; }mntalloc; -Mnt* mntchk(Chan*); -void mntdirfix(uchar*, Chan*); -Mntrpc* mntflushalloc(Mntrpc*, ulong); -Mntrpc* mntflushfree(Mnt*, Mntrpc*); -void mntfree(Mntrpc*); -void mntgate(Mnt*); -void mntqrm(Mnt*, Mntrpc*); -Mntrpc* mntralloc(Chan*, ulong); -long mntrdwr(int, Chan*, void*, long, vlong); -int mntrpcread(Mnt*, Mntrpc*); -void mountio(Mnt*, Mntrpc*); -void mountmux(Mnt*, Mntrpc*); -void mountrpc(Mnt*, Mntrpc*); -int rpcattn(void*); -Chan* mntchan(void); +static Chan* mntchan(void); +static Mnt* mntchk(Chan*); +static void mntdirfix(uchar*, Chan*); +static Mntrpc* mntflushalloc(Mntrpc*, ulong); +static Mntrpc* mntflushfree(Mnt*, Mntrpc*); +static void mntfree(Mntrpc*); +static void mntgate(Mnt*); +static void mntqrm(Mnt*, Mntrpc*); +static Mntrpc* mntralloc(Chan*, ulong); +static long mntrdwr(int, Chan*, void*, long, vlong); +static int mntrpcread(Mnt*, Mntrpc*); +static void mountio(Mnt*, Mntrpc*); +static void mountmux(Mnt*, Mntrpc*); +static void mountrpc(Mnt*, Mntrpc*); +static int rpcattn(void*); char Esbadstat[] = "invalid directory entry received from server"; char Enoversion[] = "version not established for mount channel"; @@ -257,7 +257,6 @@ mntauth(Chan *c, char *spec) Mntrpc *r; m = c->mux; - if(m == nil){ mntversion(c, VERSION9P, MAXRPC, 0); m = c->mux; @@ -275,7 +274,6 @@ mntauth(Chan *c, char *spec) } r = mntralloc(0, m->msize); - if(waserror()) { mntfree(r); nexterror(); @@ -319,7 +317,6 @@ mntattach(char *muxattach) c = bogus.chan; m = c->mux; - if(m == nil){ mntversion(c, nil, 0, 0); m = c->mux; @@ -342,7 +339,6 @@ mntattach(char *muxattach) mntfree(r); nexterror(); } - r->request.type = Tattach; r->request.fid = c->fid; if(bogus.authchan == nil) @@ 
-368,7 +364,7 @@ mntattach(char *muxattach) return c; } -Chan* +static Chan* mntchan(void) { Chan *c; @@ -552,13 +548,13 @@ mntclunk(Chan *c, int t) Mnt *m; Mntrpc *r; + cclunk(c); m = mntchk(c); r = mntralloc(c, m->msize); if(waserror()){ mntfree(r); nexterror(); } - r->request.type = t; r->request.fid = c->fid; mountrpc(m, r); @@ -658,24 +654,59 @@ mntwrite(Chan *c, void *buf, long n, vlong off) return mntrdwr(Twrite, c, buf, n, off); } -long +static void +mntcache(Mntrpc *r) +{ + ulong n, m; + vlong off; + Block *b; + Chan *c; + + c = r->c; + if((c->flag & CCACHE) == 0 || c->mcp == nil) + return; + off = r->request.offset; + switch(r->reply.type){ + case Rread: + m = r->reply.count; + if(m > r->request.count) + m = r->request.count; + + for(b = r->b; m > 0 && b != nil; b = b->next) { + n = BLEN(b); + if(m < n) + n = m; + cupdate(c, b->rp, n, off); + off += n; + } + break; + case Rwrite: + if(convM2S(r->rpc, r->rpclen, &r->request) == 0) + panic("convM2S"); + cwrite(c, (uchar*)r->request.data, r->request.count, off); + break; + } +} + +static long mntrdwr(int type, Chan *c, void *buf, long n, vlong off) { Mnt *m; Mntrpc *r; char *uba; - int cache; ulong cnt, nr, nreq; m = mntchk(c); uba = buf; cnt = 0; - cache = c->flag & CCACHE; - if(c->qid.type & QTDIR) - cache = 0; + for(;;) { - if(cache && type == Tread) { - nr = cread(c, (uchar*)uba, n, off); + nreq = n; + if(nreq > m->msize-IOHDRSZ) + nreq = m->msize-IOHDRSZ; + + if(type == Tread) { + nr = cread(c, (uchar*)uba, nreq, off); if(nr > 0) { nreq = nr; goto Next; @@ -691,43 +722,12 @@ mntrdwr(int type, Chan *c, void *buf, long n, vlong off) r->request.fid = c->fid; r->request.offset = off; r->request.data = uba; - nr = n; - if(nr > m->msize-IOHDRSZ) - nr = m->msize-IOHDRSZ; - r->request.count = nr; + r->request.count = nreq; mountrpc(m, r); - nreq = r->request.count; + mntcache(r); nr = r->reply.count; if(nr > nreq) nr = nreq; - - if(cache) { - /* - * note that we cannot update the cache from uba as - * the user 
could change its contents from another - * process before the data gets copied to the cached. - */ - if(type == Tread) { - ulong nc, nn; - Block *b; - - nc = 0; - for(b = r->b; b != nil; b = b->next) { - nn = BLEN(b); - if(nc+nn > nr) - nn = nr - nc; - cupdate(c, b->rp, nn, off + nc); - nc += nn; - if(nc >= nr) - break; - } - } else { - if(convM2S(r->rpc, r->rpclen, &r->request) == 0) - panic("convM2S"); - cwrite(c, (uchar*)r->request.data, nr, off); - } - } - if(type == Tread) r->b = bl2mem((uchar*)uba, r->b, nr); mntfree(r); @@ -744,7 +744,231 @@ mntrdwr(int type, Chan *c, void *buf, long n, vlong off) return cnt; } +static int +mntprocwork(void *a) +{ + Mntproc *p = a; + return p->f != nil; +} + +static void +mntproc(void *a) +{ + Mntproc *p = a; + Chan *c; + Mnt *m; + + while(waserror()) + ; + + m = p->m; + for(;;){ + tsleep(p, mntprocwork, p, 500); + + lock(m); + if(p->f == nil){ + p->m = nil; + unlock(m); + pexit("no work", 1); + } + c = p->r->c; + unlock(m); + + (*p->f)(p->r, p->a); + + lock(m); + p->r = nil; + p->a = nil; + p->f = nil; + unlock(m); + + cclose(c); + } +} + +static int +mntdefer(void (*f)(Mntrpc*, void*), Mntrpc *r, void *a) +{ + Mntproc *p; + Mnt *m; + int i; + + m = mntchk(r->c); + lock(m); + for(i = 0; i < nelem(m->defered); i++){ + p = &m->defered[i]; + if(p->f != nil) + continue; + + incref(r->c); + r->m = m; + p->r = r; + p->a = a; + p->f = f; + + if(p->m == nil){ + p->m = m; + unlock(m); + kproc("mntporc", mntproc, p); + } else { + unlock(m); + wakeup(p); + } + return 1; + } + unlock(m); + return 0; +} + +static void +rahproc(Mntrpc *r, void *a) +{ + Mntrah *rah = a; + + if(!waserror()){ + mountrpc(r->m, r); + poperror(); + } + r->done = 2; + wakeup(rah); +} + +static int +rahdone(void *v) +{ + Mntrpc *r = v; + return r->done == 2; +} + +static Mntrpc* +rahfindrpc(Mntrah *rah, vlong off) +{ + Mntrpc *r; + int i, n; + vlong o; + + for(i=0; i<nelem(rah->r); i++){ + if((r = rah->r[i]) == nil) + continue; + n = r->request.count; + o = 
r->request.offset; + if(off >= o && off < o+n) + return r; + } + return nil; +} + void +mntrahinit(Mntrah *rah) +{ + Mntrpc *r; + int i; + + while(waserror()) + ; + + for(i=0; i<nelem(rah->r); i++){ + if((r = rah->r[i]) != nil){ + while(!rahdone(r)) + sleep(rah, rahdone, r); + rah->r[i] = nil; + mntfree(r); + } + } + rah->i = 0; + + rah->off = 0; + rah->seq = 0; + + poperror(); +} + +long +mntrahread(Mntrah *rah, Chan *c, uchar *buf, long len, vlong off) +{ + Mntrpc *r, **rr; + vlong o, w, e; + long n, tot; + Mnt *m; + + if(len <= 0) + return 0; + if(off != rah->off){ + rah->off = off; + rah->seq = 0; + } + rah->off += len; + rah->seq += len; + if(rah->seq >= 2*c->iounit){ + w = (off / c->iounit) * c->iounit; + e = w + rah->seq; + for(o = w; w < e; o += c->iounit){ + if(rahfindrpc(rah, o) != nil) + continue; + + rr = &rah->r[rah->i % nelem(rah->r)]; + if((r = *rr) != nil){ + if(!rahdone(r) || (r->request.offset >= w && r->request.offset < e)) + break; + *rr = nil; + mntfree(r); + } + + m = mntchk(c); + r = mntralloc(c, m->msize); + r->request.type = Tread; + r->request.fid = c->fid; + r->request.offset = o; + r->request.count = c->iounit; + if(!mntdefer(rahproc, r, rah)){ + mntfree(r); + break; + } + *rr = r; + rah->i++; + } + } + + tot = 0; + while(len > 0 && (r = rahfindrpc(rah, off)) != nil){ + while(!rahdone(r)) + sleep(rah, rahdone, r); + + switch(r->reply.type){ + default: + error(Emountrpc); + case Rflush: + error(Eintr); + case Rerror: + error(r->reply.ename); + case Rread: + break; + } + mntcache(r); + n = r->request.count; + o = r->request.offset; + if(r->reply.count < n) + n = r->reply.count; + n -= (off - o); + if(n <= 0) + break; + if(len < n) + n = len; + n = readblist(r->b, buf, n, off - o); + buf += n; + off += n; + tot += n; + len -= n; + } + if(tot > 0){ + rah->off -= len; + rah->seq -= len; + } + + return tot; +} + +static void mountrpc(Mnt *m, Mntrpc *r) { char *sn, *cn; @@ -778,7 +1002,7 @@ mountrpc(Mnt *m, Mntrpc *r) } } -void +static void mountio(Mnt *m, 
Mntrpc *r) { int n; @@ -870,7 +1094,7 @@ doread(Mnt *m, int len) return 0; } -int +static int mntrpcread(Mnt *m, Mntrpc *r) { int i, t, len, hlen; @@ -942,7 +1166,7 @@ mntrpcread(Mnt *m, Mntrpc *r) return 0; } -void +static void mntgate(Mnt *m) { Mntrpc *q; @@ -957,7 +1181,7 @@ mntgate(Mnt *m) unlock(m); } -void +static void mountmux(Mnt *m, Mntrpc *r) { Mntrpc **l, *q; @@ -1003,7 +1227,7 @@ mountmux(Mnt *m, Mntrpc *r) * Create a new flush request and chain the previous * requests from it */ -Mntrpc* +static Mntrpc* mntflushalloc(Mntrpc *r, ulong iounit) { Mntrpc *fr; @@ -1027,7 +1251,7 @@ mntflushalloc(Mntrpc *r, ulong iounit) * and if it hasn't been answered set the reply to to * Rflush. Return the original rpc. */ -Mntrpc* +static Mntrpc* mntflushfree(Mnt *m, Mntrpc *r) { Mntrpc *fr; @@ -1046,7 +1270,7 @@ mntflushfree(Mnt *m, Mntrpc *r) return r; } -int +static int alloctag(void) { int i, j; @@ -1066,13 +1290,13 @@ alloctag(void) return NOTAG; } -void +static void freetag(int t) { mntalloc.tagmask[t>>TAGSHIFT] &= ~(1<<(t&TAGMASK)); } -Mntrpc* +static Mntrpc* mntralloc(Chan *c, ulong msize) { Mntrpc *new; @@ -1122,7 +1346,7 @@ mntralloc(Chan *c, ulong msize) return new; } -void +static void mntfree(Mntrpc *r) { if(r->b != nil) @@ -1142,7 +1366,7 @@ mntfree(Mntrpc *r) unlock(&mntalloc); } -void +static void mntqrm(Mnt *m, Mntrpc *r) { Mntrpc **l, *f; @@ -1161,7 +1385,7 @@ mntqrm(Mnt *m, Mntrpc *r) unlock(m); } -Mnt* +static Mnt* mntchk(Chan *c) { Mnt *m; @@ -1190,7 +1414,7 @@ mntchk(Chan *c) * reflect local values. These entries are known to be * the first two in the Dir encoding after the count. 
*/ -void +static void mntdirfix(uchar *dirbuf, Chan *c) { uint r; @@ -1202,7 +1426,7 @@ mntdirfix(uchar *dirbuf, Chan *c) PBIT32(dirbuf, c->dev); } -int +static int rpcattn(void *v) { Mntrpc *r; diff --git a/sys/src/9/port/portdat.h b/sys/src/9/port/portdat.h index 106f016f8..16f24a10d 100644 --- a/sys/src/9/port/portdat.h +++ b/sys/src/9/port/portdat.h @@ -16,7 +16,9 @@ typedef struct Log Log; typedef struct Logflag Logflag; typedef struct Mntcache Mntcache; typedef struct Mount Mount; +typedef struct Mntrah Mntrah; typedef struct Mntrpc Mntrpc; +typedef struct Mntproc Mntproc; typedef struct Mnt Mnt; typedef struct Mhead Mhead; typedef struct Note Note; @@ -270,6 +272,29 @@ struct Mhead Mhead* hash; /* Hash chain */ }; +struct Mntrah +{ + Rendez; + + ulong vers; + + vlong off; + vlong seq; + + uint i; + Mntrpc *r[8]; +}; + +struct Mntproc +{ + Rendez; + + Mnt *m; + Mntrpc *r; + void *a; + void (*f)(Mntrpc*, void*); +}; + struct Mnt { Lock; @@ -277,6 +302,7 @@ struct Mnt Chan *c; /* Channel to file service */ Proc *rip; /* Reader in progress */ Mntrpc *queue; /* Queue of pending requests on this channel */ + Mntproc defered[8]; /* Worker processes for defered RPCs (read ahead) */ ulong id; /* Multiplexer id for channel check */ Mnt *list; /* Free list */ int flags; /* cache */ diff --git a/sys/src/9/port/portfns.h b/sys/src/9/port/portfns.h index 2f135c31b..db547d1a1 100644 --- a/sys/src/9/port/portfns.h +++ b/sys/src/9/port/portfns.h @@ -42,6 +42,7 @@ void confinit(void); int consactive(void); void (*consdebug)(void); void copen(Chan*); +void cclunk(Chan*); Block* concatblock(Block*); Block* copyblock(Block*, int); void copypage(Page*, Page*); @@ -283,6 +284,7 @@ int rand(void); void randominit(void); ulong randomread(void*, ulong); void rdb(void); +long readblist(Block *, uchar *, long, long); int readnum(ulong, char*, ulong, ulong, int); int readstr(ulong, char*, ulong, char*); void ready(Proc*); diff --git a/sys/src/9/port/qio.c b/sys/src/9/port/qio.c index 
d34178f46..9355ace11 100644 --- a/sys/src/9/port/qio.c +++ b/sys/src/9/port/qio.c @@ -904,6 +904,36 @@ qremove(Queue *q) return b; } +/* + * copy the contents of a string of blocks into + * memory from an offset. blocklist kept unchanged. + * return number of copied bytes. + */ +long +readblist(Block *b, uchar *p, long n, long o) +{ + long m, r; + + r = 0; + while(n > 0 && b != nil){ + m = BLEN(b); + if(o >= m) + o -= m; + else { + m -= o; + if(n < m) + m = n; + memmove(p, b->rp + o, m); + p += m; + r += m; + n -= m; + o = 0; + } + b = b->next; + } + return r; +} + /* * copy the contents of a string of blocks into * memory. emptied blocks are freed. return