torrent: listen support

This commit is contained in:
cinap_lenrek 2011-10-24 00:53:27 +02:00
parent 98d01a7719
commit f5fe39ad7a

View file

@ -6,6 +6,7 @@
typedef struct Dict Dict;
typedef struct Piece Piece;
typedef struct File File;
typedef struct Stats Stats;
struct Dict
{
@ -33,13 +34,21 @@ struct File
vlong len;
};
struct Stats
{
Lock;
vlong up;
vlong down;
vlong left;
};
enum {
MAXIO = 16*1024,
};
int debug, sflag, pflag, vflag;
int pidgroup = -1;
int port = 48123;
int killgroup = -1;
int port = 6881;
char *mntweb = "/mnt/web";
uchar infohash[20];
uchar peerid[20];
@ -50,8 +59,10 @@ Piece *pieces;
int nhavemap;
uchar *havemap;
int nhavepieces;
File *files;
Stats stats;
void
freedict(Dict *d)
@ -208,7 +219,13 @@ havepiece(int x)
free(p);
if(memcmp(hash, pieces[x].hash, 20))
return 0;
havemap[x>>3] |= m;
lock(&stats);
if((havemap[x>>3] & m) == 0){
havemap[x>>3] |= m;
nhavepieces++;
stats.left -= pieces[x].len;
}
unlock(&stats);
return 1;
}
@ -309,86 +326,60 @@ Err:
return -1;
}
void
peer(char *ip, char *port)
{
static Dict *peers;
static QLock peerslk;
int
peer(int fd, int incoming, char *addr)
{
uchar buf[64+MAXIO], *map, *told, *p, m;
char *addr;
int retry, i, o, l, x, n, fd;
int mechoking, hechoking;
int mewant, hewant;
int workpiece;
Dict *d;
int i, o, l, x, n;
if(ip == nil || port == nil)
return;
if(debug) fprint(2, "peer %s: %s connected\n", addr, incoming ? "incoming" : "outgoing");
d = mallocz(sizeof(*d) + 64, 1);
snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
qlock(&peerslk);
if(dlook(peers, addr)){
qunlock(&peerslk);
free(d);
return;
for(i=0; i<2; i++){
if((incoming && i) || (!incoming && !i)){
if(debug) fprint(2, "peer %s: -> handshake\n", addr);
n = pack(buf, sizeof(buf), "*________**",
20, "\x13BitTorrent protocol",
sizeof(infohash), infohash,
sizeof(peerid), peerid);
if(write(fd, buf, n) != n)
return 1;
}
if((incoming && !i) || (!incoming && i)){
n = 20 + 8 + sizeof(infohash);
if((n = readn(fd, buf, n)) != n)
return 1;
if(memcmp(buf, "\x13BitTorrent protocol", 20))
return 0;
if(debug) fprint(2, "peer %s: <- handshake\n", addr);
if(memcmp(infohash, buf + 20 + 8, sizeof(infohash)))
return 0;
}
}
d->len = strlen(addr);
d->typ = 'd';
d->val = d;
d->next = peers;
peers = d;
qunlock(&peerslk);
if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
return;
fd = -1;
retry = 0;
map = malloc(nhavemap);
told = malloc(nhavemap);
Retry:
if(fd >= 0){
close(fd);
sleep(10000 + nrand(5000));
}
if(++retry >= 10)
goto Exit;
if(debug) fprint(2, "dial %s\n", addr);
if((fd = dial(addr, nil, nil, nil)) < 0)
goto Retry;
if(debug) fprint(2, "peer %s: -> handshake\n", addr);
n = pack(buf, sizeof(buf), "*________**",
20, "\x13BitTorrent protocol",
sizeof(infohash), infohash,
sizeof(peerid), peerid);
if(write(fd, buf, n) != n)
goto Retry;
if(read(fd, buf, 1) != 1)
goto Retry;
n = buf[0] + 8 + sizeof(infohash) + sizeof(peerid);
if((n = readn(fd, buf+1, n)) != n)
goto Retry;
if(debug) fprint(2, "peer %s: <- handshake %.*s\n", addr, buf[0], (char*)buf+1);
if(memcmp(infohash, buf + 1 + buf[0] + 8, sizeof(infohash)))
goto Exit;
if(debug) fprint(2, "peer %s: -> bitfield %d\n", addr, nhavemap);
memmove(told, havemap, nhavemap);
n = pack(buf, sizeof(buf), "lb*", nhavemap+1, 0x05, nhavemap, havemap);
if(write(fd, buf, n) != n)
goto Retry;
if(readn(fd, buf, sizeof(peerid)) != sizeof(peerid))
return 1;
if(memcmp(peerid, buf, sizeof(peerid)) == 0)
return 0;
if(debug) fprint(2, "peer %s: peerid %.*s\n", addr, sizeof(peerid), (char*)buf);
mechoking = 1;
hechoking = 1;
mewant = 0;
hewant = 0;
workpiece = -1;
memset(map, 0, nhavemap);
map = mallocz(nhavemap, 1);
told = malloc(nhavemap);
if(debug) fprint(2, "peer %s: -> bitfield %d\n", addr, nhavemap);
memmove(told, havemap, nhavemap);
n = pack(buf, sizeof(buf), "lb*", nhavemap+1, 0x05, nhavemap, havemap);
if(write(fd, buf, n) != n)
goto Out;
for(;;){
for(i=0; i<nhavemap; i++){
if(told[i] != havemap[i]){
@ -399,7 +390,7 @@ Retry:
if(debug) fprint(2, "peer %s: -> have %d\n", addr, x);
n = pack(buf, sizeof(buf), "lbl", 1+4, 0x04, x);
if(write(fd, buf, n) != n)
goto Retry;
goto Out;
}
}
if(!mewant && (map[i] & ~havemap[i])){
@ -407,7 +398,7 @@ Retry:
if(debug) fprint(2, "peer %s: -> interested\n", addr);
n = pack(buf, sizeof(buf), "lb", 1, 0x02);
if(write(fd, buf, n) != n)
goto Retry;
goto Out;
}
}
if(!hechoking && mewant){
@ -423,7 +414,7 @@ Retry:
if(debug) fprint(2, "peer %s: -> request %d %d %d\n", addr, x, o, l);
n = pack(buf, sizeof(buf), "lblll", 1+4+4+4, 0x06, x, o, l);
if(write(fd, buf, n) != n)
goto Retry;
goto Out;
workpiece = x;
}
}
@ -432,21 +423,21 @@ Retry:
if(debug) fprint(2, "peer %s: -> unchoke\n", addr);
n = pack(buf, sizeof(buf), "lb", 1, 0x01);
if(write(fd, buf, n) != n)
goto Retry;
goto Out;
}
if(readn(fd, buf, 4) != 4)
goto Retry;
break;
unpack(buf, 4, "l", &n);
if(n < 0 || n > sizeof(buf))
break;
if(n == 0)
continue;
if(n < 0 || n > sizeof(buf))
goto Retry;
if(readn(fd, buf, n) != n)
goto Retry;
retry = 0;
p = buf+1;
break;
n--;
p = buf+1;
switch(*buf){
case 0x00: // Choke
hechoking = 1;
@ -467,7 +458,7 @@ Retry:
break;
case 0x04: // Have <piceindex>
if(unpack(p, n, "l", &x) < 0)
goto Retry;
goto Out;
if(debug) fprint(2, "peer %s: <- have %d\n", addr, x);
if(x < 0 || x >= npieces)
continue;
@ -481,7 +472,7 @@ Retry:
break;
case 0x06: // Request <index> <begin> <length>
if(unpack(p, n, "lll", &x, &o, &l) < 0)
goto Retry;
goto Out;
if(debug) fprint(2, "peer %s: <- request %d %d %d\n", addr, x, o, l);
if(x < 0 || x >= npieces)
continue;
@ -496,13 +487,19 @@ Retry:
n = pack(buf, sizeof(buf), "lbll", 1+4+4+l, 0x07, x, o);
n += l;
if(write(fd, buf, n) != n)
goto Retry;
goto Out;
lock(&stats);
stats.up += n;
unlock(&stats);
break;
case 0x07: // Piece <index> <begin> <block>
if(unpack(p, n, "ll", &x, &o) != 8)
goto Retry;
goto Out;
p += 8;
n -= 8;
lock(&stats);
stats.down += n;
unlock(&stats);
if(debug) fprint(2, "peer %s: <- piece %d %d %d\n", addr, x, o, n);
if(x < 0 || x >= npieces)
continue;
@ -517,19 +514,102 @@ Retry:
break;
case 0x08: // Cancel <index> <begin> <length>
if(unpack(p, n, "lll", &x, &o, &l) < 0)
goto Retry;
goto Out;
if(debug) fprint(2, "peer %s: <- cancel %d %d %d\n", addr, x, o, l);
break;
case 0x09: // Port <port>
if(unpack(p, n, "l", &x) < 0)
goto Retry;
goto Out;
if(debug) fprint(2, "peer %s: <- port %d\n", addr, x);
break;
}
}
Exit:
Out:
free(told);
free(map);
return 1;
}
void
server(void)
{
char addr[64], adir[40], ldir[40];
int afd, lfd, dfd;
NetConnInfo *ni;
afd = -1;
for(port=6881; port<6890; port++){
snprint(addr, sizeof(addr), "tcp!*!%d", port);
if((afd = announce(addr, adir)) >= 0)
break;
}
if(afd < 0){
fprint(2, "announce: %r");
return;
}
if(rfork(RFFDG|RFPROC|RFMEM))
return;
for(;;){
if((lfd = listen(adir, ldir)) < 0){
fprint(2, "listen: %r");
break;
}
if(rfork(RFFDG|RFPROC|RFMEM)){
close(lfd);
continue;
}
if((dfd = accept(lfd, ldir)) < 0){
fprint(2, "accept: %r");
break;
}
ni = getnetconninfo(ldir, dfd);
peer(dfd, 1, ni ? ni->raddr : "???");
if(ni) freenetconninfo(ni);
break;
}
exits(0);
}
void
client(char *ip, char *port)
{
static Dict *peers;
static QLock peerslk;
int try, fd;
char *addr;
Dict *d;
if(ip == nil || port == nil)
return;
d = mallocz(sizeof(*d) + 64, 1);
snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
qlock(&peerslk);
if(dlook(peers, addr)){
qunlock(&peerslk);
free(d);
return;
}
d->len = strlen(addr);
d->typ = 'd';
d->val = d;
d->next = peers;
peers = d;
qunlock(&peerslk);
if(debug) fprint(2, "client %s\n", addr);
if(rfork(RFFDG|RFPROC|RFMEM))
return;
for(try = 0; try < 10; try++){
if((fd = dial(addr, nil, nil, nil)) >= 0){
if(!peer(fd, 0, addr))
break;
close(fd);
}
sleep((1000<<try)+nrand(5000));
}
exits(0);
}
@ -571,9 +651,9 @@ tracker(char *url)
static Dict *trackers;
static QLock trackerslk;
Dict *d, *l;
int n, fd;
char *p;
Dict *d, *l;
if(url == nil)
return;
@ -594,17 +674,32 @@ tracker(char *url)
url = d->str;
qunlock(&trackerslk);
if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
if(debug) fprint(2, "tracker %s\n", url);
if(rfork(RFPROC|RFMEM))
return;
for(;;){
vlong up, down, left;
lock(&stats);
up = stats.up;
down = stats.down;
left = stats.left;
unlock(&stats);
d = nil;
if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&compact=1",
url, sizeof(infohash), infohash, sizeof(peerid), peerid, port)) >= 0){
if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&"
"uploaded=%lld&downloaded=%lld&left=%lld&"
"compact=1",
url, sizeof(infohash), infohash, sizeof(peerid), peerid, port,
up, down, left)) >= 0){
n = readall(fd, &p);
close(fd);
bparse(p, p+n, &d);
free(p);
} else {
if(debug) fprint(2, "tracker %s: %r\n", url);
}
if(l = dlook(d, "peers")){
if(l->typ == 's'){
@ -617,10 +712,10 @@ tracker(char *url)
snprint(ip, sizeof(ip), "%d.%d.%d.%d", b[0], b[1], b[2], b[3]);
snprint(port, sizeof(port), "%d", b[4]<<8 | b[5]);
peer(ip, port);
client(ip, port);
}
} else for(; l && l->typ == 'l'; l = l->next)
peer(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
client(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
}
n = 0;
if(p = dstr(dlook(d, "interval")))
@ -651,34 +746,9 @@ Hfmt(Fmt *f)
}
int
progress(void)
killnote(void *, char *)
{
int i, c;
uchar m;
c = 0;
for(i=0; i<nhavemap; i++)
for(m = 0x80; m; m>>=1)
if(havemap[i] & m)
c++;
if(pflag)
print("%d %d\n", c, npieces);
return c == npieces;
}
void
killcohort(void)
{
int i;
for(i=0;i!=3;i++){ /* It's a long way to the kitchen */
postnote(PNGROUP, pidgroup, "kill");
sleep(1);
}
}
int
catchnote(void *, char *msg)
{
exits(msg);
postnote(PNGROUP, killgroup, "kill");
return 0;
}
@ -788,6 +858,7 @@ main(int argc, char *argv[])
else
pieces[i].len = blocksize;
len -= pieces[i].len;
stats.left += pieces[i].len;
}
if(len)
sysfatal("pieces do not match file length");
@ -795,23 +866,31 @@ main(int argc, char *argv[])
for(i = 0; i<npieces; i++)
havepiece(i);
switch(i = rfork(RFPROC|RFMEM|RFNOTEG|RFNAMEG)){
srand(time(0));
atnotify(killnote, 1);
switch(i = rfork(RFPROC|RFMEM|RFNOTEG)){
case -1:
sysfatal("fork: %r");
case 0:
memmove(peerid, "-NF9001-", 8);
genrandom(peerid+8, sizeof(peerid)-8);
for(i=8; i<sizeof(peerid); i++)
peerid[i] = nrand(10)+'0';
server();
tracker(dstr(dlook(torrent, "announce")));
for(d = dlook(torrent, "announce-list"); d && d->typ == 'l'; d = d->next)
if(d->val && d->val->typ == 'l')
tracker(dstr(d->val->val));
while(waitpid() != -1)
;
break;
default:
pidgroup = i;
atexit(killcohort);
atnotify(catchnote, 1);
while(!progress() || sflag)
killgroup = i;
while((nhavepieces < npieces) || sflag){
if(pflag)
print("%d %d\n", nhavepieces, npieces);
sleep(1000);
}
}
postnote(PNGROUP, killgroup, "kill");
exits(0);
}