Fix recording, generic and part of websocket.py.

WebSocketServer(...,  record='FILE_PREFIX')

The reocrd parameter will turn on recording of all messages sent
to and from the client. The record parameter is a file prefix. The
full file-name will be the prefix with an extension '.HANDLER_ID'
based on the handler ID.

Recording required some restructing of the encode and decode function
to return more information so that the recording functions can record
just the payload data and ignore the WebSockets framing/headers.

Caveats:
- Not all messages recorded as sent to the client were necessarily
  received by the client. For example, if several messages are queued
  for the client, but the connection is shutdown before the messages
  are actually sent, these queued messages will still appear in the
  recording.
- If the server is also handling HTTP requests then the handler ID
  extensions for the recorded files will be monotonic but not
  contiguous because only WebSocket connections are recorded, not HTTP
  requests.
This commit is contained in:
Joel Martin 2011-06-26 13:55:52 -05:00
parent d91d89167f
commit 7f487fdbd5
3 changed files with 113 additions and 81 deletions

View File

@ -59,7 +59,9 @@ var api = {}, // Public API
'open' : function() {},
'close' : function() {},
'error' : function() {}
};
},
test_mode = false;
//
@ -253,9 +255,13 @@ function init() {
function open(uri) {
init();
websocket = new WebSocket(uri, 'base64');
// TODO: future native binary support
//websocket = new WebSocket(uri, ['binary', 'base64']);
if (test_mode) {
websocket = {};
} else {
websocket = new WebSocket(uri, 'base64');
// TODO: future native binary support
//websocket = new WebSocket(uri, ['binary', 'base64']);
}
websocket.onmessage = recv_message;
websocket.onopen = function() {
@ -289,6 +295,15 @@ function close() {
}
}
// Override internal functions for testing
// Takes a send function, returns reference to recv function
function testMode(override_send) {
test_mode = true;
api.send = override_send;
api.close = function () {};
return recv_message;
}
function constructor() {
// Configuration settings
api.maxBufferedAmount = 200;
@ -319,6 +334,7 @@ function constructor() {
api.init = init;
api.open = open;
api.close = close;
api.testMode = testMode;
return api;
}

View File

@ -16,7 +16,7 @@ as taken from http://docs.python.org/dev/library/ssl.html#certificates
'''
import os, sys, errno, signal, socket, struct, traceback, select
import os, sys, time, errno, signal, socket, struct, traceback, select
from cgi import parse_qsl
from base64 import b64encode, b64decode
@ -136,6 +136,8 @@ Sec-WebSocket-Accept: %s\r
print(" - No SSL/TLS support (no 'ssl' module)")
if self.daemon:
print(" - Backgrounding (daemon)")
if self.record:
print(" - Recording to '%s.*'" % self.record)
#
# WebSocketServer static methods
@ -201,7 +203,7 @@ Sec-WebSocket-Accept: %s\r
#print("Encoded: %s" % repr(header + buf))
return header + buf
return header + buf, len(header), 0
@staticmethod
def decode_hybi(buf, base64=False):
@ -210,6 +212,7 @@ Sec-WebSocket-Accept: %s\r
{'fin' : 0_or_1,
'opcode' : number,
'mask' : 32_bit_number,
'hlen' : header_bytes_number,
'length' : payload_bytes_number,
'payload' : decoded_buffer,
'left' : bytes_left_number,
@ -217,98 +220,100 @@ Sec-WebSocket-Accept: %s\r
'close_reason' : string}
"""
ret = {'fin' : 0,
'opcode' : 0,
'mask' : 0,
'length' : 0,
'payload' : None,
'left' : 0,
'close_code' : None,
'close_reason' : None}
f = {'fin' : 0,
'opcode' : 0,
'mask' : 0,
'hlen' : 2,
'length' : 0,
'payload' : None,
'left' : 0,
'close_code' : None,
'close_reason' : None}
blen = len(buf)
ret['left'] = blen
header_len = 2
f['left'] = blen
if blen < header_len:
return ret # Incomplete frame header
if blen < f['hlen']:
return f # Incomplete frame header
b1, b2 = struct.unpack_from(">BB", buf)
ret['opcode'] = b1 & 0x0f
ret['fin'] = (b1 & 0x80) >> 7
f['opcode'] = b1 & 0x0f
f['fin'] = (b1 & 0x80) >> 7
has_mask = (b2 & 0x80) >> 7
ret['length'] = b2 & 0x7f
f['length'] = b2 & 0x7f
if ret['length'] == 126:
header_len = 4
if blen < header_len:
return ret # Incomplete frame header
(ret['length'],) = struct.unpack_from('>xxH', buf)
elif ret['length'] == 127:
header_len = 10
if blen < header_len:
return ret # Incomplete frame header
(ret['length'],) = struct.unpack_from('>xxQ', buf)
if f['length'] == 126:
f['hlen'] = 4
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = struct.unpack_from('>xxH', buf)
elif f['length'] == 127:
f['hlen'] = 10
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = struct.unpack_from('>xxQ', buf)
full_len = header_len + has_mask * 4 + ret['length']
full_len = f['hlen'] + has_mask * 4 + f['length']
if blen < full_len: # Incomplete frame
return ret # Incomplete frame header
return f # Incomplete frame header
# Number of bytes that are part of the next frame(s)
ret['left'] = blen - full_len
f['left'] = blen - full_len
# Process 1 frame
if has_mask:
# unmask payload
ret['mask'] = buf[header_len:header_len+4]
f['mask'] = buf[f['hlen']:f['hlen']+4]
b = c = ''
if ret['length'] >= 4:
if f['length'] >= 4:
mask = numpy.frombuffer(buf, dtype=numpy.dtype('<L4'),
offset=header_len, count=1)
offset=f['hlen'], count=1)
data = numpy.frombuffer(buf, dtype=numpy.dtype('<L4'),
offset=header_len + 4, count=int(ret['length'] / 4))
offset=f['hlen'] + 4, count=int(f['length'] / 4))
#b = numpy.bitwise_xor(data, mask).data
b = numpy.bitwise_xor(data, mask).tostring()
if ret['length'] % 4:
if f['length'] % 4:
print("Partial unmask")
mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=header_len, count=(ret['length'] % 4))
offset=f['hlen'], count=(f['length'] % 4))
data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=full_len - (ret['length'] % 4),
count=(ret['length'] % 4))
offset=full_len - (f['length'] % 4),
count=(f['length'] % 4))
c = numpy.bitwise_xor(data, mask).tostring()
ret['payload'] = b + c
f['payload'] = b + c
else:
print("Unmasked frame: %s" % repr(buf))
ret['payload'] = buf[(header_len + has_mask * 4):full_len]
f['payload'] = buf[(f['hlen'] + has_mask * 4):full_len]
if base64 and ret['opcode'] in [1, 2]:
if base64 and f['opcode'] in [1, 2]:
try:
ret['payload'] = b64decode(ret['payload'])
f['payload'] = b64decode(f['payload'])
except:
print("Exception while b64decoding buffer: %s" %
repr(buf))
raise
if ret['opcode'] == 0x08:
if ret['length'] >= 2:
ret['close_code'] = struct.unpack_from(">H", ret['payload'])
if ret['length'] > 3:
ret['close_reason'] = ret['payload'][2:]
if f['opcode'] == 0x08:
if f['length'] >= 2:
f['close_code'] = struct.unpack_from(">H", f['payload'])
if f['length'] > 3:
f['close_reason'] = f['payload'][2:]
return ret
return f
@staticmethod
def encode_hixie(buf):
return s2b("\x00" + b2s(b64encode(buf)) + "\xff")
return s2b("\x00" + b2s(b64encode(buf)) + "\xff"), 1, 1
@staticmethod
def decode_hixie(buf):
end = buf.find(s2b('\xff'))
return {'payload': b64decode(buf[1:end]),
'hlen': 1,
'length': end - 1,
'left': len(buf) - (end + 1)}
@ -357,17 +362,27 @@ Sec-WebSocket-Accept: %s\r
than 0, then the caller should call again when the socket is
ready. """
tdelta = int(time.time()*1000) - self.start_time
if bufs:
for buf in bufs:
if self.version.startswith("hybi"):
if self.base64:
self.send_parts.append(self.encode_hybi(buf,
opcode=1, base64=True))
encbuf, lenhead, lentail = self.encode_hybi(
buf, opcode=1, base64=True)
else:
self.send_parts.append(self.encode_hybi(buf,
opcode=2, base64=False))
encbuf, lenhead, lentail = self.encode_hybi(
buf, opcode=2, base64=False)
else:
self.send_parts.append(self.encode_hixie(buf))
encbuf, lenhead, lentail = self.encode_hixie(buf)
if self.rec:
self.rec.write("%s,\n" %
repr("{%s{" % tdelta
+ encbuf[lenhead:-lentail]))
self.send_parts.append(encbuf)
while self.send_parts:
# Send pending frames
@ -392,6 +407,7 @@ Sec-WebSocket-Accept: %s\r
closed = False
bufs = []
tdelta = int(time.time()*1000) - self.start_time
buf = self.client.recv(self.buffer_size)
if len(buf) == 0:
@ -441,6 +457,13 @@ Sec-WebSocket-Accept: %s\r
self.traffic("}")
if self.rec:
start = frame['hlen']
end = frame['hlen'] + frame['length']
self.rec.write("%s,\n" %
repr("}%s}" % tdelta + buf[start:end]))
bufs.append(frame['payload'])
if frame['left']:
@ -655,11 +678,22 @@ Sec-WebSocket-Accept: %s\r
self.send_parts = []
self.recv_part = None
self.base64 = False
self.rec = None
self.start_time = int(time.time()*1000)
# handler process
try:
try:
self.client = self.do_handshake(startsock, address)
if self.record:
# Record raw frame data as JavaScript array
fname = "%s.%s" % (self.record,
self.handler_id)
self.msg("opening record file: %s" % fname)
self.rec = open(fname, 'w+')
self.rec.write("var VNC_frame_data = [\n")
self.new_client()
except self.EClose:
_, exc, _ = sys.exc_info()
@ -672,6 +706,10 @@ Sec-WebSocket-Accept: %s\r
if self.verbose:
self.msg(traceback.format_exc())
finally:
if self.rec:
self.rec.write("'EOF']\n")
self.rec.close()
if self.client and self.client != startsock:
self.client.close()

View File

@ -138,15 +138,6 @@ Traffic Legend:
Called after a new WebSocket connection has been established.
"""
self.rec = None
if self.record:
# Record raw frame data as a JavaScript compatible file
fname = "%s.%s" % (self.record,
self.handler_id)
self.msg("opening record file: %s" % fname)
self.rec = open(fname, 'w+')
self.rec.write("var VNC_frame_data = [\n")
# Connect to the target
self.msg("connecting to: %s:%s" % (
self.target_host, self.target_port))
@ -165,9 +156,6 @@ Traffic Legend:
tsock.close()
self.vmsg("%s:%s: Target closed" %(
self.target_host, self.target_port))
if self.rec:
self.rec.write("'EOF']\n")
self.rec.close()
raise
def do_proxy(self, target):
@ -178,11 +166,9 @@ Traffic Legend:
c_pend = 0
tqueue = []
rlist = [self.client, target]
tstart = int(time.time()*1000)
while True:
wlist = []
tdelta = int(time.time()*1000) - tstart
if tqueue: wlist.append(target)
if cqueue or c_pend: wlist.append(self.client)
@ -213,11 +199,8 @@ Traffic Legend:
if self.client in outs:
# Send queued target data to the client
c_pend = self.send_frames(cqueue)
cqueue = []
#if self.rec:
# self.rec.write("%s,\n" %
# repr("{%s{" % tdelta + dat[1:-1]))
cqueue = []
if self.client in ins:
@ -225,11 +208,6 @@ Traffic Legend:
bufs, closed = self.recv_frames()
tqueue.extend(bufs)
#if self.rec:
# for b in bufs:
# self.rec.write(
# repr("}%s}%s" % (tdelta, b)) + ",\n")
if closed:
# TODO: What about blocking on client socket?
self.send_close()