libthread: reimplemented i/o procs using new interrupt ctl message

This commit is contained in:
cinap_lenrek 2011-08-22 03:03:27 +02:00
parent 9a90e50142
commit 08c39320a4
6 changed files with 114 additions and 89 deletions

View file

@ -3,6 +3,7 @@
closeioproc, closeioproc,
iocall, iocall,
ioclose, ioclose,
ioflush,
iointerrupt, iointerrupt,
iodial, iodial,
ioopen, ioopen,
@ -33,6 +34,7 @@ long ioreadn(Ioproc *io, int fd, void *a, long n);
long iowrite(Ioproc *io, int fd, void *a, long n); long iowrite(Ioproc *io, int fd, void *a, long n);
int iodial(Ioproc *io, char *addr, char *local, char *dir, char *cdfp); int iodial(Ioproc *io, char *addr, char *local, char *dir, char *cdfp);
.XX .XX
int ioflush(Ioproc *io);
void iointerrupt(Ioproc *io); void iointerrupt(Ioproc *io);
void closeioproc(Ioproc *io); void closeioproc(Ioproc *io);
.XX .XX
@ -74,14 +76,16 @@ and
.IR dial (2)) .IR dial (2))
in the slave process associated with in the slave process associated with
.IR io . .IR io .
It is an error to execute more than one call
at a time in an I/O proc.
.PP .PP
.I Iointerrupt .I Iointerrupt
interrupts the call currently executing in the I/O proc. interrupts the next or currently executing call in the I/O proc. If
If no call is executing, there was no call executing, the interrupt will stay pending and the
next I/O call will get interrupted.
.PP
.I Ioflush
executes a non-op in the I/O proc. It is commonly called after
.IR iointerrupt .IR iointerrupt
is a no-op. to clear a pending interrupt.
.PP .PP
.I Closeioproc .I Closeioproc
terminates the I/O proc and frees the associated terminates the I/O proc and frees the associated

View file

@ -6,47 +6,23 @@
long long
iocall(Ioproc *io, long (*op)(va_list*), ...) iocall(Ioproc *io, long (*op)(va_list*), ...)
{ {
int ret, inted; Iocall r;
Ioproc *msg;
if(send(io->c, &io) == -1){ r.op = op;
va_start(r.arg, op);
if(sendp(io->c, &r) < 0){
werrstr("interrupted"); werrstr("interrupted");
return -1; return -1;
} }
assert(!io->inuse); while(recv(io->creply, nil) < 0){
io->inuse = 1; if(canqlock(io)){
io->op = op; if(++io->intr == 1)
va_start(io->arg, op); write(io->ctl, "interrupt", 9);
msg = io; qunlock(io);
inted = 0; }
while(send(io->creply, &msg) == -1){
msg = nil;
inted = 1;
} }
if(inted){ va_end(r.arg);
werrstr("interrupted"); if(r.ret < 0)
return -1; errstr(r.err, sizeof r.err);
} return r.ret;
/*
* If we get interrupted, we have to stick around so that
* the IO proc has someone to talk to. Send it an interrupt
* and try again.
*/
inted = 0;
while(recv(io->creply, nil) == -1){
inted = 1;
iointerrupt(io);
}
USED(inted);
va_end(io->arg);
ret = io->ret;
if(ret < 0)
errstr(io->err, sizeof io->err);
io->inuse = 0;
/* release resources */
while(send(io->creply, &io) == -1)
;
return ret;
} }

View file

@ -0,0 +1,16 @@
#include <u.h>
#include <libc.h>
#include <thread.h>
#include "threadimpl.h"
long
_ioflush(va_list *)
{
return 0;
}
int
ioflush(Ioproc *io)
{
return iocall(io, _ioflush);
}

View file

@ -11,55 +11,80 @@ enum
void void
iointerrupt(Ioproc *io) iointerrupt(Ioproc *io)
{ {
if(!io->inuse) qlock(io);
return; if(++io->intr == 1)
threadint(io->tid); write(io->ctl, "interrupt", 9);
qunlock(io);
} }
static void static void
xioproc(void *a) xioproc(void *a)
{ {
Ioproc *io, *x; Channel *c;
io = a; Ioproc *io;
/* Iocall *r;
* first recvp acquires the ioproc.
* second tells us that the data is ready.
*/
for(;;){
while(recv(io->c, &x) == -1)
;
if(x == 0) /* our cue to leave */
break;
assert(x == io);
/* caller is now committed -- even if interrupted he'll return */ c = a;
while(recv(io->creply, &x) == -1) if(io = mallocz(sizeof(*io), 1)){
; char buf[128];
if(x == 0) /* caller backed out */
continue;
assert(x == io);
io->ret = io->op(&io->arg); snprint(buf, sizeof(buf), "/proc/%d/ctl", getpid());
if(io->ret < 0) if((io->ctl = open(buf, OWRITE)) < 0){
rerrstr(io->err, sizeof io->err); free(io);
while(send(io->creply, &io) == -1) io = nil;
; } else {
while(recv(io->creply, &x) == -1) if((io->creply = chancreate(sizeof(void*), 0)) == nil){
; close(io->ctl);
free(io);
io = nil;
} else
io->c = c;
}
} }
while(send(c, &io) < 0)
;
if(io == nil)
return;
for(;;){
while(recv(io->c, &r) < 0)
;
if(r == 0)
break;
if(io->intr){
r->ret = -1;
strcpy(r->err, "interrupted");
} else if((r->ret = r->op(&r->arg)) < 0)
rerrstr(r->err, sizeof r->err);
qlock(io);
if(io->intr){
io->intr = 0;
write(io->ctl, "nointerrupt", 11);
}
while(send(io->creply, &r) < 0)
;
qunlock(io);
}
close(io->ctl);
chanfree(io->c);
chanfree(io->creply);
free(io);
} }
Ioproc* Ioproc*
ioproc(void) ioproc(void)
{ {
Channel *c;
Ioproc *io; Ioproc *io;
io = mallocz(sizeof(*io), 1); if((c = chancreate(sizeof(void*), 0)) == nil)
sysfatal("ioproc chancreate");
proccreate(xioproc, c, STACK);
while(recv(c, &io) < 0)
;
if(io == nil) if(io == nil)
sysfatal("ioproc malloc: %r"); sysfatal("ioproc alloc");
io->c = chancreate(sizeof(void*), 0);
io->creply = chancreate(sizeof(void*), 0);
io->tid = proccreate(xioproc, io, STACK);
return io; return io;
} }
@ -69,9 +94,6 @@ closeioproc(Ioproc *io)
if(io == nil) if(io == nil)
return; return;
iointerrupt(io); iointerrupt(io);
while(send(io->c, 0) == -1) while(sendp(io->c, nil) < 0)
; ;
chanfree(io->c);
chanfree(io->creply);
free(io);
} }

View file

@ -21,6 +21,7 @@ OFILES=\
ioreadn.$O\ ioreadn.$O\
iosleep.$O\ iosleep.$O\
iowrite.$O\ iowrite.$O\
ioflush.$O\
kill.$O\ kill.$O\
lib.$O\ lib.$O\
main.$O\ main.$O\

View file

@ -25,6 +25,7 @@ typedef struct Tqueue Tqueue;
typedef struct Thread Thread; typedef struct Thread Thread;
typedef struct Execargs Execargs; typedef struct Execargs Execargs;
typedef struct Proc Proc; typedef struct Proc Proc;
typedef struct Iocall Iocall;
/* must match list in sched.c */ /* must match list in sched.c */
typedef enum typedef enum
@ -135,7 +136,8 @@ struct Proc
char threadint; /* tag for threadexitsall() */ char threadint; /* tag for threadexitsall() */
}; };
struct Pqueue { /* Proc queue */ struct Pqueue /* Proc queue */
{
Lock lock; Lock lock;
Proc *head; Proc *head;
Proc **tail; Proc **tail;
@ -143,14 +145,18 @@ struct Pqueue { /* Proc queue */
struct Ioproc struct Ioproc
{ {
int tid; QLock;
Channel *c, *creply; int intr;
int inuse; int ctl;
long (*op)(va_list*); Channel *c, *creply;
va_list arg; };
long ret;
char err[ERRMAX]; struct Iocall
Ioproc *next; {
long (*op)(va_list*);
va_list arg;
long ret;
char err[ERRMAX];
}; };
void _freeproc(Proc*); void _freeproc(Proc*);