diff --git a/sys/src/9/port/qio.c b/sys/src/9/port/qio.c index a70fa8a31..8076b04b8 100644 --- a/sys/src/9/port/qio.c +++ b/sys/src/9/port/qio.c @@ -1152,6 +1152,31 @@ qnotfull(void *a) return q->len < q->limit || (q->state & Qclosed); } +/* + * flow control, wait for queue to get below the limit + */ +static void +qflow(Queue *q) +{ + for(;;){ + if(q->noblock || qnotfull(q)) + break; + + ilock(q); + q->state |= Qflow; + iunlock(q); + + eqlock(&q->wlock); + if(waserror()){ + qunlock(&q->wlock); + nexterror(); + } + sleep(&q->wr, qnotfull, q); + qunlock(&q->wlock); + poperror(); + } +} + /* * add a block to a queue obeying flow control */ @@ -1182,17 +1207,11 @@ qbwrite(Queue *q, Block *b) } /* don't queue over the limit */ - if(q->len >= q->limit){ - if(q->noblock){ - iunlock(q); - freeb(b); - poperror(); - return n; - } - if(q->len >= q->limit*10){ - iunlock(q); - error(Egreg); - } + if(q->len >= q->limit && q->noblock){ + iunlock(q); + freeb(b); + poperror(); + return n; } /* queue the block */ @@ -1228,34 +1247,13 @@ qbwrite(Queue *q, Block *b) } /* - * flow control, wait for queue to get below the limit - * before allowing the process to continue and queue - * more. We do this here so that postnote can only - * interrupt us after the data has been queued. This - * means that things like 9p flushes and ssl messages - * will not be disrupted by software interrupts. - * - * Note - this is moderately dangerous since a process - * that keeps getting interrupted and rewriting will - * queue up to 10 times the queue limit before failing. + * flow control, before allowing the process to continue and + * queue more. We do this here so that postnote can only + * interrupt us after the data has been queued. This means that + * things like 9p flushes and ssl messages will not be disrupted + * by software interrupts. */ - for(;;){ - if(q->noblock || qnotfull(q)) - break; - - ilock(q); - q->state |= Qflow; - iunlock(q); - - eqlock(&q->wlock); - if(waserror()){ - qunlock(&q->wlock); - nexterror(); - } - sleep(&q->wr, qnotfull, q); - qunlock(&q->wlock); - poperror(); - } + qflow(q); return n; } @@ -1273,6 +1271,16 @@ qwrite(Queue *q, void *vp, int len) QDEBUG if(!islo()) print("qwrite hi %#p\n", getcallerpc(&q)); + /* stop queue bloat before allocating blocks */ + if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){ + while(waserror()){ + if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig) + error(Egreg); + } + qflow(q); + poperror(); + } + sofar = 0; do { n = len-sofar;