HyBi and threading support to ruby websockify.

This commit is contained in:
Joel Martin 2011-10-05 17:53:36 -05:00
parent 6b9d6c39be
commit acacb4527d
3 changed files with 341 additions and 129 deletions

View File

@ -11,53 +11,91 @@
require 'gserver'
require 'stringio'
require 'digest/md5'
require 'digest/sha1'
require 'base64'
class EClose < Exception
end
class WebSocketServer < GServer
@@buffer_size = 65536
@@Buffer_size = 65536
@@server_handshake_hixie = "HTTP/1.1 101 Web Socket Protocol Handshake\r
#
# WebSocket constants
#
@@Server_handshake_hixie = "HTTP/1.1 101 Web Socket Protocol Handshake\r
Upgrade: WebSocket\r
Connection: Upgrade\r
%sWebSocket-Origin: %s\r
%sWebSocket-Location: %s://%s%s\r
"
def initialize(port, host, opts, *args)
vmsg "in WebSocketServer.initialize"
@@Server_handshake_hybi = "HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: %s\r
"
@@GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
super(port, host, *args)
def initialize(opts)
vmsg "in WebSocketServer.initialize"
port = opts['listen_port']
host = opts['listen_host'] || GServer::DEFAULT_HOST
super(port, host)
@@client_id = 0 # Track client number total on class
@verbose = opts['verbose']
@opts = opts
# Keep an overall record of the client IDs allocated
# and the lines of chat
@@client_id = 0
end
def serve(io)
@@client_id += 1
# Initialize per thread state
t = Thread.current
t[:my_client_id] = @@client_id
t[:send_parts] = []
t[:recv_part] = nil
t[:base64] = nil
puts "in serve, client: #{t[:client].inspect}"
begin
t[:client] = do_handshake(io)
new_client(t[:client])
rescue EClose => e
msg "Client closed: #{e.message}"
return
rescue Exception => e
msg "Uncaught exception: #{e.message}"
msg "Trace: #{e.backtrace}"
return
end
msg "Client disconnected"
end
#
# WebSocketServer logging/output functions
#
def traffic(token)
if @verbose
print token
STDOUT.flush
end
if @verbose then print token; STDOUT.flush; end
end
def msg(msg)
puts "% 3d: %s" % [@my_client_id, msg]
puts "% 3d: %s" % [Thread.current[:my_client_id], msg]
end
def vmsg(msg)
if @verbose
msg(msg)
end
if @verbose then msg(msg) end
end
#
# WebSocketServer general support routines
#
def gen_md5(h)
key1 = h['sec-websocket-key1']
key2 = h['sec-websocket-key2']
@ -70,6 +108,99 @@ Connection: Upgrade\r
return Digest::MD5.digest([num1, num2, key3].pack('NNa8'))
end
def unmask(buf, hlen, length)
pstart = hlen + 4
mask = buf[hlen...hlen+4]
data = buf[pstart...pstart+length]
#data = data.bytes.zip(mask.bytes.cycle(length)).map { |d,m| d^m }
for i in (0...data.length) do
data[i] ^= mask[i%4]
end
return data
end
def encode_hybi(buf, opcode, base64=false)
if base64
buf = Base64.encode64(buf).gsub(/\n/, '')
end
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
payload_len = buf.length
if payload_len <= 125
header = [b1, payload_len].pack('CC')
elsif payload_len > 125 && payload_len < 65536
header = [b1, 126, payload_len].pack('CCn')
elsif payload_len >= 65536
header = [b1, 127, payload_len >> 32,
payload_len & 0xffffffff].pack('CCNN')
end
return [header + buf, header.length, 0]
end
def decode_hybi(buf, base64=false)
f = {'fin' => 0,
'opcode' => 0,
'hlen' => 2,
'length' => 0,
'payload' => nil,
'left' => 0,
'close_code' => nil,
'close_reason' => nil}
blen = buf.length
f['left'] = blen
if blen < f['hlen'] then return f end # incomplete frame
b1, b2 = buf.unpack('CC')
f['opcode'] = b1 & 0x0f
f['fin'] = (b1 & 0x80) >> 7
has_mask = (b2 & 0x80) >> 7
f['length'] = b2 & 0x7f
if f['length'] == 126
f['hlen'] = 4
if blen < f['hlen'] then return f end # incomplete frame
f['length'] = buf.unpack('xxn')[0]
elsif f['length'] == 127
f['hlen'] = 10
if blen < f['hlen'] then return f end # incomplete frame
top, bottom = buf.unpack('xxNN')
f['length'] = (top << 32) & bottom
end
full_len = f['hlen'] + has_mask * 4 + f['length']
if blen < full_len then return f end # incomplete frame
# number of bytes that are part of the next frame(s)
f['left'] = blen - full_len
if has_mask > 0
f['payload'] = unmask(buf, f['hlen'], f['length'])
else
f['payload'] = buf[f['hlen']...full_len]
end
if base64 and [1, 2].include?(f['opcode'])
f['payload'] = Base64.decode64(f['payload'])
end
# close frame
if f['opcode'] == 0x08
if f['length'] >= 2
f['close_code'] = f['payload'].unpack('n')
end
if f['length'] > 3
f['close_reason'] = f['payload'][2...f['payload'].length]
end
end
return f
end
def encode_hixie(buf)
return ["\x00" + Base64.encode64(buf).gsub(/\n/, '') + "\xff", 1, 1]
end
@ -83,61 +214,94 @@ Connection: Upgrade\r
end
def send_frames(bufs)
t = Thread.current
if bufs.length > 0
encbuf = ""
bufs.each do |buf|
#puts "Sending frame: #{buf.inspect}"
encbuf, lenhead, lentail = encode_hixie(buf)
if t[:version].start_with?("hybi")
if t[:base64]
encbuf, lenhead, lentail = encode_hybi(
buf, opcode=1, base64=true)
else
encbuf, lenhead, lentail = encode_hybi(
buf, opcode=2, base64=false)
end
else
encbuf, lenhead, lentail = encode_hixie(buf)
end
@send_parts << encbuf
t[:send_parts] << encbuf
end
end
while @send_parts.length > 0
buf = @send_parts.shift
sent = @client.send(buf, 0)
while t[:send_parts].length > 0
buf = t[:send_parts].shift
sent = t[:client].send(buf, 0)
if sent == buf.length
traffic "<"
else
traffic "<."
@send_parts.unshift(buf[sent...buf.length])
t[:send_parts].unshift(buf[sent...buf.length])
end
end
return @send_parts.length
return t[:send_parts].length
end
# Receive and decode Websocket frames
# Returns: [bufs_list, closed_string]
def recv_frames()
t = Thread.current
closed = false
bufs = []
buf = @client.recv(@@buffer_size)
buf = t[:client].recv(@@Buffer_size)
if buf.length == 0
return bufs, "Client closed abrubtly"
end
if @recv_part
buf = @recv_part + buf
@recv_part = nil
if t[:recv_part]
buf = t[:recv_part] + buf
t[:recv_part] = nil
end
while buf.length > 0
if buf[0...2] == "\xff\x00":
closed = "Client sent orderly close frame"
break
elsif buf[0...2] == "\x00\xff":
# Partial frame
traffic "}."
@recv_part = buf
break
if t[:version].start_with?("hybi")
frame = decode_hybi(buf, base64=t[:base64])
if frame['payload'] == nil
traffic "}."
if frame['left'] > 0
t[:recv_part] = buf[-frame['left']...buf.length]
end
break
else
if frame['opcode'] == 0x8
closed = "Client closed, reason: %s - %s" % [
frame['close_code'], frame['close_reason']]
break
end
end
else
if buf[0...2] == "\xff\x00":
closed = "Client sent orderly close frame"
break
elsif buf[0...2] == "\x00\xff":
buf = buf[2...buf.length]
continue # No-op frame
elsif buf.count("\xff") == 0
# Partial frame
traffic "}."
t[:recv_part] = buf
break
end
frame = decode_hixie(buf)
end
frame = decode_hixie(buf)
#msg "Receive frame: #{frame.inspect}"
traffic "}"
@ -145,7 +309,7 @@ Connection: Upgrade\r
bufs << frame['payload']
if frame['left'] > 0:
buf = buf[buf.length-frame['left']...buf.length]
buf = buf[-frame['left']...buf.length]
else
buf = ''
end
@ -156,31 +320,46 @@ Connection: Upgrade\r
def send_close(code=nil, reason='')
buf = "\xff\x00"
@client.send(buf, 0)
t = Thread.current
if t[:version].start_with?("hybi")
msg = ''
if code
msg = [reason.length, code].pack("na8")
end
buf, lenh, lent = encode_hybi(msg, opcode=0x08, base64=false)
t[:client].send(buf, 0)
elsif t[:version] == "hixie-76"
buf = "\xff\x00"
t[:client].send(buf, 0)
end
end
def do_handshake(sock)
t = Thread.current
stype = ""
if !IO.select([sock], nil, nil, 3)
raise EClose, "ignoring socket not ready"
end
handshake = sock.recv(1024, Socket::MSG_PEEK)
#puts "Handshake [#{handshake.inspect}]"
#msg "Handshake [#{handshake.inspect}]"
if handshake == ""
raise(EClose, "ignoring empty handshake")
else
stype = "Plain non-SSL (ws://)"
scheme = "ws"
retsock = sock
sock.recv(1024)
end
h = @headers = {}
h = t[:headers] = {}
hlines = handshake.split("\r\n")
req_split = hlines.shift.match(/^(\w+) (\/[^\s]*) HTTP\/1\.1$/)
@path = req_split[2].strip
t[:path] = req_split[2].strip
hlines.each do |hline|
break if hline == ""
hsplit = hline.match(/^([^:]+):\s*(.+)$/)
@ -188,63 +367,90 @@ Connection: Upgrade\r
end
#puts "Headers: #{h.inspect}"
if h.has_key?('upgrade') &&
unless h.has_key?('upgrade') &&
h['upgrade'].downcase == 'websocket'
msg "Got WebSocket connection"
else
raise EClose, "Non-WebSocket connection"
end
body = handshake.match(/\r\n\r\n(........)/)
if body
h['key3'] = body[1]
trailer = gen_md5(h)
pre = "Sec-"
protocols = h["sec-websocket-protocol"]
protocols = h.fetch("sec-websocket-protocol", h["websocket-protocol"])
ver = h.fetch('sec-websocket-version', nil)
if ver
# HyBi/IETF vesrion of the protocol
# HyBi 07 reports version 7
# HyBi 08 - 12 report version 8
# HyBi 13 and up report version 13
if ['7', '8', '13'].include?(ver)
t[:version] = "hybi-%02d" % [ver.to_i]
else
raise EClose, "Unsupported protocol version %s" % [ver]
end
# choose binary if client supports it
if protocols.include?('binary')
t[:base64] = false
elsif protocols.include?('base64')
t[:base64] = true
else
raise EClose, "Client must support 'binary' or 'base64' sub-protocol"
end
key = h['sec-websocket-key']
# Generate the hash value for the accpet header
accept = Base64.encode64(
Digest::SHA1.digest(key + @@GUID)).gsub(/\n/, '')
response = @@Server_handshake_hybi % [accept]
if t[:base64]
response += "Sec-WebSocket-Protocol: base64\r\n"
else
response += "Sec-WebSocket-Protocol: binary\r\n"
end
response += "\r\n"
else
raise EClose, "Only Hixie-76 supported for now"
# Hixie vesrion of the protocol (75 or 76)
body = handshake.match(/\r\n\r\n(........)/)
if body
h['key3'] = body[1]
trailer = gen_md5(h)
pre = "Sec-"
t[:version] = "hixie-76"
else
trailer = ""
pre = ""
t[:version] = "hixie-75"
end
# base64 required for Hixie since payload is only UTF-8
t[:base64] = true
response = @@Server_handshake_hixie % [pre, h['origin'], pre,
"ws", h['host'], t[:path]]
if protocols && protocols.include?('base64')
response += "%sWebSocket-Protocol: base64\r\n" % [pre]
else
msg "Warning: client does not report 'base64' protocol support"
end
response += "\r\n" + trailer
end
response = sprintf(@@server_handshake_hixie, pre, h['origin'],
pre, "ws", h['host'], @path)
if protocols.include?('base64')
response += sprintf("%sWebSocket-Protocol: base64\r\n", pre)
else
msg "Warning: client does not report 'base64' protocol support"
end
response += "\r\n" + trailer
#puts "Response: [#{response.inspect}]"
msg "%s WebSocket connection" % [stype]
msg "Version %s, base64: '%s'" % [t[:version], t[:base64]]
if t[:path] then msg "Path: '%s'" % [t[:path]] end
#puts "sending reponse #{response.inspect}"
retsock.send(response, 0)
# Return the WebSocket socket which may be SSL wrapped
return retsock
end
def serve(io)
@@client_id += 1
@my_client_id = @@client_id
@send_parts = []
@recv_part = nil
@base64 = nil
begin
@client = do_handshake(io)
new_client
rescue EClose => e
msg "Client closed: #{e.message}"
return
rescue Exception => e
msg "Uncaught exception: #{e.message}"
msg "Trace: #{e.backtrace}"
return
end
msg "Client disconnected"
end
end
# vim: sw=2

View File

@ -10,26 +10,44 @@ $: << "../other"
require 'websocket'
require 'optparse'
# Proxy traffic to and from a WebSockets client to a normal TCP
# socket server target. All traffic to/from the client is base64
# encoded/decoded to allow binary data to be sent/received to/from
# the target.
class WebSocketProxy < WebSocketServer
def initialize(port, host, opts, *args)
@@Traffic_legend = "
Traffic Legend:
} - Client receive
}. - Client receive partial
{ - Target receive
> - Target send
>. - Target send partial
< - Client send
<. - Client send partial
"
def initialize(opts)
vmsg "in WebSocketProxy.initialize"
super(port, host, opts, *args)
super(opts)
@target_host = opts["target_host"]
@target_port = opts["target_port"]
end
# Echo back whatever is received
def new_client()
vmsg "in new_client"
def new_client(client)
msg "connecting to: %s:%s" % [@target_host, @target_port]
tsock = TCPSocket.open(@target_host, @target_port)
msg "opened target socket"
if @verbose then puts @@Traffic_legend end
begin
do_proxy(tsock)
do_proxy(client, tsock)
rescue
tsock.shutdown(Socket::SHUT_RDWR)
tsock.close
@ -37,13 +55,12 @@ class WebSocketProxy < WebSocketServer
end
end
def do_proxy(target)
# Proxy client WebSocket to normal target socket.
def do_proxy(client, target)
cqueue = []
c_pend = 0
tqueue = []
rlist = [@client, target]
rlist = [client, target]
loop do
wlist = []
@ -52,7 +69,7 @@ class WebSocketProxy < WebSocketServer
wlist << target
end
if cqueue.length > 0 || c_pend > 0
wlist << @client
wlist << client
end
ins, outs, excepts = IO.select(rlist, wlist, nil, 0.001)
@ -60,8 +77,8 @@ class WebSocketProxy < WebSocketServer
raise Exception, "Socket exception"
end
# Send queued client data to the target
if outs && outs.include?(target)
# Send queued client data to the target
dat = tqueue.shift
sent = target.send(dat, 0)
if sent == dat.length
@ -72,9 +89,9 @@ class WebSocketProxy < WebSocketServer
end
end
# Receive target data and queue for the client
if ins && ins.include?(target)
# Receive target data and queue for the client
buf = target.recv(@@buffer_size)
buf = target.recv(@@Buffer_size)
if buf.length == 0:
raise EClose, "Target closed"
end
@ -83,17 +100,16 @@ class WebSocketProxy < WebSocketServer
traffic "{"
end
if outs && outs.include?(@client)
# Encode and send queued data to the client
# Encode and send queued data to the client
if outs && outs.include?(client)
c_pend = send_frames(cqueue)
cqueue = []
end
if ins && ins.include?(@client)
# Receive client data, decode it, and send it back
# Receive client data, decode it, and send it back
if ins && ins.include?(client)
frames, closed = recv_frames
tqueue += frames
#msg "[#{cqueue.inspect}]"
if closed
send_close
@ -111,8 +127,6 @@ parser = OptionParser.new do |o|
o.on('--verbose', '-v') { |b| opts['verbose'] = b }
o.parse!
end
puts "opts: #{opts.inspect}"
puts "ARGV: #{ARGV.inspect}"
if ARGV.length < 2:
puts "Too few arguments"
@ -123,7 +137,7 @@ end
if ARGV[0].count(":") > 0
opts['listen_host'], _, opts['listen_port'] = ARGV[0].rpartition(':')
else
opts['listen_host'], opts['listen_port'] = GServer::DEFAULT_HOST, ARGV[0]
opts['listen_host'], opts['listen_port'] = nil, ARGV[0]
end
begin
@ -148,13 +162,9 @@ rescue
end
puts "Starting server on #{opts['listen_host']}:#{opts['listen_port']}"
server = WebSocketProxy.new(opts['listen_port'], opts['listen_host'], opts)
#server = WebSocketProxy.new(opts['listen_port'])
server.start
loop do
break if server.stopped?
end
server = WebSocketProxy.new(opts)
server.start(100)
server.join
puts "Server has been terminated"

View File

@ -12,17 +12,17 @@ require 'websocket'
class WebSocketEcho < WebSocketServer
# Echo back whatever is received
def new_client()
def new_client(client)
cqueue = []
c_pend = 0
rlist = [@client]
rlist = [client]
loop do
wlist = []
if cqueue.length > 0 or c_pend
wlist << @client
wlist << client
end
ins, outs, excepts = IO.select(rlist, wlist, nil, 1)
@ -30,17 +30,16 @@ class WebSocketEcho < WebSocketServer
raise Exception, "Socket exception"
end
if outs.include?(@client)
if outs.include?(client)
# Send queued data to the client
c_pend = send_frames(cqueue)
cqueue = []
end
if ins.include?(@client)
if ins.include?(client)
# Receive client data, decode it, and send it back
frames, closed = recv_frames
cqueue += frames
#puts "#{@my_client_id}: >#{cqueue.inspect}<"
if closed
raise EClose, closed
@ -51,15 +50,12 @@ class WebSocketEcho < WebSocketServer
end
end
port = ARGV[0].to_i
puts "Starting server on port #{port}"
puts "Starting server on port 1234"
server = WebSocketEcho.new(1234)
server = WebSocketEcho.new('listen_port' => port, 'verbose' => true)
server.start
loop do
break if server.stopped?
end
server.join
puts "Server has been terminated"