aan: use unsigned message counters, reject repeated acks, cleanup debug prints

This commit is contained in:
cinap_lenrek 2017-02-04 01:39:36 +01:00
parent 930be3d317
commit 7f12431009

View file

@ -36,7 +36,8 @@ static Channel *unsent;
static Channel *unacked; static Channel *unacked;
static Channel *empty; static Channel *empty;
static int netfd; static int netfd;
static int inmsg; static ulong inmsg;
static ulong outmsg;
static char *devdir; static char *devdir;
static int debug; static int debug;
static int done; static int done;
@ -58,9 +59,7 @@ static void fromnet(void*);
static void fromclient(void*); static void fromclient(void*);
static int reconnect(int); static int reconnect(int);
static void synchronize(void); static void synchronize(void);
static void showmsg(int, char *, Buf *);
static int writen(int, uchar *, int); static int writen(int, uchar *, int);
static void dmessage(int, char *, ...);
static void timerproc(void *); static void timerproc(void *);
static void static void
@ -84,13 +83,13 @@ catch(void *, char *s)
static void* static void*
emalloc(int n) emalloc(int n)
{ {
ulong pc; uintptr pc;
void *v; void *v;
pc = getcallerpc(&n); pc = getcallerpc(&n);
v = malloc(n); v = malloc(n);
if(v == nil) if(v == nil)
sysfatal("Cannot allocate memory; pc=%lux", pc); sysfatal("Cannot allocate memory; pc=%#p", pc);
setmalloctag(v, pc); setmalloctag(v, pc);
return v; return v;
} }
@ -180,7 +179,7 @@ Restart:
if (netfd < 0 || failed) { if (netfd < 0 || failed) {
// Wait for the netreader to die. // Wait for the netreader to die.
while (netfd >= 0) { while (netfd >= 0) {
dmessage(1, "main; waiting for netreader to die\n"); if(debug) fprint(2, "main; waiting for netreader to die\n");
threadint(reader); threadint(reader);
sleep(1000); sleep(1000);
} }
@ -201,7 +200,6 @@ Restart:
PBIT32(hdr.msg, -1); PBIT32(hdr.msg, -1);
if (writen(netfd, (uchar *)&hdr, Hdrsz) < 0) { if (writen(netfd, (uchar *)&hdr, Hdrsz) < 0) {
dmessage(2, "main; writen failed; %r\n");
failed = 1; failed = 1;
continue; continue;
} }
@ -222,19 +220,15 @@ Restart:
PBIT32(b->hdr.acked, inmsg); PBIT32(b->hdr.acked, inmsg);
if (writen(netfd, (uchar *)&b->hdr, Hdrsz) < 0) { if (writen(netfd, (uchar *)&b->hdr, Hdrsz) < 0)
dmessage(2, "main; writen failed; %r\n");
failed = 1; failed = 1;
else {
n = GBIT32(b->hdr.nb);
if (writen(netfd, b->buf, n) < 0)
failed = 1;
if (n == 0)
done = 1;
} }
n = GBIT32(b->hdr.nb);
if (writen(netfd, b->buf, n) < 0) {
dmessage(2, "main; writen failed; %r\n");
failed = 1;
}
if (n == 0)
done = 1;
break; break;
} }
} }
@ -246,7 +240,6 @@ Restart:
static void static void
fromclient(void*) fromclient(void*)
{ {
static int outmsg;
int n; int n;
Buf *b; Buf *b;
@ -255,16 +248,10 @@ fromclient(void*)
do { do {
b = recvp(empty); b = recvp(empty);
n = read(0, b->buf, Bufsize); n = read(0, b->buf, Bufsize);
if (n <= 0) { if (n < 0)
if (n < 0)
dmessage(2, "fromclient; Cannot read 9P message; %r\n");
else
dmessage(2, "fromclient; Client terminated\n");
n = 0; n = 0;
}
PBIT32(b->hdr.nb, n); PBIT32(b->hdr.nb, n);
PBIT32(b->hdr.msg, outmsg); PBIT32(b->hdr.msg, outmsg);
showmsg(1, "fromclient", b);
sendp(unsent, b); sendp(unsent, b);
outmsg++; outmsg++;
} while(n > 0); } while(n > 0);
@ -274,8 +261,8 @@ static void
fromnet(void*) fromnet(void*)
{ {
extern void _threadnote(void *, char *); extern void _threadnote(void *, char *);
static int lastacked; ulong m, acked, lastacked = 0;
int n, m, len, acked; int n, len;
Buf *b; Buf *b;
notify(_threadnote); notify(_threadnote);
@ -287,17 +274,19 @@ fromnet(void*)
while (netfd < 0) { while (netfd < 0) {
if(done) if(done)
return; return;
dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n", inmsg); if(debug) fprint(2, "fromnet; waiting for connection... (inmsg %lud)\n", inmsg);
sleep(1000); sleep(1000);
} }
// Read the header. // Read the header.
len = readn(netfd, (uchar *)&b->hdr, Hdrsz); len = readn(netfd, (uchar *)&b->hdr, Hdrsz);
if (len <= 0) { if (len <= 0) {
if (len < 0) if (debug) {
dmessage(1, "fromnet; (hdr) network failure; %r\n"); if (len < 0)
else fprint(2, "fromnet; (hdr) network failure; %r\n");
dmessage(1, "fromnet; (hdr) network closed\n"); else
fprint(2, "fromnet; (hdr) network closed\n");
}
close(netfd); close(netfd);
netfd = -1; netfd = -1;
continue; continue;
@ -306,44 +295,42 @@ fromnet(void*)
n = GBIT32(b->hdr.nb); n = GBIT32(b->hdr.nb);
m = GBIT32(b->hdr.msg); m = GBIT32(b->hdr.msg);
acked = GBIT32(b->hdr.acked); acked = GBIT32(b->hdr.acked);
dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d, acked %d, lastacked %d\n",
len, n, m, acked, lastacked);
if (n == 0) { if (n == 0) {
if (m < 0) if (m == (ulong)-1)
continue; continue;
dmessage(1, "fromnet; network closed\n"); if(debug) fprint(2, "fromnet; network closed\n");
break; break;
} else if (n < 0 || n > Bufsize) { } else if (n < 0 || n > Bufsize) {
dmessage(1, "fromnet; message too big %d > %d\n", n, Bufsize); if(debug) fprint(2, "fromnet; message too big %d > %d\n", n, Bufsize);
break; break;
} }
len = readn(netfd, b->buf, n); len = readn(netfd, b->buf, n);
if (len <= 0 || len != n) { if (len <= 0 || len != n) {
if (len == 0) if (len == 0)
dmessage(1, "fromnet; network closed\n"); if(debug) fprint(2, "fromnet; network closed\n");
else else
dmessage(1, "fromnet; network failure; %r\n"); if(debug) fprint(2, "fromnet; network failure; %r\n");
close(netfd); close(netfd);
netfd = -1; netfd = -1;
continue; continue;
} }
if (m < inmsg) { if (m != inmsg) {
dmessage(1, "fromnet; skipping message %d, currently at %d\n", m, inmsg); if(debug) fprint(2, "fromnet; skipping message %lud, currently at %lud\n", m, inmsg);
continue; continue;
} }
inmsg++; inmsg++;
// Process the acked list. // Process the acked list.
while(lastacked != acked) { while((long)(acked - lastacked) > 0) {
Buf *rb; Buf *rb;
rb = recvp(unacked); if((rb = recvp(unacked)) == nil)
break;
m = GBIT32(rb->hdr.msg); m = GBIT32(rb->hdr.msg);
if (m != lastacked) { if (m != lastacked) {
dmessage(1, "fromnet; rb %p, msg %d, lastacked %d\n", rb, m, lastacked); if(debug) fprint(2, "fromnet; rb %p, msg %lud, lastacked %lud\n", rb, m, lastacked);
sysfatal("fromnet; bug"); sysfatal("fromnet; bug");
} }
PBIT32(rb->hdr.msg, -1); PBIT32(rb->hdr.msg, -1);
@ -351,8 +338,6 @@ fromnet(void*)
lastacked++; lastacked++;
} }
showmsg(1, "fromnet", b);
if (writen(1, b->buf, len) < 0) if (writen(1, b->buf, len) < 0)
sysfatal("fromnet; cannot write to client; %r"); sysfatal("fromnet; cannot write to client; %r");
} }
@ -375,10 +360,10 @@ reconnect(int secs)
err[0] = '\0'; err[0] = '\0';
errstr(err, sizeof err); errstr(err, sizeof err);
if (strstr(err, "connection refused")) { if (strstr(err, "connection refused")) {
dmessage(1, "reconnect; server died...\n"); if(debug) fprint(2, "reconnect; server died...\n");
threadexitsall("server died..."); threadexitsall("server died...");
} }
dmessage(1, "reconnect: dialed %s; %s\n", dialstring, err); if(debug) fprint(2, "reconnect: dialed %s; %s\n", dialstring, err);
sleep(1000); sleep(1000);
} }
alarm(0); alarm(0);
@ -427,22 +412,6 @@ synchronize(void)
unacked = tmp; unacked = tmp;
} }
static void
showmsg(int level, char *s, Buf *b)
{
int n;
if (b == nil) {
dmessage(level, "%s; b == nil\n", s);
return;
}
n = GBIT32(b->hdr.nb);
dmessage(level, "%s; (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s, n,
b->buf[0], b->buf[1], b->buf[2],
b->buf[3], b->buf[4], b->buf[5],
b->buf[6], b->buf[7], b->buf[8], b);
}
static int static int
writen(int fd, uchar *buf, int nb) writen(int fd, uchar *buf, int nb)
{ {
@ -455,10 +424,9 @@ writen(int fd, uchar *buf, int nb)
return -1; return -1;
if ((n = write(fd, buf, nb)) < 0) { if ((n = write(fd, buf, nb)) < 0) {
dmessage(1, "writen; Write failed; %r\n"); if(debug) fprint(2, "writen; Write failed; %r\n");
return -1; return -1;
} }
dmessage(2, "writen: wrote %d bytes\n", n);
buf += n; buf += n;
nb -= n; nb -= n;
@ -478,16 +446,3 @@ timerproc(void *x)
sendp(timer, "timer"); sendp(timer, "timer");
} }
} }
static void
dmessage(int level, char *fmt, ...)
{
va_list arg;
if (level > debug)
return;
va_start(arg, fmt);
vfprint(2, fmt, arg);
va_end(arg);
}