aan: use sync messages as keep alives
both server and client need to be convinced that the connection is broken for a connection reestablishment to happen as the server will only start looking for incoming clients when the connection already broke. so use the 8 second interval sync messages to check for connection lifeness. if we miss two syncs in time, we declare the connecton to be broken and will try to reconnect.
This commit is contained in:
parent
481ae71940
commit
bf6ba56817
1 changed files with 31 additions and 25 deletions
|
@ -21,21 +21,13 @@ enum {
|
|||
Hdrsz = 3*4,
|
||||
};
|
||||
|
||||
typedef struct Endpoints Endpoints;
|
||||
struct Endpoints {
|
||||
char *lsys;
|
||||
char *lserv;
|
||||
char *rsys;
|
||||
char *rserv;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
uchar nb[4]; // Number of data bytes in this message
|
||||
uchar msg[4]; // Message number
|
||||
uchar acked[4]; // Number of messages acked
|
||||
} Hdr;
|
||||
|
||||
typedef struct t_Buf {
|
||||
typedef struct {
|
||||
Hdr hdr;
|
||||
uchar buf[Bufsize];
|
||||
} Buf;
|
||||
|
@ -52,6 +44,8 @@ static char *dialstring;
|
|||
static int maxto = Maxto;
|
||||
static char *Logname = "aan";
|
||||
static int client;
|
||||
static int reader = -1;
|
||||
static int lostsync;
|
||||
|
||||
static Alt a[] = {
|
||||
/* c v op */
|
||||
|
@ -62,7 +56,7 @@ static Alt a[] = {
|
|||
|
||||
static void fromnet(void*);
|
||||
static void fromclient(void*);
|
||||
static void reconnect(int);
|
||||
static int reconnect(int);
|
||||
static void synchronize(void);
|
||||
static int sendcommand(ulong, ulong);
|
||||
static void showmsg(int, char *, Buf *);
|
||||
|
@ -77,6 +71,7 @@ usage(void)
|
|||
exits("usage");
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
catch(void *, char *s)
|
||||
{
|
||||
|
@ -107,6 +102,7 @@ threadmain(int argc, char **argv)
|
|||
vlong synctime;
|
||||
int i, n, failed;
|
||||
Channel *timer;
|
||||
Hdr hdr;
|
||||
Buf *b;
|
||||
|
||||
ARGBEGIN {
|
||||
|
@ -151,8 +147,7 @@ threadmain(int argc, char **argv)
|
|||
* long if there is some general connection problem
|
||||
* (like NAT).
|
||||
*/
|
||||
netfd = -1;
|
||||
reconnect(60);
|
||||
netfd = reconnect(60);
|
||||
|
||||
unsent = chancreate(sizeof(Buf *), Nbuf);
|
||||
unacked = chancreate(sizeof(Buf *), Nbuf);
|
||||
|
@ -164,7 +159,8 @@ threadmain(int argc, char **argv)
|
|||
for (i = 0; i < Nbuf; i++)
|
||||
sendp(empty, emalloc(sizeof(Buf)));
|
||||
|
||||
if (proccreate(fromnet, nil, Stacksize) < 0)
|
||||
reader = proccreate(fromnet, nil, Stacksize);
|
||||
if (reader < 0)
|
||||
sysfatal("Cannot start fromnet; %r");
|
||||
|
||||
if (proccreate(fromclient, nil, Stacksize) < 0)
|
||||
|
@ -177,24 +173,29 @@ threadmain(int argc, char **argv)
|
|||
a[Unsent].c = unsent;
|
||||
a[Unsent].v = &b;
|
||||
|
||||
Restart:
|
||||
synctime = nsec() + Synctime;
|
||||
failed = 0;
|
||||
lostsync = 0;
|
||||
while (!done) {
|
||||
if (failed) {
|
||||
// Wait for the netreader to die.
|
||||
while (netfd >= 0) {
|
||||
dmessage(1, "main; waiting for netreader to die\n");
|
||||
threadint(reader);
|
||||
sleep(1000);
|
||||
}
|
||||
|
||||
// the reader died; reestablish the world.
|
||||
reconnect(maxto);
|
||||
netfd = reconnect(maxto);
|
||||
synchronize();
|
||||
failed = 0;
|
||||
goto Restart;
|
||||
}
|
||||
|
||||
if (nsec() >= synctime) {
|
||||
Hdr hdr;
|
||||
switch (alt(a)) {
|
||||
case Timer:
|
||||
if (netfd < 0 || nsec() < synctime)
|
||||
break;
|
||||
|
||||
PBIT32(hdr.nb, 0);
|
||||
PBIT32(hdr.acked, inmsg);
|
||||
|
@ -205,11 +206,13 @@ threadmain(int argc, char **argv)
|
|||
failed = 1;
|
||||
continue;
|
||||
}
|
||||
synctime = nsec() + Synctime;
|
||||
}
|
||||
|
||||
switch (alt(a)) {
|
||||
case Timer:
|
||||
if(++lostsync > 2){
|
||||
dmessage(2, "main; lost sync\n");
|
||||
failed = 1;
|
||||
continue;
|
||||
}
|
||||
synctime = nsec() + Synctime;
|
||||
break;
|
||||
|
||||
case Unsent:
|
||||
|
@ -268,10 +271,13 @@ fromclient(void*)
|
|||
static void
|
||||
fromnet(void*)
|
||||
{
|
||||
extern void _threadnote(void *, char *);
|
||||
static int lastacked;
|
||||
int n, m, len, acked;
|
||||
Buf *b;
|
||||
|
||||
notify(_threadnote);
|
||||
|
||||
threadsetname("fromnet");
|
||||
|
||||
b = emalloc(sizeof(Buf));
|
||||
|
@ -294,6 +300,7 @@ fromnet(void*)
|
|||
netfd = -1;
|
||||
continue;
|
||||
}
|
||||
lostsync = 0; // reset timeout
|
||||
n = GBIT32(b->hdr.nb);
|
||||
m = GBIT32(b->hdr.msg);
|
||||
acked = GBIT32(b->hdr.acked);
|
||||
|
@ -353,7 +360,7 @@ fromnet(void*)
|
|||
done = 1;
|
||||
}
|
||||
|
||||
static void
|
||||
static int
|
||||
reconnect(int secs)
|
||||
{
|
||||
NetConnInfo *nci;
|
||||
|
@ -395,9 +402,8 @@ reconnect(int secs)
|
|||
freenetconninfo(nci);
|
||||
} else
|
||||
syslog(0, Logname, "connected");
|
||||
|
||||
// Wakes up the netreader.
|
||||
netfd = fd;
|
||||
|
||||
return fd;
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
Loading…
Reference in a new issue