From f297042b0c51a22338579851a99418d4f4c49c15 Mon Sep 17 00:00:00 2001 From: William Pitcock Date: Sat, 2 Apr 2016 18:14:56 -0500 Subject: [PATCH] wsockd: more support infrastructure for websockets --- wsockd/wsockd.c | 89 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/wsockd/wsockd.c b/wsockd/wsockd.c index db1d3ddc..4ead19de 100644 --- a/wsockd/wsockd.c +++ b/wsockd/wsockd.c @@ -276,6 +276,13 @@ conn_mod_write(conn_t * conn, void *data, size_t len) rb_rawbuf_append(conn->modbuf_out, data, len); } +static void +conn_mod_write_frame(conn_t * conn, void *data, size_t len) +{ + if(IsDead(conn)) /* no point in queueing to a dead man */ + return; +} + static void conn_plain_write(conn_t * conn, void *data, size_t len) { @@ -458,7 +465,7 @@ conn_mod_handshake_process(conn_t *conn) } static void -conn_mod_handshake_cb(rb_fde_t *fd, void *data) +conn_mod_read_cb(rb_fde_t *fd, void *data) { char inbuf[READBUF_SIZE]; conn_t *conn = data; @@ -479,7 +486,7 @@ conn_mod_handshake_cb(rb_fde_t *fd, void *data) if (length < 0) { if (rb_ignore_errno(errno)) - rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn); + rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn); else close_conn(conn, NO_WAIT, "Connection closed"); @@ -492,24 +499,92 @@ conn_mod_handshake_cb(rb_fde_t *fd, void *data) } rb_rawbuf_append(conn->modbuf_in, inbuf, length); - conn_mod_handshake_process(conn); + if (!IsKeyed(conn)) + conn_mod_handshake_process(conn); if (length < sizeof(inbuf)) { - rb_setselect(fd, RB_SELECT_READ, conn_mod_handshake_cb, conn); + rb_setselect(fd, RB_SELECT_READ, conn_mod_read_cb, conn); return; } } } -static void -conn_mod_read_cb(rb_fde_t *fd, void *data) +static bool +plain_check_cork(conn_t * conn) { + if(rb_rawbuf_length(conn->modbuf_out) >= 4096) + { + /* if we have over 4k pending outbound, don't read until + * we've cleared the queue */ + SetCork(conn); + rb_setselect(conn->plain_fd, RB_SELECT_READ, NULL, NULL); + /* try to write */ + conn_mod_write_sendq(conn->mod_fd, conn); + return true; + } + + return false; +} + +static void +conn_plain_process_recvq(conn_t *conn) +{ + char inbuf[READBUF_SIZE]; + + while (1) + { + size_t dolen = rb_linebuf_get(&conn->plainbuf_in, inbuf, sizeof inbuf, LINEBUF_COMPLETE, LINEBUF_PARSED); + if (!dolen) + break; + + conn_mod_write_frame(conn, inbuf, dolen); + } } static void conn_plain_read_cb(rb_fde_t *fd, void *data) { + char inbuf[READBUF_SIZE]; + conn_t *conn = data; + int length = 0; + if(conn == NULL) + return; + + if(IsDead(conn)) + return; + + if(plain_check_cork(conn)) + return; + + while(1) + { + if(IsDead(conn)) + return; + + length = rb_read(conn->plain_fd, inbuf, sizeof(inbuf)); + + if(length == 0 || (length < 0 && !rb_ignore_errno(errno))) + { + close_conn(conn, NO_WAIT, NULL); + return; + } + + if(length < 0) + { + rb_setselect(conn->plain_fd, RB_SELECT_READ, conn_plain_read_cb, conn); + conn_plain_process_recvq(conn); + return; + } + conn->plain_in += length; + + (void) rb_linebuf_parse(&conn->plainbuf_in, inbuf, sizeof(inbuf), 0); + + if(IsDead(conn)) + return; + if(plain_check_cork(conn)) + return; + } } static void @@ -559,7 +634,7 @@ wsock_process(mod_ctl_t * ctl, mod_ctl_buf_t * ctlb) if(rb_get_type(conn->plain_fd) == RB_FD_UNKNOWN) rb_set_type(conn->plain_fd, RB_FD_SOCKET); - conn_mod_handshake_cb(conn->mod_fd, conn); + conn_mod_read_cb(conn->mod_fd, conn); } static void