From a0315ab1dca6db90ce00127a528176b35f979884 Mon Sep 17 00:00:00 2001 From: Joel Martin Date: Fri, 10 Sep 2010 13:05:48 -0500 Subject: [PATCH] wsproxy: multiprocess capable. Add -m, --multiprocess option which forks a handler for each connection allowing multiple connections to the same target using the same proxy instance. Cleaned up the output of the handler process. Each process' output is prefixed with an ordinal value. Changed both the C and python versions of the proxy. --- utils/websocket.c | 77 +++++++++++++++++++++++++++++++--------------- utils/websocket.h | 15 +++++++++ utils/websocket.py | 74 ++++++++++++++++++++++++++++++++------------ utils/wsproxy.c | 58 ++++++++++++++++++++++------------ utils/wsproxy.py | 17 ++++++---- 5 files changed, 172 insertions(+), 69 deletions(-) diff --git a/utils/websocket.c b/utils/websocket.c index 54ce987..1bcc21b 100644 --- a/utils/websocket.c +++ b/utils/websocket.c @@ -39,13 +39,16 @@ const char policy_response[] = "ssl) { - //printf("SSL recv\n"); + //handler_msg("SSL recv\n"); return SSL_read(ctx->ssl, buf, len); } else { return recv(ctx->sockfd, buf, len, 0); @@ -98,7 +101,7 @@ ssize_t ws_recv(ws_ctx_t *ctx, void *buf, size_t len) { ssize_t ws_send(ws_ctx_t *ctx, const void *buf, size_t len) { if (ctx->ssl) { - //printf("SSL send\n"); + //handler_msg("SSL send\n"); return SSL_write(ctx->ssl, buf, len); } else { return send(ctx->sockfd, buf, len, 0); @@ -202,7 +205,7 @@ int decode(char *src, size_t srclength, u_char *target, size_t targsize) { int i, len, framecount = 0, retlen = 0; unsigned char chr; if ((src[0] != '\x00') || (src[srclength-1] != '\xff')) { - fprintf(stderr, "WebSocket framing error\n"); + handler_emsg("WebSocket framing error\n"); return -1; } start = src+1; // Skip '\x00' start @@ -337,13 +340,13 @@ ws_ctx_t *do_handshake(int sock) { len = recv(sock, handshake, 1024, MSG_PEEK); handshake[len] = 0; if (len == 0) { - printf("Ignoring empty handshake\n"); + handler_msg("ignoring empty handshake\n"); close(sock); return NULL; } else if (bcmp(handshake, "", 22) == 0) { len = recv(sock, handshake, 1024, 0); handshake[len] = 0; - printf("Sending flash policy response\n"); + handler_msg("sending flash policy response\n"); send(sock, policy_response, sizeof(policy_response), 0); close(sock); return NULL; @@ -353,22 +356,22 @@ ws_ctx_t *do_handshake(int sock) { ws_ctx = ws_socket_ssl(sock, settings.cert); if (! ws_ctx) { return NULL; } scheme = "wss"; - printf(" using SSL socket\n"); + handler_msg("using SSL socket\n"); } else if (settings.ssl_only) { - printf("Non-SSL connection disallowed\n"); + handler_msg("non-SSL connection disallowed\n"); close(sock); return NULL; } else { ws_ctx = ws_socket(sock); if (! ws_ctx) { return NULL; } scheme = "ws"; - printf(" using plain (not SSL) socket\n"); + handler_msg("using plain (not SSL) socket\n"); } len = ws_recv(ws_ctx, handshake, 4096); handshake[len] = 0; if (!parse_handshake(handshake, &headers)) { - fprintf(stderr, "Invalid WS request\n"); + handler_emsg("Invalid WS request\n"); close(sock); return NULL; } @@ -376,16 +379,16 @@ ws_ctx_t *do_handshake(int sock) { if (headers.key3[0] != '\0') { gen_md5(&headers, trailer); pre = "Sec-"; - printf(" using protocol version 76\n"); + handler_msg("using protocol version 76\n"); } else { trailer[0] = '\0'; pre = ""; - printf(" using protocol version 75\n"); + handler_msg("using protocol version 75\n"); } sprintf(response, server_handshake, pre, headers.origin, pre, scheme, headers.host, headers.path, pre, trailer); - //printf("response: %s\n", response); + //handler_msg("response: %s\n", response); ws_send(ws_ctx, response, strlen(response)); return ws_ctx; @@ -393,7 +396,8 @@ ws_ctx_t *do_handshake(int sock) { void signal_handler(sig) { switch (sig) { - case SIGHUP: break; // ignore + case SIGHUP: break; // ignore for now + case SIGPIPE: pipe_error = 1; break; // handle inline case SIGTERM: exit(0); break; } } @@ -434,7 +438,7 @@ void daemonize(int keepfd) { void start_server() { - int lsock, csock, clilen, sopt = 1, i; + int lsock, csock, pid, clilen, sopt = 1, i; struct sockaddr_in serv_addr, cli_addr; ws_ctx_t *ws_ctx; @@ -470,18 +474,26 @@ void start_server() { } listen(lsock,100); + signal(SIGPIPE, signal_handler); // catch pipe + if (settings.daemon) { daemonize(lsock); } + if (settings.multiprocess) { + printf("Waiting for connections on %s:%d\n", + settings.listen_host, settings.listen_port); + // Reep zombies + signal(SIGCHLD, SIG_IGN); + } + while (1) { clilen = sizeof(cli_addr); - if (settings.listen_host && settings.listen_host[0] != '\0') { - printf("waiting for connection on %s:%d\n", + pipe_error = 0; + pid = 0; + if (! settings.multiprocess) { + printf("Waiting for connection on %s:%d\n", settings.listen_host, settings.listen_port); - } else { - printf("waiting for connection on port %d\n", - settings.listen_port); } csock = accept(lsock, (struct sockaddr *) &cli_addr, @@ -490,7 +502,8 @@ void start_server() { error("ERROR on accept"); continue; } - printf("Got client connection from %s\n", inet_ntoa(cli_addr.sin_addr)); + handler_msg("got client connection from %s\n", + inet_ntoa(cli_addr.sin_addr)); ws_ctx = do_handshake(csock); if (ws_ctx == NULL) { close(csock); @@ -501,8 +514,24 @@ void start_server() { * 20 for WS '\x00' / '\xff' and good measure */ dbufsize = (bufsize * 3)/4 - 20; - settings.handler(ws_ctx); - close(csock); + if (settings.multiprocess) { + handler_msg("forking handler process\n"); + pid = fork(); + } + + if (pid == 0) { // handler process + settings.handler(ws_ctx); + if (pipe_error) { + handler_emsg("Closing due to SIGPIPE\n"); + } + close(csock); + if (settings.multiprocess) { + handler_msg("handler exit\n"); + break; // Child process exits + } + } else { // parent process + settings.handler_id += 1; + } } } diff --git a/utils/websocket.h b/utils/websocket.h index 6271cc9..3246d73 100644 --- a/utils/websocket.h +++ b/utils/websocket.h @@ -10,8 +10,10 @@ typedef struct { char listen_host[256]; int listen_port; void (*handler)(ws_ctx_t*); + int handler_id; int ssl_only; int daemon; + int multiprocess; char *record; char *cert; } settings_t; @@ -34,3 +36,16 @@ ssize_t ws_send(ws_ctx_t *ctx, const void *buf, size_t len); //int b64_ntop(u_char const *src, size_t srclength, char *target, size_t targsize); //int b64_pton(char const *src, u_char *target, size_t targsize); +#define gen_handler_msg(stream, ...) \ + if (! settings.daemon) { \ + if (settings.multiprocess) { \ + fprintf(stream, " %d: ", settings.handler_id); \ + } else { \ + fprintf(stream, " "); \ + } \ + fprintf(stream, __VA_ARGS__); \ + } + +#define handler_msg(...) gen_handler_msg(stdout, __VA_ARGS__); +#define handler_emsg(...) gen_handler_msg(stderr, __VA_ARGS__); + diff --git a/utils/websocket.py b/utils/websocket.py index 7578946..abc463f 100755 --- a/utils/websocket.py +++ b/utils/websocket.py @@ -25,9 +25,11 @@ settings = { 'listen_host' : '', 'listen_port' : None, 'handler' : None, + 'handler_id' : 1, 'cert' : None, 'ssl_only' : False, 'daemon' : True, + 'multiprocess': False, 'record' : None, } server_handshake = """HTTP/1.1 101 Web Socket Protocol Handshake\r @@ -41,9 +43,20 @@ Connection: Upgrade\r policy_response = """\n""" +class EClose(Exception): + pass + def traffic(token="."): - sys.stdout.write(token) - sys.stdout.flush() + if not settings['daemon'] and not settings['multiprocess']: + sys.stdout.write(token) + sys.stdout.flush() + +def handler_msg(msg): + if not settings['daemon']: + if settings['multiprocess']: + print " %d: %s" % (settings['handler_id'], msg) + else: + print " %s" % msg def encode(buf): buf = b64encode(buf) @@ -89,14 +102,14 @@ def do_handshake(sock): # Peek, but don't read the data handshake = sock.recv(1024, socket.MSG_PEEK) - #print "Handshake [%s]" % repr(handshake) + #handler_msg("Handshake [%s]" % repr(handshake)) if handshake == "": - print "Ignoring empty handshake" + handler_msg("ignoring empty handshake") sock.close() return False elif handshake.startswith(""): handshake = sock.recv(1024) - print "Sending flash policy response" + handler_msg("Sending flash policy response") sock.send(policy_response) sock.close() return False @@ -107,32 +120,32 @@ def do_handshake(sock): certfile=settings['cert'], ssl_version=ssl.PROTOCOL_TLSv1) scheme = "wss" - print " using SSL/TLS" + handler_msg("using SSL/TLS") elif settings['ssl_only']: - print "Non-SSL connection disallowed" + handler_msg("non-SSL connection disallowed") sock.close() return False else: retsock = sock scheme = "ws" - print " using plain (not SSL) socket" + handler_msg("using plain (not SSL) socket") handshake = retsock.recv(4096) - #print "handshake: " + repr(handshake) + #handler_msg("handshake: " + repr(handshake)) h = parse_handshake(handshake) if h.get('key3'): trailer = gen_md5(h) pre = "Sec-" - print " using protocol version 76" + handler_msg("using protocol version 76") else: trailer = "" pre = "" - print " using protocol version 75" + handler_msg("using protocol version 75") response = server_handshake % (pre, h['Origin'], pre, scheme, h['Host'], h['path'], pre, trailer) - #print "sending response:", repr(response) + #handler_msg("sending response:", repr(response)) retsock.send(response) return retsock @@ -177,21 +190,44 @@ def start_server(): lsock.bind((settings['listen_host'], settings['listen_port'])) lsock.listen(100) - if settings['daemon']: daemonize(keepfd=lsock.fileno()) + if settings['daemon']: + daemonize(keepfd=lsock.fileno()) + + if settings['multiprocess']: + print 'Waiting for connections on %s:%s' % ( + settings['listen_host'], settings['listen_port']) + # Reep zombies + signal.signal(signal.SIGCHLD, signal.SIG_IGN) while True: try: csock = startsock = None - print 'waiting for connection on port %s' % settings['listen_port'] + pid = 0 + if not settings['multiprocess']: + print 'Waiting for connection on %s:%s' % ( + settings['listen_host'], settings['listen_port']) startsock, address = lsock.accept() - print 'Got client connection from %s' % address[0] + handler_msg('got client connection from %s' % address[0]) csock = do_handshake(startsock) if not csock: continue - settings['handler'](csock) + if settings['multiprocess']: + handler_msg("forking handler process") + pid = os.fork() - except Exception: - print "Ignoring exception:" - print traceback.format_exc() + if pid == 0: # handler process + settings['handler'](csock) + else: # parent process + settings['handler_id'] += 1 + + except EClose, exc: + handler_msg("handler exit: %s" % exc.args) + except Exception, exc: + handler_msg("handler exception: %s" % str(exc)) + #handler_msg(traceback.format_exc()) + + if pid == 0: if csock: csock.close() if startsock and startsock != csock: startsock.close() + + if settings['multiprocess']: break # Child process exits diff --git a/utils/wsproxy.c b/utils/wsproxy.c index 77f8de5..27755de 100644 --- a/utils/wsproxy.c +++ b/utils/wsproxy.c @@ -45,6 +45,7 @@ char USAGE[] = "Usage: [options] " \ char target_host[256]; int target_port; +extern pipe_error; extern settings_t settings; extern char *tbuf, *cbuf, *tbuf_tmp, *cbuf_tmp; extern unsigned int bufsize, dbufsize; @@ -86,29 +87,32 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { } ret = select(maxfd, &rlist, &wlist, &elist, &tv); + if (pipe_error) { break; } if (FD_ISSET(target, &elist)) { - fprintf(stderr, "target exception\n"); + handler_emsg("target exception\n"); break; } if (FD_ISSET(client, &elist)) { - fprintf(stderr, "client exception\n"); + handler_emsg("client exception\n"); break; } if (ret == -1) { - error("select()"); + handler_emsg("select(): %s\n", strerror(errno)); break; } else if (ret == 0) { - //fprintf(stderr, "select timeout\n"); + //handler_emsg("select timeout\n"); continue; } if (FD_ISSET(target, &wlist)) { len = tend-tstart; bytes = send(target, tbuf + tstart, len, 0); + if (pipe_error) { break; } if (bytes < 0) { - error("target connection error"); + handler_emsg("target connection error: %s\n", + strerror(errno)); break; } tstart += bytes; @@ -123,8 +127,9 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { if (FD_ISSET(client, &wlist)) { len = cend-cstart; bytes = ws_send(ws_ctx, cbuf + cstart, len); + if (pipe_error) { break; } if (len < 3) { - fprintf(stderr, "len: %d, bytes: %d: %d\n", len, bytes, *(cbuf + cstart)); + handler_emsg("len: %d, bytes: %d: %d\n", len, bytes, *(cbuf + cstart)); } cstart += bytes; if (cstart >= cend) { @@ -137,8 +142,9 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { if (FD_ISSET(target, &rlist)) { bytes = recv(target, cbuf_tmp, dbufsize , 0); + if (pipe_error) { break; } if (bytes <= 0) { - fprintf(stderr, "target closed connection"); + handler_emsg("target closed connection\n"); break; } cstart = 0; @@ -151,7 +157,7 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { printf("\n"); */ if (cend < 0) { - fprintf(stderr, "encoding error\n"); + handler_emsg("encoding error\n"); break; } traffic("{"); @@ -159,13 +165,14 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { if (FD_ISSET(client, &rlist)) { bytes = ws_recv(ws_ctx, tbuf_tmp, bufsize-1); + if (pipe_error) { break; } if (bytes <= 0) { - fprintf(stderr, "client closed connection\n"); + handler_emsg("client closed connection\n"); break; } else if ((bytes == 2) && (tbuf_tmp[0] == '\xff') && (tbuf_tmp[1] == '\x00')) { - fprintf(stderr, "client sent orderly close frame"); + handler_emsg("client sent orderly close frame\n"); break; } /* @@ -184,7 +191,7 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { printf("\n"); */ if (len < 0) { - fprintf(stderr, "decoding error\n"); + handler_emsg("decoding error\n"); break; } traffic("}"); @@ -198,11 +205,12 @@ void proxy_handler(ws_ctx_t *ws_ctx) { int tsock = 0; struct sockaddr_in taddr; - printf("Connecting to: %s:%d\n", target_host, target_port); + handler_msg("connecting to: %s:%d\n", target_host, target_port); tsock = socket(AF_INET, SOCK_STREAM, 0); if (tsock < 0) { - error("Could not create target socket"); + handler_emsg("Could not create target socket: %s\n", + strerror(errno)); return; } bzero((char *) &taddr, sizeof(taddr)); @@ -211,16 +219,20 @@ void proxy_handler(ws_ctx_t *ws_ctx) { /* Resolve target address */ if (resolve_host(&taddr.sin_addr, target_host) < -1) { - error("Could not resolve target address"); + handler_emsg("Could not resolve target address: %s\n", + strerror(errno)); } if (connect(tsock, (struct sockaddr *) &taddr, sizeof(taddr)) < 0) { - error("Could not connect to target"); + handler_emsg("Could not connect to target: %s\n", + strerror(errno)); close(tsock); return; } - printf("%s", traffic_legend); + if ((! settings.daemon) && (! settings.multiprocess)) { + printf("%s", traffic_legend); + } do_proxy(ws_ctx, tsock); @@ -230,11 +242,12 @@ void proxy_handler(ws_ctx_t *ws_ctx) { int main(int argc, char *argv[]) { int fd, c, option_index = 0; - static int ssl_only = 0, foreground = 0; + static int ssl_only = 0, foreground = 0, multi = 0; char *found; static struct option long_options[] = { {"ssl-only", no_argument, &ssl_only, 1 }, {"foreground", no_argument, &foreground, 'f'}, + {"multiprocess", no_argument, &multi, 'm'}, /* ---- */ {"cert", required_argument, 0, 'c'}, {0, 0, 0, 0} @@ -243,7 +256,7 @@ int main(int argc, char *argv[]) settings.cert = realpath("self.pem", NULL); while (1) { - c = getopt_long (argc, argv, "fr:c:", + c = getopt_long (argc, argv, "fmc:", long_options, &option_index); /* Detect the end */ @@ -257,6 +270,9 @@ int main(int argc, char *argv[]) case 'f': foreground = 1; break; + case 'm': + multi = 1; + break; case 'r': if ((fd = open(optarg, O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < -1) { @@ -274,8 +290,9 @@ int main(int argc, char *argv[]) usage(""); } } - settings.ssl_only = ssl_only; - settings.daemon = foreground ? 0: 1; + settings.ssl_only = ssl_only; + settings.daemon = foreground ? 0: 1; + settings.multiprocess = multi; if ((argc-optind) != 2) { usage("Invalid number of arguments\n"); @@ -314,6 +331,7 @@ int main(int argc, char *argv[]) //printf(" ssl_only: %d\n", settings.ssl_only); //printf(" daemon: %d\n", settings.daemon); + //printf(" multiproces: %d\n", settings.multiprocess); //printf(" cert: %s\n", settings.cert); settings.handler = proxy_handler; diff --git a/utils/wsproxy.py b/utils/wsproxy.py index c2a7e99..dfa3dde 100755 --- a/utils/wsproxy.py +++ b/utils/wsproxy.py @@ -72,7 +72,7 @@ def do_proxy(client, target): if target in ins: buf = target.recv(buffer_size) - if len(buf) == 0: raise Exception("Target closed") + if len(buf) == 0: raise EClose("Target closed") cqueue.append(encode(buf)) traffic("{") @@ -80,10 +80,10 @@ def do_proxy(client, target): if client in ins: buf = client.recv(buffer_size) - if len(buf) == 0: raise Exception("Client closed") + if len(buf) == 0: raise EClose("Client closed") if buf == '\xff\x00': - raise Exception("Client sent orderly close frame") + raise EClose("Client sent orderly close frame") elif buf[-1] == '\xff': if buf.count('\xff') > 1: traffic(str(buf.count('\xff'))) @@ -104,15 +104,16 @@ def proxy_handler(client): global target_host, target_port, options, rec if settings['record']: - print "Opening record file: %s" % settings['record'] + handler_msg("opening record file: %s" % settings['record']) rec = open(settings['record'], 'w+') rec.write("var VNC_frame_data = [\n") - print "Connecting to: %s:%s" % (target_host, target_port) + handler_msg("connecting to: %s:%s" % (target_host, target_port)) tsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tsock.connect((target_host, target_port)) - print traffic_legend + if not settings['daemon'] and not settings['multiprocess']: + print traffic_legend try: do_proxy(client, tsock) @@ -132,6 +133,9 @@ if __name__ == '__main__': parser.add_option("--foreground", "-f", dest="daemon", default=True, action="store_false", help="stay in foreground, do not daemonize") + parser.add_option("--multiprocess", "-m", + dest="multiprocess", action="store_true", + help="fork handler processes") parser.add_option("--ssl-only", action="store_true", help="disallow non-encrypted connections") parser.add_option("--cert", default="self.pem", @@ -162,6 +166,7 @@ if __name__ == '__main__': settings['cert'] = os.path.abspath(options.cert) settings['ssl_only'] = options.ssl_only settings['daemon'] = options.daemon + settings['multiprocess'] = options.multiprocess if options.record: settings['record'] = os.path.abspath(options.record) start_server()