Cherry-picked 4e3388964a from

astrand/websockify:

    Prepare for fixing https://github.com/kanaka/websockify/issues/71:
    Move around functions and methods, so that connection-related and
    server-related stuff are close together.

    This patch just moves things around - it does not change anything at
    all. This can be verified with:

    git diff websocket.py | grep ^- | cut -c 2- | sort > removed
    git diff websocket.py | grep ^+ | cut -c 2- | sort > added
    diff -u removed added
This commit is contained in:
Peter Åstrand (astrand) 2013-12-17 13:56:20 +01:00
parent 17455afe4d
commit 8049ddfe26
1 changed files with 344 additions and 344 deletions

View File

@ -74,20 +74,14 @@ class WebSocketServer(object):
log_prefix = "websocket"
buffer_size = 65536
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
server_handshake_hybi = """HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: %s\r
"""
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
# An exception before the WebSocket connection was established
class EClose(Exception):
pass
# An exception while the WebSocket client was connected
class CClose(Exception):
pass
@ -95,6 +89,318 @@ Sec-WebSocket-Accept: %s\r
class Terminate(Exception):
pass
@staticmethod
def unmask(buf, hlen, plen):
pstart = hlen + 4
pend = pstart + plen
if numpy:
b = c = s2b('')
if plen >= 4:
mask = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
offset=hlen, count=1)
data = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
offset=pstart, count=int(plen / 4))
#b = numpy.bitwise_xor(data, mask).data
b = numpy.bitwise_xor(data, mask).tostring()
if plen % 4:
#self.msg("Partial unmask")
mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=hlen, count=(plen % 4))
data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=pend - (plen % 4),
count=(plen % 4))
c = numpy.bitwise_xor(data, mask).tostring()
return b + c
else:
# Slower fallback
mask = buf[hlen:hlen+4]
data = array.array('B')
mask = s2a(mask)
data.fromstring(buf[pstart:pend])
for i in range(len(data)):
data[i] ^= mask[i % 4]
return data.tostring()
@staticmethod
def encode_hybi(buf, opcode, base64=False):
""" Encode a HyBi style WebSocket frame.
Optional opcode:
0x0 - continuation
0x1 - text frame (base64 encode buf)
0x2 - binary frame (use raw buf)
0x8 - connection close
0x9 - ping
0xA - pong
"""
if base64:
buf = b64encode(buf)
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
payload_len = len(buf)
if payload_len <= 125:
header = pack('>BB', b1, payload_len)
elif payload_len > 125 and payload_len < 65536:
header = pack('>BBH', b1, 126, payload_len)
elif payload_len >= 65536:
header = pack('>BBQ', b1, 127, payload_len)
#self.msg("Encoded: %s", repr(header + buf))
return header + buf, len(header), 0
@staticmethod
def decode_hybi(buf, base64=False, logger=None):
""" Decode HyBi style WebSocket packets.
Returns:
{'fin' : 0_or_1,
'opcode' : number,
'masked' : boolean,
'hlen' : header_bytes_number,
'length' : payload_bytes_number,
'payload' : decoded_buffer,
'left' : bytes_left_number,
'close_code' : number,
'close_reason' : string}
"""
f = {'fin' : 0,
'opcode' : 0,
'masked' : False,
'hlen' : 2,
'length' : 0,
'payload' : None,
'left' : 0,
'close_code' : 1000,
'close_reason' : ''}
if logger is None:
logger = WebSocketServer.get_logger()
blen = len(buf)
f['left'] = blen
if blen < f['hlen']:
return f # Incomplete frame header
b1, b2 = unpack_from(">BB", buf)
f['opcode'] = b1 & 0x0f
f['fin'] = (b1 & 0x80) >> 7
f['masked'] = (b2 & 0x80) >> 7
f['length'] = b2 & 0x7f
if f['length'] == 126:
f['hlen'] = 4
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = unpack_from('>xxH', buf)
elif f['length'] == 127:
f['hlen'] = 10
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = unpack_from('>xxQ', buf)
full_len = f['hlen'] + f['masked'] * 4 + f['length']
if blen < full_len: # Incomplete frame
return f # Incomplete frame header
# Number of bytes that are part of the next frame(s)
f['left'] = blen - full_len
# Process 1 frame
if f['masked']:
# unmask payload
f['payload'] = WebSocketServer.unmask(buf, f['hlen'],
f['length'])
else:
logger.debug("Unmasked frame: %s" % repr(buf))
f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len]
if base64 and f['opcode'] in [1, 2]:
try:
f['payload'] = b64decode(f['payload'])
except:
logger.exception("Exception while b64decoding buffer: %s" %
(repr(buf)))
raise
if f['opcode'] == 0x08:
if f['length'] >= 2:
f['close_code'] = unpack_from(">H", f['payload'])[0]
if f['length'] > 3:
f['close_reason'] = f['payload'][2:]
return f
#
# Main WebSocketServer methods
#
def send_frames(self, bufs=None):
""" Encode and send WebSocket frames. Any frames already
queued will be sent first. If buf is not set then only queued
frames will be sent. Returns the number of pending frames that
could not be fully sent. If returned pending frames is greater
than 0, then the caller should call again when the socket is
ready. """
tdelta = int(time.time()*1000) - self.start_time
if bufs:
for buf in bufs:
if self.base64:
encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=1, base64=True)
else:
encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=2, base64=False)
if self.rec:
self.rec.write("%s,\n" %
repr("{%s{" % tdelta
+ encbuf[lenhead:len(encbuf)-lentail]))
self.send_parts.append(encbuf)
while self.send_parts:
# Send pending frames
buf = self.send_parts.pop(0)
sent = self.request.send(buf)
if sent == len(buf):
self.print_traffic("<")
else:
self.print_traffic("<.")
self.send_parts.insert(0, buf[sent:])
break
return len(self.send_parts)
def recv_frames(self):
""" Receive and decode WebSocket frames.
Returns:
(bufs_list, closed_string)
"""
closed = False
bufs = []
tdelta = int(time.time()*1000) - self.start_time
buf = self.request.recv(self.buffer_size)
if len(buf) == 0:
closed = {'code': 1000, 'reason': "Client closed abruptly"}
return bufs, closed
if self.recv_part:
# Add partially received frames to current read buffer
buf = self.recv_part + buf
self.recv_part = None
while buf:
frame = self.decode_hybi(buf, base64=self.base64,
logger=self.logger)
#self.msg("Received buf: %s, frame: %s", repr(buf), frame)
if frame['payload'] == None:
# Incomplete/partial frame
self.print_traffic("}.")
if frame['left'] > 0:
self.recv_part = buf[-frame['left']:]
break
else:
if frame['opcode'] == 0x8: # connection close
closed = {'code': frame['close_code'],
'reason': frame['close_reason']}
break
self.print_traffic("}")
if self.rec:
start = frame['hlen']
end = frame['hlen'] + frame['length']
if frame['masked']:
recbuf = WebSocketServer.unmask(buf, frame['hlen'],
frame['length'])
else:
recbuf = buf[frame['hlen']:frame['hlen'] +
frame['length']]
self.rec.write("%s,\n" %
repr("}%s}" % tdelta + recbuf))
bufs.append(frame['payload'])
if frame['left']:
buf = buf[-frame['left']:]
else:
buf = ''
return bufs, closed
def send_close(self, code=1000, reason=''):
""" Send a WebSocket orderly close frame. """
msg = pack(">H%ds" % len(reason), code, reason)
buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
self.request.send(buf)
def do_websocket_handshake(self, headers, path):
h = self.headers = headers
self.path = path
prot = 'WebSocket-Protocol'
protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',')
ver = h.get('Sec-WebSocket-Version')
if ver:
# HyBi/IETF version of the protocol
# HyBi-07 report version 7
# HyBi-08 - HyBi-12 report version 8
# HyBi-13 reports version 13
if ver in ['7', '8', '13']:
self.version = "hybi-%02d" % int(ver)
else:
raise self.EClose('Unsupported protocol version %s' % ver)
key = h['Sec-WebSocket-Key']
# Choose binary if client supports it
if 'binary' in protocols:
self.base64 = False
elif 'base64' in protocols:
self.base64 = True
else:
raise self.EClose("Client must support 'binary' or 'base64' protocol")
# Generate the hash value for the accept header
accept = b64encode(sha1(s2b(key + self.GUID)).digest())
response = self.server_handshake_hybi % b2s(accept)
if self.base64:
response += "Sec-WebSocket-Protocol: base64\r\n"
else:
response += "Sec-WebSocket-Protocol: binary\r\n"
response += "\r\n"
else:
raise self.EClose("Missing Sec-WebSocket-Version header. Hixie protocols not supported.")
return response
def new_websocket_client(self):
""" Do something with a WebSockets client connection. """
raise("WebSocketServer.new_websocket_client() must be overloaded")
policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
# An exception before the WebSocket connection was established
class EClose(Exception):
pass
def __init__(self, listen_host='', listen_port=None, source_is_ipv6=False,
verbose=False, cert='', key='', ssl_only=None,
daemon=False, record='', web='',
@ -270,338 +576,6 @@ Sec-WebSocket-Accept: %s\r
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
@staticmethod
def unmask(buf, hlen, plen):
pstart = hlen + 4
pend = pstart + plen
if numpy:
b = c = s2b('')
if plen >= 4:
mask = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
offset=hlen, count=1)
data = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
offset=pstart, count=int(plen / 4))
#b = numpy.bitwise_xor(data, mask).data
b = numpy.bitwise_xor(data, mask).tostring()
if plen % 4:
#self.msg("Partial unmask")
mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=hlen, count=(plen % 4))
data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=pend - (plen % 4),
count=(plen % 4))
c = numpy.bitwise_xor(data, mask).tostring()
return b + c
else:
# Slower fallback
mask = buf[hlen:hlen+4]
data = array.array('B')
mask = s2a(mask)
data.fromstring(buf[pstart:pend])
for i in range(len(data)):
data[i] ^= mask[i % 4]
return data.tostring()
@staticmethod
def encode_hybi(buf, opcode, base64=False):
""" Encode a HyBi style WebSocket frame.
Optional opcode:
0x0 - continuation
0x1 - text frame (base64 encode buf)
0x2 - binary frame (use raw buf)
0x8 - connection close
0x9 - ping
0xA - pong
"""
if base64:
buf = b64encode(buf)
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
payload_len = len(buf)
if payload_len <= 125:
header = pack('>BB', b1, payload_len)
elif payload_len > 125 and payload_len < 65536:
header = pack('>BBH', b1, 126, payload_len)
elif payload_len >= 65536:
header = pack('>BBQ', b1, 127, payload_len)
#self.msg("Encoded: %s", repr(header + buf))
return header + buf, len(header), 0
@staticmethod
def decode_hybi(buf, base64=False, logger=None):
""" Decode HyBi style WebSocket packets.
Returns:
{'fin' : 0_or_1,
'opcode' : number,
'masked' : boolean,
'hlen' : header_bytes_number,
'length' : payload_bytes_number,
'payload' : decoded_buffer,
'left' : bytes_left_number,
'close_code' : number,
'close_reason' : string}
"""
f = {'fin' : 0,
'opcode' : 0,
'masked' : False,
'hlen' : 2,
'length' : 0,
'payload' : None,
'left' : 0,
'close_code' : 1000,
'close_reason' : ''}
if logger is None:
logger = WebSocketServer.get_logger()
blen = len(buf)
f['left'] = blen
if blen < f['hlen']:
return f # Incomplete frame header
b1, b2 = unpack_from(">BB", buf)
f['opcode'] = b1 & 0x0f
f['fin'] = (b1 & 0x80) >> 7
f['masked'] = (b2 & 0x80) >> 7
f['length'] = b2 & 0x7f
if f['length'] == 126:
f['hlen'] = 4
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = unpack_from('>xxH', buf)
elif f['length'] == 127:
f['hlen'] = 10
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = unpack_from('>xxQ', buf)
full_len = f['hlen'] + f['masked'] * 4 + f['length']
if blen < full_len: # Incomplete frame
return f # Incomplete frame header
# Number of bytes that are part of the next frame(s)
f['left'] = blen - full_len
# Process 1 frame
if f['masked']:
# unmask payload
f['payload'] = WebSocketServer.unmask(buf, f['hlen'],
f['length'])
else:
logger.debug("Unmasked frame: %s" % repr(buf))
f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len]
if base64 and f['opcode'] in [1, 2]:
try:
f['payload'] = b64decode(f['payload'])
except:
logger.exception("Exception while b64decoding buffer: %s" %
(repr(buf)))
raise
if f['opcode'] == 0x08:
if f['length'] >= 2:
f['close_code'] = unpack_from(">H", f['payload'])[0]
if f['length'] > 3:
f['close_reason'] = f['payload'][2:]
return f
#
# WebSocketServer logging/output functions
#
def print_traffic(self, token="."):
""" Show traffic flow mode. """
if self.traffic:
sys.stdout.write(token)
sys.stdout.flush()
def log(self, lvl, msg, *args, **kwargs):
""" Wrapper around python logging """
prefix = ""
if self.i_am_client:
prefix = "% 3d: " % self.handler_id
self.logger.log(lvl, "%s%s" % (prefix, msg),
*args, **kwargs)
def msg(self, *args, **kwargs):
""" Output message with handler_id prefix. """
self.log(logging.INFO, *args, **kwargs)
def vmsg(self, *args, **kwargs):
""" Same as msg() but as debug. """
self.log(logging.DEBUG, *args, **kwargs)
def warn(self, *args, **kwargs):
""" Same as msg() but as warning. """
self.log(logging.WARN, *args, **kwargs)
#
# Main WebSocketServer methods
#
def send_frames(self, bufs=None):
""" Encode and send WebSocket frames. Any frames already
queued will be sent first. If buf is not set then only queued
frames will be sent. Returns the number of pending frames that
could not be fully sent. If returned pending frames is greater
than 0, then the caller should call again when the socket is
ready. """
tdelta = int(time.time()*1000) - self.start_time
if bufs:
for buf in bufs:
if self.base64:
encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=1, base64=True)
else:
encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=2, base64=False)
if self.rec:
self.rec.write("%s,\n" %
repr("{%s{" % tdelta
+ encbuf[lenhead:len(encbuf)-lentail]))
self.send_parts.append(encbuf)
while self.send_parts:
# Send pending frames
buf = self.send_parts.pop(0)
sent = self.request.send(buf)
if sent == len(buf):
self.print_traffic("<")
else:
self.print_traffic("<.")
self.send_parts.insert(0, buf[sent:])
break
return len(self.send_parts)
def recv_frames(self):
""" Receive and decode WebSocket frames.
Returns:
(bufs_list, closed_string)
"""
closed = False
bufs = []
tdelta = int(time.time()*1000) - self.start_time
buf = self.request.recv(self.buffer_size)
if len(buf) == 0:
closed = {'code': 1000, 'reason': "Client closed abruptly"}
return bufs, closed
if self.recv_part:
# Add partially received frames to current read buffer
buf = self.recv_part + buf
self.recv_part = None
while buf:
frame = self.decode_hybi(buf, base64=self.base64,
logger=self.logger)
#self.msg("Received buf: %s, frame: %s", repr(buf), frame)
if frame['payload'] == None:
# Incomplete/partial frame
self.print_traffic("}.")
if frame['left'] > 0:
self.recv_part = buf[-frame['left']:]
break
else:
if frame['opcode'] == 0x8: # connection close
closed = {'code': frame['close_code'],
'reason': frame['close_reason']}
break
self.print_traffic("}")
if self.rec:
start = frame['hlen']
end = frame['hlen'] + frame['length']
if frame['masked']:
recbuf = WebSocketServer.unmask(buf, frame['hlen'],
frame['length'])
else:
recbuf = buf[frame['hlen']:frame['hlen'] +
frame['length']]
self.rec.write("%s,\n" %
repr("}%s}" % tdelta + recbuf))
bufs.append(frame['payload'])
if frame['left']:
buf = buf[-frame['left']:]
else:
buf = ''
return bufs, closed
def send_close(self, code=1000, reason=''):
""" Send a WebSocket orderly close frame. """
msg = pack(">H%ds" % len(reason), code, reason)
buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
self.request.send(buf)
def do_websocket_handshake(self, headers, path):
h = self.headers = headers
self.path = path
prot = 'WebSocket-Protocol'
protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',')
ver = h.get('Sec-WebSocket-Version')
if ver:
# HyBi/IETF version of the protocol
# HyBi-07 report version 7
# HyBi-08 - HyBi-12 report version 8
# HyBi-13 reports version 13
if ver in ['7', '8', '13']:
self.version = "hybi-%02d" % int(ver)
else:
raise self.EClose('Unsupported protocol version %s' % ver)
key = h['Sec-WebSocket-Key']
# Choose binary if client supports it
if 'binary' in protocols:
self.base64 = False
elif 'base64' in protocols:
self.base64 = True
else:
raise self.EClose("Client must support 'binary' or 'base64' protocol")
# Generate the hash value for the accept header
accept = b64encode(sha1(s2b(key + self.GUID)).digest())
response = self.server_handshake_hybi % b2s(accept)
if self.base64:
response += "Sec-WebSocket-Protocol: base64\r\n"
else:
response += "Sec-WebSocket-Protocol: binary\r\n"
response += "\r\n"
else:
raise self.EClose("Missing Sec-WebSocket-Version header. Hixie protocols not supported.")
return response
def do_handshake(self, sock, address):
"""
do_handshake does the following:
@ -704,6 +678,36 @@ Sec-WebSocket-Accept: %s\r
# Return the WebSockets socket which may be SSL wrapped
return retsock
#
# WebSocketServer logging/output functions
#
def print_traffic(self, token="."):
""" Show traffic flow mode. """
if self.traffic:
sys.stdout.write(token)
sys.stdout.flush()
def log(self, lvl, msg, *args, **kwargs):
""" Wrapper around python logging """
prefix = ""
if self.i_am_client:
prefix = "% 3d: " % self.handler_id
self.logger.log(lvl, "%s%s" % (prefix, msg),
*args, **kwargs)
def msg(self, *args, **kwargs):
""" Output message with handler_id prefix. """
self.log(logging.INFO, *args, **kwargs)
def vmsg(self, *args, **kwargs):
""" Same as msg() but as debug. """
self.log(logging.DEBUG, *args, **kwargs)
def warn(self, *args, **kwargs):
""" Same as msg() but as warning. """
self.log(logging.WARN, *args, **kwargs)
#
# Events that can/should be overridden in sub-classes
@ -797,10 +801,6 @@ Sec-WebSocket-Accept: %s\r
# Original socket closed by caller
self.request.close()
def new_websocket_client(self):
""" Do something with a WebSockets client connection. """
raise("WebSocketServer.new_websocket_client() must be overloaded")
def start_server(self):
"""
Daemonize if requested. Listen for for connections. Run