plan9fox/sys/src/ape/lib/ap/plan9/_buf.c

521 lines
10 KiB
C
Raw Normal View History

#define _BSDTIME_EXTENSION
#define _LOCK_EXTENSION
#include "lib.h"
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <lock.h>
#include <sys/time.h>
#include <sys/select.h>
#include <unistd.h>
#include "sys9.h"
typedef struct Muxseg {
Lock lock; /* for mutual exclusion access to buffer variables */
int curfds; /* number of fds currently buffered */
int selwait; /* true if selecting process is waiting */
int waittime; /* time for timer process to wait */
fd_set rwant; /* fd's that select wants to read */
fd_set ewant; /* fd's that select wants to know eof info on */
Muxbuf bufs[OPEN_MAX];
} Muxseg;
static Muxseg *mux = 0; /* shared memory segment */
/* _muxsid and _killmuxsid are known in libbsd's listen.c */
int _muxsid = -1; /* group id of copy processes */
static int _mainpid = -1;
static int timerpid = -1; /* pid of a timer process */
void _killmuxsid(void);
static void _copyproc(int, Muxbuf*);
static void _timerproc(void);
static void _resettimer(void);
static int copynotehandler(void *, char *);
/* assume FD_SETSIZE is 96 */
#define FD_ANYSET(p) ((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
/*
* Start making fd read-buffered: make the shared segment, if necessary,
* allocate a slot (index into mux->bufs), and fork a child to read the fd
* and write into the slot-indexed buffer.
* Return -1 if we can't do it.
*/
int
_startbuf(int fd)
{
2013-05-07 00:51:18 +00:00
int i, pid;
Fdinfo *f;
Muxbuf *b;
void *v;
Muxseg *m;
if(mux == 0){
if(_RFORK(RFREND) == -1){
_syserrno();
return -1;
}
m = (Muxseg*)_SEGATTACH(0, "shared", 0, sizeof(Muxseg));
if(m == (void*)-1){
_syserrno();
return -1;
}
mux = m;
/* segattach has returned zeroed memory */
atexit(_killmuxsid);
}
if(fd < 0)
return 0;
lock(&mux->lock);
f = &_fdinfo[fd];
if((f->flags&FD_ISOPEN) == 0){
unlock(&mux->lock);
errno = EBADF;
return -1;
}
if((f->flags&FD_BUFFERED) != 0){
unlock(&mux->lock);
return 0;
}
if((f->flags&FD_BUFFEREDX) != 0){
unlock(&mux->lock);
errno = EIO;
return -1;
}
for(b = mux->bufs; b < &mux->bufs[mux->curfds]; b++)
if(b->fd == -1)
goto Found;
if(mux->curfds >= OPEN_MAX){
unlock(&mux->lock);
errno = ENFILE;
return -1;
}
mux->curfds++;
Found:
b->n = 0;
b->putnext = b->data;
b->getnext = b->data;
b->eof = 0;
b->fd = fd;
if(_mainpid == -1)
_mainpid = getpid();
if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
/* copy process ... */
if(_muxsid == -1) {
_RFORK(RFNOTEG);
_muxsid = getpgrp();
} else
setpgid(getpid(), _muxsid);
_NOTIFY(copynotehandler);
for(i=0; i<OPEN_MAX; i++)
if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
_CLOSE(i);
while(_RENDEZVOUS(&b->copypid, (void*)_muxsid) == (void*)~0)
;
_copyproc(fd, b);
}
/* parent process continues ... */
b->copypid = pid;
f->buf = b;
f->flags |= FD_BUFFERED;
unlock(&mux->lock);
while((v = _RENDEZVOUS(&b->copypid, 0)) == (void*)~0)
;
_muxsid = (uintptr_t)v;
/* leave fd open in parent so system doesn't reuse it */
return 0;
}
/*
* The given buffered fd is being closed.
* Set the fd field in the shared buffer to -1 to tell copyproc
* to exit, and kill the copyproc.
*/
void
_closebuf(int fd)
{
Muxbuf *b;
int i;
b = _fdinfo[fd].buf;
if(b == 0 || mux == 0)
return;
lock(&mux->lock);
if(b->fd == fd){
b->fd = -1;
for(i=0; i<10 && kill(b->copypid, SIGKILL)==0; i++)
_SLEEP(1);
}
unlock(&mux->lock);
}
/* child copy procs execute this until eof */
static void
_copyproc(int fd, Muxbuf *b)
{
unsigned char *e;
int n;
int nzeros;
e = &b->data[PERFDMAX];
for(;;) {
/* make sure there's room */
lock(&mux->lock);
if(b->fd == fd && (e - b->putnext) < READMAX) {
if(b->getnext == b->putnext) {
b->getnext = b->putnext = b->data;
unlock(&mux->lock);
} else {
/* sleep until there's room */
b->roomwait = 1;
unlock(&mux->lock);
_RENDEZVOUS(&b->roomwait, 0);
}
} else
unlock(&mux->lock);
/*
* A Zero-length _READ might mean a zero-length write
* happened, or it might mean eof; try several times to
* disambiguate (posix read() discards 0-length messages)
*/
n = 0;
nzeros = 0;
do {
if(b->fd != fd)
break;
n = _READ(fd, b->putnext, READMAX);
} while(b->fd == fd && n == 0 && ++nzeros < 3);
lock(&mux->lock);
if(b->fd != fd){
unlock(&mux->lock);
_exit(0); /* we've been closed */
}
if(n <= 0) {
b->eof = 1;
if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
mux->selwait = 0;
unlock(&mux->lock);
_RENDEZVOUS(&mux->selwait, (void*)fd);
} else if(b->datawait) {
b->datawait = 0;
unlock(&mux->lock);
_RENDEZVOUS(&b->datawait, 0);
} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
mux->selwait = 0;
unlock(&mux->lock);
_RENDEZVOUS(&mux->selwait, (void*)fd);
} else
unlock(&mux->lock);
_exit(0);
} else {
b->putnext += n;
b->n += n;
if(b->n > 0) {
/* parent process cannot be both in datawait and selwait */
if(b->datawait) {
b->datawait = 0;
unlock(&mux->lock);
/* wake up _bufreading process */
_RENDEZVOUS(&b->datawait, 0);
} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
mux->selwait = 0;
unlock(&mux->lock);
/* wake up selecting process */
_RENDEZVOUS(&mux->selwait, (void*)fd);
} else
unlock(&mux->lock);
} else
unlock(&mux->lock);
}
}
}
/* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
int
_readbuf(int fd, void *addr, int nwant, int noblock)
{
Muxbuf *b;
int ngot;
b = _fdinfo[fd].buf;
if(b == nil || b->fd != fd){
badfd:
errno = EBADF;
return -1;
}
if(b->eof && b->n == 0) {
goteof:
return 0;
}
if(b->n == 0 && noblock) {
errno = EAGAIN;
return -1;
}
lock(&mux->lock);
if(b->fd != fd){
unlock(&mux->lock);
goto badfd;
}
/* make sure there's data */
ngot = b->putnext - b->getnext;
if(ngot == 0) {
/* maybe EOF just happened */
if(b->eof) {
unlock(&mux->lock);
goto goteof;
}
/* sleep until there's data */
b->datawait = 1;
unlock(&mux->lock);
_RENDEZVOUS(&b->datawait, 0);
lock(&mux->lock);
if(b->fd != fd){
unlock(&mux->lock);
goto badfd;
}
ngot = b->putnext - b->getnext;
}
if(ngot == 0) {
unlock(&mux->lock);
goto goteof;
}
if(ngot > nwant)
ngot = nwant;
memcpy(addr, b->getnext, ngot);
b->getnext += ngot;
b->n -= ngot;
if(b->getnext == b->putnext && b->roomwait) {
b->getnext = b->putnext = b->data;
b->roomwait = 0;
unlock(&mux->lock);
/* wake up copy process */
_RENDEZVOUS(&b->roomwait, 0);
} else
unlock(&mux->lock);
return ngot;
}
int
select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
{
int n, i, t, slots, fd, err;
Fdinfo *f;
Muxbuf *b;
if(timeout)
t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
else
t = -1;
if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
|| (efds && FD_ANYSET(efds)))) {
/* no requested fds */
if(t > 0)
_SLEEP(t);
return 0;
}
if(_startbuf(-1) != 0)
return -1;
/* make sure all requested rfds and efds are buffered */
if(nfds >= OPEN_MAX)
nfds = OPEN_MAX;
for(i = 0; i < nfds; i++)
if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
f = &_fdinfo[i];
if((f->flags&FD_ISOPEN) == 0){
errno = EBADF;
return -1;
}
if((f->flags&FD_BUFFERED) == 0)
if(_startbuf(i) != 0)
return -1;
}
/* check wfds; for now, we'll say they are all ready */
n = 0;
if(wfds && FD_ANYSET(wfds)){
for(i = 0; i<nfds; i++)
if(FD_ISSET(i, wfds)) {
f = &_fdinfo[i];
if((f->flags&FD_ISOPEN) == 0){
errno = EBADF;
return -1;
}
n++;
}
}
lock(&mux->lock);
slots = mux->curfds;
FD_ZERO(&mux->rwant);
FD_ZERO(&mux->ewant);
for(i = 0; i<slots; i++) {
b = &mux->bufs[i];
fd = b->fd;
if(fd == -1)
continue;
err = 0;
if(efds && FD_ISSET(fd, efds)) {
if(b->eof && b->n == 0){
err = 1;
n++;
}else{
FD_CLR(fd, efds);
FD_SET(fd, &mux->ewant);
}
}
if(rfds && FD_ISSET(fd, rfds)) {
if(!err && (b->n > 0 || b->eof))
n++;
else{
FD_CLR(fd, rfds);
FD_SET(fd, &mux->rwant);
}
}
}
if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
FD_ZERO(&mux->rwant);
FD_ZERO(&mux->ewant);
unlock(&mux->lock);
return n;
}
if(timeout) {
mux->waittime = t;
if(timerpid == -1)
_timerproc();
else
_resettimer();
}
mux->selwait = 1;
unlock(&mux->lock);
fd = (int)(uintptr_t)_RENDEZVOUS(&mux->selwait, 0);
if(fd >= 0 && fd < nfds) {
b = _fdinfo[fd].buf;
if(b == 0 || b->fd != fd) {
} else if(FD_ISSET(fd, &mux->rwant)) {
FD_SET(fd, rfds);
n = 1;
} else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
FD_SET(fd, efds);
n = 1;
}
}
return n;
}
static int timerreset;
static int timerpid;
static void
alarmed(int)
{
timerreset = 1;
}
/* a little over an hour */
#define LONGWAIT 4000001
static void
_killtimerproc(void)
{
if(timerpid > 0)
kill(timerpid, SIGKILL);
}
static void
_timerproc(void)
{
int i;
if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
/* timer process */
setpgid(getpid(), _muxsid);
signal(SIGALRM, alarmed);
for(i=0; i<OPEN_MAX; i++)
_CLOSE(i);
while(_RENDEZVOUS(&timerpid, 0) == (void*)~0)
;
for(;;) {
_SLEEP(mux->waittime);
if(timerreset) {
timerreset = 0;
} else {
lock(&mux->lock);
if(mux->selwait && mux->waittime != LONGWAIT) {
mux->selwait = 0;
mux->waittime = LONGWAIT;
unlock(&mux->lock);
_RENDEZVOUS(&mux->selwait, (void*)-2);
} else {
mux->waittime = LONGWAIT;
unlock(&mux->lock);
}
}
}
}
/* parent process continues */
if(timerpid > 0){
atexit(_killtimerproc);
while(_RENDEZVOUS(&timerpid, 0) == (void*)~0)
;
}
}
static void
_resettimer(void)
{
kill(timerpid, SIGALRM);
}
void
_killmuxsid(void)
{
if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
kill(-_muxsid,SIGTERM);
}
/* call this on fork(), because reading a BUFFERED fd won't work in child */
void
_detachbuf(void)
{
int i;
Fdinfo *f;
if(mux == 0)
return;
_SEGDETACH(mux);
for(i = 0; i < OPEN_MAX; i++){
f = &_fdinfo[i];
if(f->flags&FD_BUFFERED)
f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
/* mark 'poisoned' */
}
mux = 0;
_muxsid = -1;
_mainpid = -1;
timerpid = -1;
}
static int
copynotehandler(void *, char *)
{
if(_finishing)
_finish(0, 0);
_NOTED(1);
2012-07-30 17:11:16 +00:00
return 0;
}