2016-09-15 18:51:26 +01:00
|
|
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
|
|
|
|
# Copyright(c)2013 NTT corp. All Rights Reserved.
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
|
|
# not use this file except in compliance with the License. You may obtain
|
|
|
|
# a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
# License for the specific language governing permissions and limitations
|
|
|
|
# under the License.
|
|
|
|
|
2016-09-16 17:50:55 +01:00
|
|
|
""" Unit tests for websockifyserver """
|
2016-09-15 18:51:26 +01:00
|
|
|
import errno
|
|
|
|
import os
|
|
|
|
import logging
|
|
|
|
import select
|
|
|
|
import shutil
|
|
|
|
import socket
|
|
|
|
import ssl
|
2020-08-21 09:50:11 +01:00
|
|
|
try:
|
|
|
|
from mock import patch, MagicMock, ANY
|
|
|
|
except ImportError:
|
|
|
|
from unittest.mock import patch, MagicMock, ANY
|
2016-09-15 18:51:26 +01:00
|
|
|
import sys
|
|
|
|
import tempfile
|
|
|
|
import unittest
|
|
|
|
import socket
|
|
|
|
import signal
|
2016-09-16 17:50:55 +01:00
|
|
|
from websockify import websockifyserver
|
2016-09-15 18:51:26 +01:00
|
|
|
|
|
|
|
try:
|
2016-09-16 17:50:55 +01:00
|
|
|
from BaseHTTPServer import BaseHTTPRequestHandler
|
2016-09-15 18:51:26 +01:00
|
|
|
except ImportError:
|
2016-09-16 17:50:55 +01:00
|
|
|
from http.server import BaseHTTPRequestHandler
|
2016-09-15 18:51:26 +01:00
|
|
|
|
|
|
|
try:
|
|
|
|
from StringIO import StringIO
|
|
|
|
BytesIO = StringIO
|
|
|
|
except ImportError:
|
|
|
|
from io import StringIO
|
|
|
|
from io import BytesIO
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def raise_oserror(*args, **kwargs):
    """Raise ``OSError('fake error')`` whatever arguments are passed.

    Intended as a mock ``side_effect`` so that a patched OS-level call
    (for example ``os.close`` or ``os.open``) fails on every invocation.
    """
    raise OSError('fake error')
|
|
|
|
|
|
|
|
|
|
|
|
class FakeSocket(object):
    """Minimal in-memory stand-in for a socket, backed by a bytes buffer.

    Only ``recv`` (with ``MSG_PEEK`` support) and ``makefile`` are
    provided; the buffer is fixed at construction time.
    """

    def __init__(self, data=''):
        """Store *data* as bytes; text input is encoded as latin-1."""
        if isinstance(data, bytes):
            self._data = data
        else:
            self._data = data.encode('latin_1')

    def recv(self, amt, flags=None):
        """Return up to *amt* bytes from the front of the buffer.

        The returned bytes are removed from the buffer unless
        ``socket.MSG_PEEK`` is set in *flags*. ``None`` (the default)
        means "no flags".
        """
        # The default is None, and ``None & MSG_PEEK`` raises TypeError,
        # so normalise it to an empty flag set before testing bits.
        if flags is None:
            flags = 0

        res = self._data[0:amt]
        if not (flags & socket.MSG_PEEK):
            self._data = self._data[amt:]

        return res

    def makefile(self, mode='r', buffsize=None):
        """Return a file-like object over the current buffer contents.

        A binary stream is returned when *mode* contains ``'b'``,
        otherwise a text stream decoded as latin-1. *buffsize* is
        accepted for signature compatibility and ignored.
        """
        if 'b' in mode:
            return BytesIO(self._data)
        else:
            return StringIO(self._data.decode('latin_1'))
|
|
|
|
|
|
|
|
|
2016-09-16 17:50:55 +01:00
|
|
|
class WebSockifyRequestHandlerTestCase(unittest.TestCase):
    """Tests for how WebSockifyRequestHandler answers plain HTTP GETs.

    ``send_error`` is patched in each test: the real implementation
    writes the response through the request socket's ``sendall``,
    which ``FakeSocket`` does not provide.
    """

    def setUp(self):
        """Create a scratch directory and neutralise ``os.chdir``."""
        super(WebSockifyRequestHandlerTestCase, self).setUp()
        self.tmpdir = tempfile.mkdtemp('-websockify-tests')
        # Mock this out cause it screws tests up
        patch('os.chdir').start()

    def tearDown(self):
        """Called automatically after each test."""
        patch.stopall()
        os.rmdir(self.tmpdir)
        super(WebSockifyRequestHandlerTestCase, self).tearDown()

    def _get_server(self, handler_class=websockifyserver.WebSockifyRequestHandler,
                    **kwargs):
        """Build a non-daemon WebSockifyServer rooted at the scratch dir.

        ``web`` may be overridden through *kwargs* (defaults to the
        scratch directory); all other *kwargs* are forwarded to the
        server constructor unchanged.
        """
        web = kwargs.pop('web', self.tmpdir)
        return websockifyserver.WebSockifyServer(
            handler_class, listen_host='localhost',
            listen_port=80, key=self.tmpdir, web=web,
            record=self.tmpdir, daemon=False, ssl_only=0, idle_timeout=1,
            **kwargs)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_normal_get_with_only_upgrade_returns_error(self, send_error):
        """A plain GET against a server created with web=None gets a 405."""
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket('GET /tmp.txt HTTP/1.1'), '127.0.0.1', server)

        handler.do_GET()
        send_error.assert_called_with(405, ANY)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_list_dir_with_file_only_returns_error(self, send_error):
        """Requesting a directory listing with file_only=True gets a 404."""
        server = self._get_server(file_only=True)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket('GET / HTTP/1.1'), '127.0.0.1', server)

        handler.path = '/'
        handler.do_GET()
        send_error.assert_called_with(404, ANY)
|
2016-09-15 18:51:26 +01:00
|
|
|
|
|
|
|
|
2016-09-16 17:50:55 +01:00
|
|
|
class WebSockifyServerTestCase(unittest.TestCase):
    """Tests for WebSockifyServer: daemonize, handshake, main loop, sockets.

    Patches are installed with ``patch(...).start()`` inside each test
    and removed together by ``patch.stopall()`` in ``tearDown``.
    """

    def setUp(self):
        """Create a scratch directory and neutralise ``os.chdir``."""
        super(WebSockifyServerTestCase, self).setUp()
        self.tmpdir = tempfile.mkdtemp('-websockify-tests')
        # Mock this out cause it screws tests up
        patch('os.chdir').start()

    def tearDown(self):
        """Called automatically after each test."""
        patch.stopall()
        os.rmdir(self.tmpdir)
        super(WebSockifyServerTestCase, self).tearDown()

    def _get_server(self, handler_class=websockifyserver.WebSockifyRequestHandler,
                    **kwargs):
        """Build a WebSockifyServer whose key/web/record use the scratch dir.

        All *kwargs* are forwarded to the server constructor unchanged.
        """
        return websockifyserver.WebSockifyServer(
            handler_class, listen_host='localhost',
            listen_port=80, key=self.tmpdir, web=self.tmpdir,
            record=self.tmpdir, **kwargs)

    def test_daemonize_raises_error_while_closing_fds(self):
        """A generic OSError from os.close propagates out of daemonize."""
        server = self._get_server(daemon=True, ssl_only=1, idle_timeout=1)
        patch('os.fork').start().return_value = 0
        patch('signal.signal').start()
        patch('os.setsid').start()
        patch('os.close').start().side_effect = raise_oserror
        self.assertRaises(OSError, server.daemonize, keepfd=None, chdir='./')

    def test_daemonize_ignores_ebadf_error_while_closing_fds(self):
        """EBADF from os.close is tolerated; the later os.open error is not.

        ``os.close`` always fails with EBADF and ``os.open`` always fails
        with a generic OSError; daemonize is expected to raise OSError.
        """
        def raise_oserror_ebadf(fd):
            raise OSError(errno.EBADF, 'fake error')

        server = self._get_server(daemon=True, ssl_only=1, idle_timeout=1)
        patch('os.fork').start().return_value = 0
        patch('signal.signal').start()
        patch('os.setsid').start()
        patch('os.close').start().side_effect = raise_oserror_ebadf
        patch('os.open').start().side_effect = raise_oserror
        self.assertRaises(OSError, server.daemonize, keepfd=None, chdir='./')

    def test_handshake_fails_on_not_ready(self):
        """do_handshake raises EClose when select reports nothing ready."""
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1)

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            FakeSocket(), '127.0.0.1')

    def test_empty_handshake_fails(self):
        """do_handshake raises EClose when the ready socket has no data."""
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1)

        sock = FakeSocket('')

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_handshake_policy_request(self):
        # TODO(directxman12): implement
        pass

    def test_handshake_ssl_only_without_ssl_raises_error(self):
        """With ssl_only=1, non-SSL initial data makes do_handshake raise EClose."""
        server = self._get_server(daemon=True, ssl_only=1, idle_timeout=1)

        sock = FakeSocket('some initial data')

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_do_handshake_no_ssl(self):
        """A non-SSL handshake returns the same socket and builds the handler."""
        class FakeHandler(object):
            CALLED = False

            def __init__(self, *args, **kwargs):
                type(self).CALLED = True

        FakeHandler.CALLED = False

        server = self._get_server(
            handler_class=FakeHandler, daemon=True,
            ssl_only=0, idle_timeout=1)

        sock = FakeSocket('some initial data')

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertEqual(server.do_handshake(sock, '127.0.0.1'), sock)
        # assertTrue's second positional argument is the failure message,
        # not an expected value, so only the flag itself is passed.
        self.assertTrue(FakeHandler.CALLED)

    def test_do_handshake_ssl(self):
        # TODO(directxman12): implement this
        pass

    def test_do_handshake_ssl_without_ssl_raises_error(self):
        # TODO(directxman12): implement this
        pass

    def test_do_handshake_ssl_without_cert_raises_error(self):
        """SSL-looking data with a bogus ``cert`` value raises EClose."""
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1,
                                  cert='afdsfasdafdsafdsafdsafdas')

        sock = FakeSocket("\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_do_handshake_ssl_error_eof_raises_close_error(self):
        """An SSL_ERROR_EOF while wrapping the socket surfaces as EClose."""
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1)

        sock = FakeSocket("\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        def fake_wrap_socket(*args, **kwargs):
            raise ssl.SSLError(ssl.SSL_ERROR_EOF)

        class fake_create_default_context():
            """Stand-in SSL context whose wrap_socket always fails with EOF."""

            def __init__(self, purpose):
                self.verify_mode = None
                self.options = 0

            def load_cert_chain(self, certfile, keyfile, password):
                pass

            def set_default_verify_paths(self):
                pass

            def load_verify_locations(self, cafile):
                pass

            def wrap_socket(self, *args, **kwargs):
                raise ssl.SSLError(ssl.SSL_ERROR_EOF)

        patch('select.select').start().side_effect = fake_select
        if hasattr(ssl, 'create_default_context'):
            # for recent versions of python
            patch('ssl.create_default_context').start().side_effect = \
                fake_create_default_context
        else:
            # for fallback for old versions of python
            # (the attribute is ssl.wrap_socket; patching a misspelled
            # name would make patch() itself raise AttributeError)
            patch('ssl.wrap_socket').start().side_effect = fake_wrap_socket
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_do_handshake_ssl_sets_ciphers(self):
        """The server's ssl_ciphers value is handed to context.set_ciphers."""
        test_ciphers = 'TEST-CIPHERS-1:TEST-CIPHER-2'

        class FakeHandler(object):
            def __init__(self, *args, **kwargs):
                pass

        server = self._get_server(handler_class=FakeHandler, daemon=True,
                                  idle_timeout=1, ssl_ciphers=test_ciphers)
        sock = FakeSocket("\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        class fake_create_default_context():
            """Stand-in SSL context recording the ciphers it is given."""

            # Class-level so the test can read it without the instance.
            CIPHERS = ''

            def __init__(self, purpose):
                self.verify_mode = None
                self.options = 0

            def load_cert_chain(self, certfile, keyfile, password):
                pass

            def set_default_verify_paths(self):
                pass

            def load_verify_locations(self, cafile):
                pass

            def wrap_socket(self, *args, **kwargs):
                pass

            def set_ciphers(self, ciphers_to_set):
                fake_create_default_context.CIPHERS = ciphers_to_set

        patch('select.select').start().side_effect = fake_select
        if hasattr(ssl, 'create_default_context'):
            # for recent versions of python
            patch('ssl.create_default_context').start().side_effect = \
                fake_create_default_context
            server.do_handshake(sock, '127.0.0.1')
            self.assertEqual(fake_create_default_context.CIPHERS, test_ciphers)
        else:
            # for fallback for old versions of python
            # not supported, nothing to test
            pass

    def test_do_handshake_ssl_sets_opions(self):
        """The server's ssl_options value is assigned to context.options."""
        test_options = 0xCAFEBEEF

        class FakeHandler(object):
            def __init__(self, *args, **kwargs):
                pass

        server = self._get_server(handler_class=FakeHandler, daemon=True,
                                  idle_timeout=1, ssl_options=test_options)
        sock = FakeSocket("\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        class fake_create_default_context(object):
            """Stand-in SSL context recording what is assigned to options."""

            # Class-level so the test can read it without the instance.
            OPTIONS = 0

            def __init__(self, purpose):
                self.verify_mode = None
                self._options = 0

            def load_cert_chain(self, certfile, keyfile, password):
                pass

            def set_default_verify_paths(self):
                pass

            def load_verify_locations(self, cafile):
                pass

            def wrap_socket(self, *args, **kwargs):
                pass

            def get_options(self):
                return self._options

            def set_options(self, val):
                # Reads keep returning the initial 0; only the assigned
                # value is captured, on the class.
                fake_create_default_context.OPTIONS = val

            options = property(get_options, set_options)

        patch('select.select').start().side_effect = fake_select
        if hasattr(ssl, 'create_default_context'):
            # for recent versions of python
            patch('ssl.create_default_context').start().side_effect = \
                fake_create_default_context
            server.do_handshake(sock, '127.0.0.1')
            self.assertEqual(fake_create_default_context.OPTIONS, test_options)
        else:
            # for fallback for old versions of python
            # not supported, nothing to test
            pass

    def test_fallback_sigchld_handler(self):
        # TODO(directxman12): implement this
        pass

    def test_start_server_error(self):
        """start_server is run with select raising a generic Exception.

        There is no explicit assertion; the test passes as long as
        start_server() returns without propagating the error.
        """
        server = self._get_server(daemon=False, ssl_only=1, idle_timeout=1)
        sock = server.socket('localhost')
        # The socket is otherwise unused; close it so it does not leak.
        self.addCleanup(sock.close)

        def fake_select(rlist, wlist, xlist, timeout=None):
            raise Exception("fake error")

        patch('websockify.websockifyserver.WebSockifyServer.socket').start()
        patch('websockify.websockifyserver.WebSockifyServer.daemonize').start()
        patch('select.select').start().side_effect = fake_select
        server.start_server()

    def test_start_server_keyboardinterrupt(self):
        """start_server is run with select raising KeyboardInterrupt.

        There is no explicit assertion; the test passes as long as
        start_server() returns without propagating the interrupt.
        """
        server = self._get_server(daemon=False, ssl_only=0, idle_timeout=1)
        sock = server.socket('localhost')
        # The socket is otherwise unused; close it so it does not leak.
        self.addCleanup(sock.close)

        def fake_select(rlist, wlist, xlist, timeout=None):
            raise KeyboardInterrupt

        patch('websockify.websockifyserver.WebSockifyServer.socket').start()
        patch('websockify.websockifyserver.WebSockifyServer.daemonize').start()
        patch('select.select').start().side_effect = fake_select
        server.start_server()

    def test_start_server_systemexit(self):
        """start_server is run with select calling sys.exit().

        There is no explicit assertion; the test passes as long as
        start_server() returns without propagating SystemExit.
        """
        server = self._get_server(daemon=False, ssl_only=0, idle_timeout=1)
        sock = server.socket('localhost')
        # The socket is otherwise unused; close it so it does not leak.
        self.addCleanup(sock.close)

        def fake_select(rlist, wlist, xlist, timeout=None):
            sys.exit()

        patch('websockify.websockifyserver.WebSockifyServer.socket').start()
        patch('websockify.websockifyserver.WebSockifyServer.daemonize').start()
        patch('select.select').start().side_effect = fake_select
        server.start_server()

    def test_socket_set_keepalive_options(self):
        """Keepalive tuning is applied by default and skipped when disabled.

        The checks only run on platforms where ``socket.TCP_KEEPCNT``
        exists.
        """
        keepcnt = 12
        keepidle = 34
        keepintvl = 56

        server = self._get_server(daemon=False, ssl_only=0, idle_timeout=1)
        sock = server.socket('localhost',
                             tcp_keepcnt=keepcnt,
                             tcp_keepidle=keepidle,
                             tcp_keepintvl=keepintvl)
        self.addCleanup(sock.close)

        if hasattr(socket, 'TCP_KEEPCNT'):
            self.assertEqual(sock.getsockopt(socket.SOL_TCP,
                                             socket.TCP_KEEPCNT), keepcnt)
            self.assertEqual(sock.getsockopt(socket.SOL_TCP,
                                             socket.TCP_KEEPIDLE), keepidle)
            self.assertEqual(sock.getsockopt(socket.SOL_TCP,
                                             socket.TCP_KEEPINTVL), keepintvl)

        sock = server.socket('localhost',
                             tcp_keepalive=False,
                             tcp_keepcnt=keepcnt,
                             tcp_keepidle=keepidle,
                             tcp_keepintvl=keepintvl)
        self.addCleanup(sock.close)

        if hasattr(socket, 'TCP_KEEPCNT'):
            self.assertNotEqual(sock.getsockopt(socket.SOL_TCP,
                                                socket.TCP_KEEPCNT), keepcnt)
            self.assertNotEqual(sock.getsockopt(socket.SOL_TCP,
                                                socket.TCP_KEEPIDLE), keepidle)
            self.assertNotEqual(sock.getsockopt(socket.SOL_TCP,
                                                socket.TCP_KEEPINTVL), keepintvl)
|