kernel/qio: big cleanup of qio functions

remove bl2mem(), it is broken. a fault while copying to memory
yields a partially freed block list. it can be simply replaced
by readblist() and freeblist(), which we also use for qcopy()
now.

remove mem2bl(), and handle putting back remainer from a short
read internally (splitblock()) avoiding the releasing and re-
acquiering of the ilock.

always attempt to free blocks outside of the ilock.

have qaddlist() return the number of bytes enqueued, which
avoids walking the block list twice.
This commit is contained in:
cinap_lenrek 2016-11-07 22:20:10 +01:00
parent 23d217afb4
commit a54d1cd95e
3 changed files with 135 additions and 259 deletions

View file

@ -729,7 +729,7 @@ mntrdwr(int type, Chan *c, void *buf, long n, vlong off)
if(nr > nreq)
nr = nreq;
if(type == Tread)
r->b = bl2mem((uchar*)uba, r->b, nr);
nr = readblist(r->b, (uchar*)uba, nr, 0);
mntfree(r);
poperror();
@ -1076,13 +1076,8 @@ doread(Mnt *m, int len)
while(qlen(m->q) < len){
b = devtab[m->c->type]->bread(m->c, m->msize, 0);
if(b == nil)
if(b == nil || qaddlist(m->q, b) == 0)
return -1;
if(blocklen(b) == 0){
freeblist(b);
return -1;
}
qaddlist(m->q, b);
}
return 0;
}

View file

@ -10,7 +10,6 @@ Block* allocb(int);
int anyhigher(void);
int anyready(void);
Image* attachimage(int, Chan*, uintptr, ulong);
Block* bl2mem(uchar*, Block*, int);
int blocklen(Block*);
void bootlinks(void);
void cachedel(Image*, uintptr);
@ -250,7 +249,7 @@ void putseg(Segment*);
void putstrn(char*, int);
void putswap(Page*);
ulong pwait(Waitmsg*);
void qaddlist(Queue*, Block*);
int qaddlist(Queue*, Block*);
Block* qbread(Queue*, int);
long qbwrite(Queue*, Block*);
Queue* qbypass(void (*)(void*, Block*), void*);

View file

@ -78,6 +78,9 @@ padblock(Block *bp, int size)
int n;
Block *nbp;
if(bp->next != nil)
panic("padblock %#p", getcallerpc(&bp));
QDEBUG checkb(bp, "padblock 1");
if(size >= 0){
if(bp->rp - bp->base >= size){
@ -85,33 +88,25 @@ padblock(Block *bp, int size)
return bp;
}
if(bp->next != nil)
panic("padblock %#p", getcallerpc(&bp));
n = BLEN(bp);
padblockcnt++;
nbp = allocb(size+n);
nbp->rp += size;
nbp->wp = nbp->rp;
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
freeb(bp);
nbp->rp -= size;
} else {
size = -size;
if(bp->next != nil)
panic("padblock %#p", getcallerpc(&bp));
if(bp->lim - bp->wp >= size)
return bp;
n = BLEN(bp);
padblockcnt++;
nbp = allocb(size+n);
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
freeb(bp);
}
freeb(bp);
padblockcnt++;
QDEBUG checkb(nbp, "padblock 1");
return nbp;
}
@ -155,20 +150,21 @@ blockalloclen(Block *bp)
Block*
concatblock(Block *bp)
{
Block *nb, *next;
int len;
Block *nb, *f;
if(bp->next == nil)
return bp;
nb = allocb(blocklen(bp));
for(f = bp; f != nil; f = f->next) {
len = BLEN(f);
memmove(nb->wp, f->rp, len);
for(; bp != nil; bp = next) {
next = bp->next;
len = BLEN(bp);
memmove(nb->wp, bp->rp, len);
nb->wp += len;
freeb(bp);
}
concatblockcnt += BLEN(nb);
freeblist(bp);
QDEBUG checkb(nb, "concatblock 1");
return nb;
}
@ -179,8 +175,8 @@ concatblock(Block *bp)
Block*
pullupblock(Block *bp, int n)
{
int i;
Block *nbp;
int i;
/*
* this should almost always be true, it's
@ -204,10 +200,10 @@ pullupblock(Block *bp, int n)
*/
n -= BLEN(bp);
while((nbp = bp->next) != nil){
pullupblockcnt++;
i = BLEN(nbp);
if(i > n) {
memmove(bp->wp, nbp->rp, n);
pullupblockcnt++;
bp->wp += n;
nbp->rp += n;
QDEBUG checkb(bp, "pullupblock 1");
@ -220,7 +216,6 @@ pullupblock(Block *bp, int n)
i = 0;
}
memmove(bp->wp, nbp->rp, i);
pullupblockcnt++;
bp->wp += i;
bp->next = nbp->next;
nbp->next = nil;
@ -403,11 +398,11 @@ qget(Queue *q)
iunlock(q);
return nil;
}
QDEBUG checkb(b, "qget");
q->bfirst = b->next;
b->next = nil;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
QDEBUG checkb(b, "qget");
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->len < q->limit/2){
@ -430,7 +425,7 @@ qget(Queue *q)
int
qdiscard(Queue *q, int len)
{
Block *b;
Block *b, *tofree = nil;
int dowakeup, n, sofar;
ilock(q);
@ -442,10 +437,12 @@ qdiscard(Queue *q, int len)
n = BLEN(b);
if(n <= len - sofar){
q->bfirst = b->next;
b->next = nil;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
freeb(b);
/* remember to free this */
b->next = tofree;
tofree = b;
} else {
n = len - sofar;
b->rp += n;
@ -474,6 +471,9 @@ qdiscard(Queue *q, int len)
if(dowakeup)
wakeup(&q->wr);
if(tofree != nil)
freeblist(tofree);
return sofar;
}
@ -483,10 +483,9 @@ qdiscard(Queue *q, int len)
int
qconsume(Queue *q, void *vp, int len)
{
Block *b;
Block *b, *tofree = nil;
int n, dowakeup;
uchar *p = vp;
Block *tofree = nil;
/* sync with qwrite */
ilock(q);
@ -495,8 +494,8 @@ qconsume(Queue *q, void *vp, int len)
b = q->bfirst;
if(b == nil){
q->state |= Qstarve;
iunlock(q);
return -1;
len = -1;
goto out;
}
QDEBUG checkb(b, "qconsume 1");
@ -511,17 +510,16 @@ qconsume(Queue *q, void *vp, int len)
tofree = b;
};
consumecnt += n;
if(n < len)
len = n;
memmove(p, b->rp, len);
consumecnt += n;
b->rp += len;
q->dlen -= len;
/* discard the block if we're done with it */
if((q->state & Qmsg) || len == n){
q->bfirst = b->next;
b->next = nil;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
@ -530,6 +528,7 @@ qconsume(Queue *q, void *vp, int len)
tofree = b;
}
out:
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->len < q->limit/2){
q->state &= ~Qflow;
@ -551,40 +550,23 @@ qconsume(Queue *q, void *vp, int len)
int
qpass(Queue *q, Block *b)
{
int dlen, len, dowakeup;
int len, dowakeup;
/* sync with qread */
dowakeup = 0;
ilock(q);
if(q->len >= q->limit){
freeblist(b);
iunlock(q);
freeblist(b);
return -1;
}
if(q->state & Qclosed){
len = BALLOC(b);
freeblist(b);
iunlock(q);
return len;
freeblist(b);
return 0;
}
/* add buffer to queue */
if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
len = BALLOC(b);
dlen = BLEN(b);
QDEBUG checkb(b, "qpass");
while(b->next != nil){
b = b->next;
QDEBUG checkb(b, "qpass");
len += BALLOC(b);
dlen += BLEN(b);
}
q->blast = b;
q->len += len;
q->dlen += dlen;
len = qaddlist(q, b);
if(q->len >= q->limit/2)
q->state |= Qflow;
@ -604,35 +586,19 @@ qpass(Queue *q, Block *b)
int
qpassnolim(Queue *q, Block *b)
{
int dlen, len, dowakeup;
int len, dowakeup;
/* sync with qread */
dowakeup = 0;
ilock(q);
if(q->state & Qclosed){
freeblist(b);
iunlock(q);
return BALLOC(b);
freeblist(b);
return 0;
}
/* add buffer to queue */
if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
len = BALLOC(b);
dlen = BLEN(b);
QDEBUG checkb(b, "qpass");
while(b->next != nil){
b = b->next;
QDEBUG checkb(b, "qpass");
len += BALLOC(b);
dlen += BLEN(b);
}
q->blast = b;
q->len += len;
q->dlen += dlen;
len = qaddlist(q, b);
if(q->len >= q->limit/2)
q->state |= Qflow;
@ -680,6 +646,10 @@ qproduce(Queue *q, void *vp, int len)
int dowakeup;
uchar *p = vp;
b = iallocb(len);
if(b == nil)
return 0;
/* sync with qread */
dowakeup = 0;
ilock(q);
@ -690,25 +660,12 @@ qproduce(Queue *q, void *vp, int len)
iunlock(q);
return -1;
}
producecnt += len;
/* save in buffer */
b = iallocb(len);
if(b == nil){
iunlock(q);
return 0;
}
memmove(b->wp, p, len);
producecnt += len;
b->wp += len;
if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
/* b->next = 0; done by iallocb() */
q->len += BALLOC(b);
q->dlen += BLEN(b);
QDEBUG checkb(b, "qproduce");
qaddlist(q, b);
if(q->state & Qstarve){
q->state &= ~Qstarve;
@ -731,49 +688,13 @@ qproduce(Queue *q, void *vp, int len)
Block*
qcopy(Queue *q, int len, ulong offset)
{
int sofar;
int n;
Block *b, *nb;
uchar *p;
nb = allocb(len);
Block *b;
b = allocb(len);
ilock(q);
/* go to offset */
b = q->bfirst;
for(sofar = 0; ; sofar += n){
if(b == nil){
iunlock(q);
return nb;
}
n = BLEN(b);
if(sofar + n > offset){
p = b->rp + offset - sofar;
n -= offset - sofar;
break;
}
QDEBUG checkb(b, "qcopy");
b = b->next;
}
/* copy bytes from there */
for(sofar = 0; sofar < len;){
if(n > len - sofar)
n = len - sofar;
memmove(nb->wp, p, n);
qcopycnt += n;
sofar += n;
nb->wp += n;
b = b->next;
if(b == nil)
break;
n = BLEN(b);
p = b->rp;
}
b->wp += readblist(q->bfirst, b->wp, len, offset);
iunlock(q);
return nb;
return b;
}
/*
@ -855,21 +776,34 @@ qwait(Queue *q)
}
/*
* add a block list to a queue
* add a block list to a queue, return bytes added
*/
void
int
qaddlist(Queue *q, Block *b)
{
int len, dlen;
QDEBUG checkb(b, "qaddlist 1");
/* queue the block */
if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
q->len += blockalloclen(b);
q->dlen += blocklen(b);
while(b->next != nil)
len = BALLOC(b);
dlen = BLEN(b);
while(b->next != nil){
b = b->next;
QDEBUG checkb(b, "qaddlist 2");
len += BALLOC(b);
dlen += BLEN(b);
}
q->blast = b;
q->len += len;
q->dlen += dlen;
return dlen;
}
/*
@ -883,11 +817,11 @@ qremove(Queue *q)
b = q->bfirst;
if(b == nil)
return nil;
QDEBUG checkb(b, "qremove");
q->bfirst = b->next;
b->next = nil;
q->dlen -= BLEN(b);
q->len -= BALLOC(b);
QDEBUG checkb(b, "qremove");
return b;
}
@ -921,68 +855,6 @@ readblist(Block *b, uchar *p, long n, long o)
return r;
}
/*
* copy the contents of a string of blocks into
* memory. emptied blocks are freed. return
* pointer to first unconsumed block.
*/
Block*
bl2mem(uchar *p, Block *b, int n)
{
int i;
Block *next;
for(; b != nil; b = next){
i = BLEN(b);
if(i > n){
memmove(p, b->rp, n);
b->rp += n;
return b;
}
memmove(p, b->rp, i);
n -= i;
p += i;
b->rp += i;
next = b->next;
freeb(b);
}
return nil;
}
/*
* copy the contents of memory into a string of blocks.
* return nil on error.
*/
Block*
mem2bl(uchar *p, int len)
{
int n;
Block *b, *first, **l;
first = nil;
l = &first;
if(waserror()){
freeblist(first);
nexterror();
}
do {
n = len;
if(n > Maxatomic)
n = Maxatomic;
*l = b = allocb(n);
setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
memmove(b->wp, p, n);
b->wp += n;
p += n;
len -= n;
l = &b->next;
} while(len > 0);
poperror();
return first;
}
/*
* put a block back to the front of the queue
* called with q ilocked
@ -998,6 +870,35 @@ qputback(Queue *q, Block *b)
q->dlen += BLEN(b);
}
/*
* cut off n bytes from the end of *h. return a new
* block with the tail and change *h to refer to the
* head.
*/
static Block*
splitblock(Block **h, int n)
{
Block *a, *b;
int m;
a = *h;
m = BLEN(a) - n;
if(m < n){
b = allocb(m);
memmove(b->wp, a->rp, m);
b->wp += m;
a->rp += m;
*h = b;
return a;
} else {
b = allocb(n);
a->wp -= n;
memmove(b->wp, a->wp, n);
b->wp += n;
return b;
}
}
/*
* flow control, get producer going again
* called with q ilocked
@ -1029,7 +930,7 @@ qwakeup_iunlock(Queue *q)
Block*
qbread(Queue *q, int len)
{
Block *b, *nb;
Block *b;
int n;
eqlock(&q->rlock);
@ -1057,24 +958,21 @@ qbread(Queue *q, int len)
n = BLEN(b);
/* split block if it's too big and this is not a message queue */
nb = b;
if(n > len){
if((q->state&Qmsg) == 0){
n -= len;
b = allocb(n);
memmove(b->wp, nb->rp+len, n);
b->wp += n;
qputback(q, b);
}
nb->wp = nb->rp + len;
n -= len;
if((q->state & Qmsg) == 0)
qputback(q, splitblock(&b, n));
else
b->wp -= n;
}
/* restart producer */
qwakeup_iunlock(q);
poperror();
qunlock(&q->rlock);
return nb;
poperror();
return b;
}
/*
@ -1084,7 +982,7 @@ qbread(Queue *q, int len)
long
qread(Queue *q, void *vp, int len)
{
Block *b, *first, **l;
Block *b, *first, **last;
int m, n;
eqlock(&q->rlock);
@ -1109,6 +1007,7 @@ again:
}
/* if we get here, there's at least one block in the queue */
last = &first;
if(q->state & Qcoalesce){
/* when coalescing, 0 length blocks just go away */
b = q->bfirst;
@ -1123,13 +1022,13 @@ again:
* fit in the read.
*/
n = 0;
l = &first;
for(;;) {
*l = qremove(q);
l = &b->next;
*last = qremove(q);
n += m;
if(n >= len || (b = q->bfirst) == nil)
if(n >= len || q->bfirst == nil)
break;
last = &b->next;
b = q->bfirst;
m = BLEN(b);
}
} else {
@ -1137,25 +1036,24 @@ again:
n = BLEN(first);
}
/* copy to user space outside of the ilock */
iunlock(q);
b = bl2mem(vp, first, len);
ilock(q);
/* take care of any left over partial block */
if(b != nil){
n -= BLEN(b);
if(q->state & Qmsg)
freeb(b);
else
qputback(q, b);
}
/* split last block if it's too big and this is not a message queue */
if(n > len && (q->state & Qmsg) == 0)
qputback(q, splitblock(last, n - len));
/* restart producer */
qwakeup_iunlock(q);
poperror();
qunlock(&q->rlock);
poperror();
if(waserror()){
freeblist(first);
nexterror();
}
n = readblist(first, vp, len, 0);
freeblist(first);
poperror();
return n;
}
@ -1198,19 +1096,18 @@ qflow(Queue *q)
long
qbwrite(Queue *q, Block *b)
{
int n, dowakeup;
int len, dowakeup;
Proc *p;
n = BLEN(b);
if(q->bypass != nil){
len = blocklen(b);
(*q->bypass)(q->arg, b);
return n;
return len;
}
dowakeup = 0;
if(waserror()){
freeb(b);
freeblist(b);
nexterror();
}
ilock(q);
@ -1224,21 +1121,13 @@ qbwrite(Queue *q, Block *b)
/* don't queue over the limit */
if(q->len >= q->limit && q->noblock){
iunlock(q);
freeb(b);
poperror();
return n;
len = blocklen(b);
freeblist(b);
return len;
}
/* queue the block */
if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
b->next = nil;
q->len += BALLOC(b);
q->dlen += n;
QDEBUG checkb(b, "qbwrite");
len = qaddlist(q, b);
/* make sure other end gets awakened */
if(q->state & Qstarve){
@ -1270,7 +1159,7 @@ qbwrite(Queue *q, Block *b)
*/
qflow(q);
return n;
return len;
}
/*
@ -1359,14 +1248,7 @@ qiwrite(Queue *q, void *vp, int len)
break;
}
QDEBUG checkb(b, "qiwrite");
if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
q->len += BALLOC(b);
q->dlen += n;
qaddlist(q, b);
if(q->state & Qstarve){
q->state &= ~Qstarve;