wsproxy: multiprocess capable.

Add -m, --multiprocess option which forks a handler for each
connection allowing multiple connections to the same target using the
same proxy instance.

Cleaned up the output of the handler process. Each process' output is
prefixed with an ordinal value.

Changed both the C and python versions of the proxy.
This commit is contained in:
Joel Martin 2010-09-10 13:05:48 -05:00
parent 465faf19db
commit a0315ab1dc
5 changed files with 172 additions and 69 deletions

View File

@ -39,13 +39,16 @@ const char policy_response[] = "<cross-domain-policy><allow-access-from domain=\
* Warning: not thread safe
*/
int ssl_initialized = 0;
int pipe_error = 0;
char *tbuf, *cbuf, *tbuf_tmp, *cbuf_tmp;
unsigned int bufsize, dbufsize;
settings_t settings;
void traffic(char * token) {
fprintf(stdout, "%s", token);
fflush(stdout);
if ((! settings.daemon) && (! settings.multiprocess)) {
fprintf(stdout, "%s", token);
fflush(stdout);
}
}
void error(char *msg)
@ -89,7 +92,7 @@ int resolve_host(struct in_addr *sin_addr, const char *hostname)
ssize_t ws_recv(ws_ctx_t *ctx, void *buf, size_t len) {
if (ctx->ssl) {
//printf("SSL recv\n");
//handler_msg("SSL recv\n");
return SSL_read(ctx->ssl, buf, len);
} else {
return recv(ctx->sockfd, buf, len, 0);
@ -98,7 +101,7 @@ ssize_t ws_recv(ws_ctx_t *ctx, void *buf, size_t len) {
ssize_t ws_send(ws_ctx_t *ctx, const void *buf, size_t len) {
if (ctx->ssl) {
//printf("SSL send\n");
//handler_msg("SSL send\n");
return SSL_write(ctx->ssl, buf, len);
} else {
return send(ctx->sockfd, buf, len, 0);
@ -202,7 +205,7 @@ int decode(char *src, size_t srclength, u_char *target, size_t targsize) {
int i, len, framecount = 0, retlen = 0;
unsigned char chr;
if ((src[0] != '\x00') || (src[srclength-1] != '\xff')) {
fprintf(stderr, "WebSocket framing error\n");
handler_emsg("WebSocket framing error\n");
return -1;
}
start = src+1; // Skip '\x00' start
@ -337,13 +340,13 @@ ws_ctx_t *do_handshake(int sock) {
len = recv(sock, handshake, 1024, MSG_PEEK);
handshake[len] = 0;
if (len == 0) {
printf("Ignoring empty handshake\n");
handler_msg("ignoring empty handshake\n");
close(sock);
return NULL;
} else if (bcmp(handshake, "<policy-file-request/>", 22) == 0) {
len = recv(sock, handshake, 1024, 0);
handshake[len] = 0;
printf("Sending flash policy response\n");
handler_msg("sending flash policy response\n");
send(sock, policy_response, sizeof(policy_response), 0);
close(sock);
return NULL;
@ -353,22 +356,22 @@ ws_ctx_t *do_handshake(int sock) {
ws_ctx = ws_socket_ssl(sock, settings.cert);
if (! ws_ctx) { return NULL; }
scheme = "wss";
printf(" using SSL socket\n");
handler_msg("using SSL socket\n");
} else if (settings.ssl_only) {
printf("Non-SSL connection disallowed\n");
handler_msg("non-SSL connection disallowed\n");
close(sock);
return NULL;
} else {
ws_ctx = ws_socket(sock);
if (! ws_ctx) { return NULL; }
scheme = "ws";
printf(" using plain (not SSL) socket\n");
handler_msg("using plain (not SSL) socket\n");
}
len = ws_recv(ws_ctx, handshake, 4096);
handshake[len] = 0;
if (!parse_handshake(handshake, &headers)) {
fprintf(stderr, "Invalid WS request\n");
handler_emsg("Invalid WS request\n");
close(sock);
return NULL;
}
@ -376,16 +379,16 @@ ws_ctx_t *do_handshake(int sock) {
if (headers.key3[0] != '\0') {
gen_md5(&headers, trailer);
pre = "Sec-";
printf(" using protocol version 76\n");
handler_msg("using protocol version 76\n");
} else {
trailer[0] = '\0';
pre = "";
printf(" using protocol version 75\n");
handler_msg("using protocol version 75\n");
}
sprintf(response, server_handshake, pre, headers.origin, pre, scheme,
headers.host, headers.path, pre, trailer);
//printf("response: %s\n", response);
//handler_msg("response: %s\n", response);
ws_send(ws_ctx, response, strlen(response));
return ws_ctx;
@ -393,7 +396,8 @@ ws_ctx_t *do_handshake(int sock) {
void signal_handler(sig) {
switch (sig) {
case SIGHUP: break; // ignore
case SIGHUP: break; // ignore for now
case SIGPIPE: pipe_error = 1; break; // handle inline
case SIGTERM: exit(0); break;
}
}
@ -434,7 +438,7 @@ void daemonize(int keepfd) {
void start_server() {
int lsock, csock, clilen, sopt = 1, i;
int lsock, csock, pid, clilen, sopt = 1, i;
struct sockaddr_in serv_addr, cli_addr;
ws_ctx_t *ws_ctx;
@ -470,18 +474,26 @@ void start_server() {
}
listen(lsock,100);
signal(SIGPIPE, signal_handler); // catch pipe
if (settings.daemon) {
daemonize(lsock);
}
if (settings.multiprocess) {
printf("Waiting for connections on %s:%d\n",
settings.listen_host, settings.listen_port);
// Reep zombies
signal(SIGCHLD, SIG_IGN);
}
while (1) {
clilen = sizeof(cli_addr);
if (settings.listen_host && settings.listen_host[0] != '\0') {
printf("waiting for connection on %s:%d\n",
pipe_error = 0;
pid = 0;
if (! settings.multiprocess) {
printf("Waiting for connection on %s:%d\n",
settings.listen_host, settings.listen_port);
} else {
printf("waiting for connection on port %d\n",
settings.listen_port);
}
csock = accept(lsock,
(struct sockaddr *) &cli_addr,
@ -490,7 +502,8 @@ void start_server() {
error("ERROR on accept");
continue;
}
printf("Got client connection from %s\n", inet_ntoa(cli_addr.sin_addr));
handler_msg("got client connection from %s\n",
inet_ntoa(cli_addr.sin_addr));
ws_ctx = do_handshake(csock);
if (ws_ctx == NULL) {
close(csock);
@ -501,8 +514,24 @@ void start_server() {
* 20 for WS '\x00' / '\xff' and good measure */
dbufsize = (bufsize * 3)/4 - 20;
settings.handler(ws_ctx);
close(csock);
if (settings.multiprocess) {
handler_msg("forking handler process\n");
pid = fork();
}
if (pid == 0) { // handler process
settings.handler(ws_ctx);
if (pipe_error) {
handler_emsg("Closing due to SIGPIPE\n");
}
close(csock);
if (settings.multiprocess) {
handler_msg("handler exit\n");
break; // Child process exits
}
} else { // parent process
settings.handler_id += 1;
}
}
}

View File

@ -10,8 +10,10 @@ typedef struct {
char listen_host[256];
int listen_port;
void (*handler)(ws_ctx_t*);
int handler_id;
int ssl_only;
int daemon;
int multiprocess;
char *record;
char *cert;
} settings_t;
@ -34,3 +36,16 @@ ssize_t ws_send(ws_ctx_t *ctx, const void *buf, size_t len);
//int b64_ntop(u_char const *src, size_t srclength, char *target, size_t targsize);
//int b64_pton(char const *src, u_char *target, size_t targsize);
#define gen_handler_msg(stream, ...) \
if (! settings.daemon) { \
if (settings.multiprocess) { \
fprintf(stream, " %d: ", settings.handler_id); \
} else { \
fprintf(stream, " "); \
} \
fprintf(stream, __VA_ARGS__); \
}
#define handler_msg(...) gen_handler_msg(stdout, __VA_ARGS__);
#define handler_emsg(...) gen_handler_msg(stderr, __VA_ARGS__);

View File

@ -25,9 +25,11 @@ settings = {
'listen_host' : '',
'listen_port' : None,
'handler' : None,
'handler_id' : 1,
'cert' : None,
'ssl_only' : False,
'daemon' : True,
'multiprocess': False,
'record' : None, }
server_handshake = """HTTP/1.1 101 Web Socket Protocol Handshake\r
@ -41,9 +43,20 @@ Connection: Upgrade\r
policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
class EClose(Exception):
pass
def traffic(token="."):
sys.stdout.write(token)
sys.stdout.flush()
if not settings['daemon'] and not settings['multiprocess']:
sys.stdout.write(token)
sys.stdout.flush()
def handler_msg(msg):
if not settings['daemon']:
if settings['multiprocess']:
print " %d: %s" % (settings['handler_id'], msg)
else:
print " %s" % msg
def encode(buf):
buf = b64encode(buf)
@ -89,14 +102,14 @@ def do_handshake(sock):
# Peek, but don't read the data
handshake = sock.recv(1024, socket.MSG_PEEK)
#print "Handshake [%s]" % repr(handshake)
#handler_msg("Handshake [%s]" % repr(handshake))
if handshake == "":
print "Ignoring empty handshake"
handler_msg("ignoring empty handshake")
sock.close()
return False
elif handshake.startswith("<policy-file-request/>"):
handshake = sock.recv(1024)
print "Sending flash policy response"
handler_msg("Sending flash policy response")
sock.send(policy_response)
sock.close()
return False
@ -107,32 +120,32 @@ def do_handshake(sock):
certfile=settings['cert'],
ssl_version=ssl.PROTOCOL_TLSv1)
scheme = "wss"
print " using SSL/TLS"
handler_msg("using SSL/TLS")
elif settings['ssl_only']:
print "Non-SSL connection disallowed"
handler_msg("non-SSL connection disallowed")
sock.close()
return False
else:
retsock = sock
scheme = "ws"
print " using plain (not SSL) socket"
handler_msg("using plain (not SSL) socket")
handshake = retsock.recv(4096)
#print "handshake: " + repr(handshake)
#handler_msg("handshake: " + repr(handshake))
h = parse_handshake(handshake)
if h.get('key3'):
trailer = gen_md5(h)
pre = "Sec-"
print " using protocol version 76"
handler_msg("using protocol version 76")
else:
trailer = ""
pre = ""
print " using protocol version 75"
handler_msg("using protocol version 75")
response = server_handshake % (pre, h['Origin'], pre, scheme,
h['Host'], h['path'], pre, trailer)
#print "sending response:", repr(response)
#handler_msg("sending response:", repr(response))
retsock.send(response)
return retsock
@ -177,21 +190,44 @@ def start_server():
lsock.bind((settings['listen_host'], settings['listen_port']))
lsock.listen(100)
if settings['daemon']: daemonize(keepfd=lsock.fileno())
if settings['daemon']:
daemonize(keepfd=lsock.fileno())
if settings['multiprocess']:
print 'Waiting for connections on %s:%s' % (
settings['listen_host'], settings['listen_port'])
# Reep zombies
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
while True:
try:
csock = startsock = None
print 'waiting for connection on port %s' % settings['listen_port']
pid = 0
if not settings['multiprocess']:
print 'Waiting for connection on %s:%s' % (
settings['listen_host'], settings['listen_port'])
startsock, address = lsock.accept()
print 'Got client connection from %s' % address[0]
handler_msg('got client connection from %s' % address[0])
csock = do_handshake(startsock)
if not csock: continue
settings['handler'](csock)
if settings['multiprocess']:
handler_msg("forking handler process")
pid = os.fork()
except Exception:
print "Ignoring exception:"
print traceback.format_exc()
if pid == 0: # handler process
settings['handler'](csock)
else: # parent process
settings['handler_id'] += 1
except EClose, exc:
handler_msg("handler exit: %s" % exc.args)
except Exception, exc:
handler_msg("handler exception: %s" % str(exc))
#handler_msg(traceback.format_exc())
if pid == 0:
if csock: csock.close()
if startsock and startsock != csock: startsock.close()
if settings['multiprocess']: break # Child process exits

View File

@ -45,6 +45,7 @@ char USAGE[] = "Usage: [options] " \
char target_host[256];
int target_port;
extern pipe_error;
extern settings_t settings;
extern char *tbuf, *cbuf, *tbuf_tmp, *cbuf_tmp;
extern unsigned int bufsize, dbufsize;
@ -86,29 +87,32 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) {
}
ret = select(maxfd, &rlist, &wlist, &elist, &tv);
if (pipe_error) { break; }
if (FD_ISSET(target, &elist)) {
fprintf(stderr, "target exception\n");
handler_emsg("target exception\n");
break;
}
if (FD_ISSET(client, &elist)) {
fprintf(stderr, "client exception\n");
handler_emsg("client exception\n");
break;
}
if (ret == -1) {
error("select()");
handler_emsg("select(): %s\n", strerror(errno));
break;
} else if (ret == 0) {
//fprintf(stderr, "select timeout\n");
//handler_emsg("select timeout\n");
continue;
}
if (FD_ISSET(target, &wlist)) {
len = tend-tstart;
bytes = send(target, tbuf + tstart, len, 0);
if (pipe_error) { break; }
if (bytes < 0) {
error("target connection error");
handler_emsg("target connection error: %s\n",
strerror(errno));
break;
}
tstart += bytes;
@ -123,8 +127,9 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) {
if (FD_ISSET(client, &wlist)) {
len = cend-cstart;
bytes = ws_send(ws_ctx, cbuf + cstart, len);
if (pipe_error) { break; }
if (len < 3) {
fprintf(stderr, "len: %d, bytes: %d: %d\n", len, bytes, *(cbuf + cstart));
handler_emsg("len: %d, bytes: %d: %d\n", len, bytes, *(cbuf + cstart));
}
cstart += bytes;
if (cstart >= cend) {
@ -137,8 +142,9 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) {
if (FD_ISSET(target, &rlist)) {
bytes = recv(target, cbuf_tmp, dbufsize , 0);
if (pipe_error) { break; }
if (bytes <= 0) {
fprintf(stderr, "target closed connection");
handler_emsg("target closed connection\n");
break;
}
cstart = 0;
@ -151,7 +157,7 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) {
printf("\n");
*/
if (cend < 0) {
fprintf(stderr, "encoding error\n");
handler_emsg("encoding error\n");
break;
}
traffic("{");
@ -159,13 +165,14 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) {
if (FD_ISSET(client, &rlist)) {
bytes = ws_recv(ws_ctx, tbuf_tmp, bufsize-1);
if (pipe_error) { break; }
if (bytes <= 0) {
fprintf(stderr, "client closed connection\n");
handler_emsg("client closed connection\n");
break;
} else if ((bytes == 2) &&
(tbuf_tmp[0] == '\xff') &&
(tbuf_tmp[1] == '\x00')) {
fprintf(stderr, "client sent orderly close frame");
handler_emsg("client sent orderly close frame\n");
break;
}
/*
@ -184,7 +191,7 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) {
printf("\n");
*/
if (len < 0) {
fprintf(stderr, "decoding error\n");
handler_emsg("decoding error\n");
break;
}
traffic("}");
@ -198,11 +205,12 @@ void proxy_handler(ws_ctx_t *ws_ctx) {
int tsock = 0;
struct sockaddr_in taddr;
printf("Connecting to: %s:%d\n", target_host, target_port);
handler_msg("connecting to: %s:%d\n", target_host, target_port);
tsock = socket(AF_INET, SOCK_STREAM, 0);
if (tsock < 0) {
error("Could not create target socket");
handler_emsg("Could not create target socket: %s\n",
strerror(errno));
return;
}
bzero((char *) &taddr, sizeof(taddr));
@ -211,16 +219,20 @@ void proxy_handler(ws_ctx_t *ws_ctx) {
/* Resolve target address */
if (resolve_host(&taddr.sin_addr, target_host) < -1) {
error("Could not resolve target address");
handler_emsg("Could not resolve target address: %s\n",
strerror(errno));
}
if (connect(tsock, (struct sockaddr *) &taddr, sizeof(taddr)) < 0) {
error("Could not connect to target");
handler_emsg("Could not connect to target: %s\n",
strerror(errno));
close(tsock);
return;
}
printf("%s", traffic_legend);
if ((! settings.daemon) && (! settings.multiprocess)) {
printf("%s", traffic_legend);
}
do_proxy(ws_ctx, tsock);
@ -230,11 +242,12 @@ void proxy_handler(ws_ctx_t *ws_ctx) {
int main(int argc, char *argv[])
{
int fd, c, option_index = 0;
static int ssl_only = 0, foreground = 0;
static int ssl_only = 0, foreground = 0, multi = 0;
char *found;
static struct option long_options[] = {
{"ssl-only", no_argument, &ssl_only, 1 },
{"foreground", no_argument, &foreground, 'f'},
{"multiprocess", no_argument, &multi, 'm'},
/* ---- */
{"cert", required_argument, 0, 'c'},
{0, 0, 0, 0}
@ -243,7 +256,7 @@ int main(int argc, char *argv[])
settings.cert = realpath("self.pem", NULL);
while (1) {
c = getopt_long (argc, argv, "fr:c:",
c = getopt_long (argc, argv, "fmc:",
long_options, &option_index);
/* Detect the end */
@ -257,6 +270,9 @@ int main(int argc, char *argv[])
case 'f':
foreground = 1;
break;
case 'm':
multi = 1;
break;
case 'r':
if ((fd = open(optarg, O_CREAT,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < -1) {
@ -274,8 +290,9 @@ int main(int argc, char *argv[])
usage("");
}
}
settings.ssl_only = ssl_only;
settings.daemon = foreground ? 0: 1;
settings.ssl_only = ssl_only;
settings.daemon = foreground ? 0: 1;
settings.multiprocess = multi;
if ((argc-optind) != 2) {
usage("Invalid number of arguments\n");
@ -314,6 +331,7 @@ int main(int argc, char *argv[])
//printf(" ssl_only: %d\n", settings.ssl_only);
//printf(" daemon: %d\n", settings.daemon);
//printf(" multiproces: %d\n", settings.multiprocess);
//printf(" cert: %s\n", settings.cert);
settings.handler = proxy_handler;

View File

@ -72,7 +72,7 @@ def do_proxy(client, target):
if target in ins:
buf = target.recv(buffer_size)
if len(buf) == 0: raise Exception("Target closed")
if len(buf) == 0: raise EClose("Target closed")
cqueue.append(encode(buf))
traffic("{")
@ -80,10 +80,10 @@ def do_proxy(client, target):
if client in ins:
buf = client.recv(buffer_size)
if len(buf) == 0: raise Exception("Client closed")
if len(buf) == 0: raise EClose("Client closed")
if buf == '\xff\x00':
raise Exception("Client sent orderly close frame")
raise EClose("Client sent orderly close frame")
elif buf[-1] == '\xff':
if buf.count('\xff') > 1:
traffic(str(buf.count('\xff')))
@ -104,15 +104,16 @@ def proxy_handler(client):
global target_host, target_port, options, rec
if settings['record']:
print "Opening record file: %s" % settings['record']
handler_msg("opening record file: %s" % settings['record'])
rec = open(settings['record'], 'w+')
rec.write("var VNC_frame_data = [\n")
print "Connecting to: %s:%s" % (target_host, target_port)
handler_msg("connecting to: %s:%s" % (target_host, target_port))
tsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tsock.connect((target_host, target_port))
print traffic_legend
if not settings['daemon'] and not settings['multiprocess']:
print traffic_legend
try:
do_proxy(client, tsock)
@ -132,6 +133,9 @@ if __name__ == '__main__':
parser.add_option("--foreground", "-f",
dest="daemon", default=True, action="store_false",
help="stay in foreground, do not daemonize")
parser.add_option("--multiprocess", "-m",
dest="multiprocess", action="store_true",
help="fork handler processes")
parser.add_option("--ssl-only", action="store_true",
help="disallow non-encrypted connections")
parser.add_option("--cert", default="self.pem",
@ -162,6 +166,7 @@ if __name__ == '__main__':
settings['cert'] = os.path.abspath(options.cert)
settings['ssl_only'] = options.ssl_only
settings['daemon'] = options.daemon
settings['multiprocess'] = options.multiprocess
if options.record:
settings['record'] = os.path.abspath(options.record)
start_server()