diff --git a/tests/test_token_plugins.py b/tests/test_token_plugins.py new file mode 100644 index 0000000..5d63a2b --- /dev/null +++ b/tests/test_token_plugins.py @@ -0,0 +1,135 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +""" Unit tests for Token plugins""" + +import unittest +from unittest.mock import patch +from jwcrypto import jwt + +from websockify.token_plugins import JWTTokenApi + +class JWSTokenTestCase(unittest.TestCase): + def test_asymmetric_jws_token_plugin(self): + plugin = JWTTokenApi("./tests/fixtures/public.pem") + + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"}) + jwt_token.make_signed_token(key) + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNotNone(result) + self.assertEqual(result[0], "remote_host") + self.assertEqual(result[1], "remote_port") + + def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self): + plugin = JWTTokenApi("wrong.pub") + + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"}) + jwt_token.make_signed_token(key) + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNone(result) + + @patch('time.time') + def test_jwt_valid_time(self, mock_time): + plugin = JWTTokenApi("./tests/fixtures/public.pem") + + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) + jwt_token.make_signed_token(key) + mock_time.return_value = 150 + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNotNone(result) + self.assertEqual(result[0], "remote_host") + self.assertEqual(result[1], "remote_port") + + @patch('time.time') + def test_jwt_early_time(self, mock_time): + plugin = JWTTokenApi("./tests/fixtures/public.pem") + + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) + jwt_token.make_signed_token(key) + mock_time.return_value = 50 + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNone(result) + + @patch('time.time') + def test_jwt_late_time(self, mock_time): + plugin = JWTTokenApi("./tests/fixtures/public.pem") + + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) + jwt_token.make_signed_token(key) + mock_time.return_value = 250 + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNone(result) + + def test_symmetric_jws_token_plugin(self): + plugin = JWTTokenApi("./tests/fixtures/symmetric.key") + + secret = open("./tests/fixtures/symmetric.key").read() + key = jwt.JWK() + key.import_key(kty="oct",k=secret) + jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"}) + jwt_token.make_signed_token(key) + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNotNone(result) + self.assertEqual(result[0], "remote_host") + self.assertEqual(result[1], "remote_port") + + def test_symmetric_jws_token_plugin_with_illigal_key_exception(self): + plugin = JWTTokenApi("wrong_sauce") + + secret = open("./tests/fixtures/symmetric.key").read() + key = jwt.JWK() + key.import_key(kty="oct",k=secret) + jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"}) + jwt_token.make_signed_token(key) + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNone(result) + + def test_asymmetric_jwe_token_plugin(self): + plugin = JWTTokenApi("./tests/fixtures/private.pem") + + private_key = jwt.JWK() + public_key = jwt.JWK() + private_key_data = open("./tests/fixtures/private.pem", "rb").read() + public_key_data = open("./tests/fixtures/public.pem", "rb").read() + private_key.import_from_pem(private_key_data) + public_key.import_from_pem(public_key_data) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"}) + jwt_token.make_signed_token(private_key) + jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"}, + claims=jwt_token.serialize()) + jwe_token.make_encrypted_token(public_key) + + result = plugin.lookup(jwt_token.serialize()) + + self.assertIsNotNone(result) + self.assertEqual(result[0], "remote_host") + self.assertEqual(result[1], "remote_port") + diff --git a/tests/test_websocketproxy.py b/tests/test_websocketproxy.py index ffdecb5..a05e3b1 100644 --- a/tests/test_websocketproxy.py +++ b/tests/test_websocketproxy.py @@ -24,8 +24,6 @@ from io import StringIO from io import BytesIO from unittest.mock import patch, MagicMock -from jwcrypto import jwt - from websockify import websocketproxy from websockify import token_plugins from websockify import auth_plugins @@ -114,133 +112,6 @@ class ProxyRequestHandlerTestCase(unittest.TestCase): self.assertEqual(self.handler.server.target_host, "somehost") self.assertEqual(self.handler.server.target_port, "blah") - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - def test_asymmetric_jws_token_plugin(self): - key = jwt.JWK() - private_key = open("./tests/fixtures/private.pem", "rb").read() - key.import_from_pem(private_key) - jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"}) - jwt_token.make_signed_token(key) - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem") - self.handler.validate_connection() - - self.assertEqual(self.handler.server.target_host, "remote_host") - self.assertEqual(self.handler.server.target_port, "remote_port") - - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self): - key = jwt.JWK() - private_key = open("./tests/fixtures/private.pem", "rb").read() - key.import_from_pem(private_key) - jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"}) - jwt_token.make_signed_token(key) - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong.pub") - with self.assertRaises(self.handler.server.EClose): - self.handler.validate_connection() - - - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - @patch('time.time') - def test_jwt_valid_time(self, mock_time): - key = jwt.JWK() - private_key = open("./tests/fixtures/private.pem", "rb").read() - key.import_from_pem(private_key) - jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) - jwt_token.make_signed_token(key) - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) - mock_time.return_value = 150 - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem") - self.handler.validate_connection() - - self.assertEqual(self.handler.server.target_host, "remote_host") - self.assertEqual(self.handler.server.target_port, "remote_port") - - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - @patch('time.time') - def test_jwt_early_time(self, mock_time): - key = jwt.JWK() - private_key = open("./tests/fixtures/private.pem", "rb").read() - key.import_from_pem(private_key) - jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) - jwt_token.make_signed_token(key) - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) - mock_time.return_value = 50 - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem") - with self.assertRaises(self.handler.server.EClose): - self.handler.validate_connection() - - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - @patch('time.time') - def test_jwt_late_time(self, mock_time): - key = jwt.JWK() - private_key = open("./tests/fixtures/private.pem", "rb").read() - key.import_from_pem(private_key) - jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) - jwt_token.make_signed_token(key) - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) - mock_time.return_value = 250 - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem") - with self.assertRaises(self.handler.server.EClose): - self.handler.validate_connection() - - - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - def test_symmetric_jws_token_plugin(self): - secret = open("./tests/fixtures/symmetric.key").read() - key = jwt.JWK() - key.import_key(kty="oct",k=secret) - jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"}) - jwt_token.make_signed_token(key) - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/symmetric.key") - self.handler.validate_connection() - - self.assertEqual(self.handler.server.target_host, "remote_host") - self.assertEqual(self.handler.server.target_port, "remote_port") - - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - def test_symmetric_jws_token_plugin_with_illigal_key_exception(self): - secret = open("./tests/fixtures/symmetric.key").read() - key = jwt.JWK() - key.import_key(kty="oct",k=secret) - jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"}) - jwt_token.make_signed_token(key) - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong_sauce") - self.assertRaises(self.handler.server.EClose, - self.handler.validate_connection) - - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) - def test_asymmetric_jwe_token_plugin(self): - private_key = jwt.JWK() - public_key = jwt.JWK() - private_key_data = open("./tests/fixtures/private.pem", "rb").read() - public_key_data = open("./tests/fixtures/public.pem", "rb").read() - private_key.import_from_pem(private_key_data) - public_key.import_from_pem(public_key_data) - jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"}) - jwt_token.make_signed_token(private_key) - jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"}, - claims=jwt_token.serialize()) - jwe_token.make_encrypted_token(public_key) - - self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwe_token.serialize()) - - self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/private.pem") - self.handler.validate_connection() - - self.assertEqual(self.handler.server.target_host, "remote_host") - self.assertEqual(self.handler.server.target_port, "remote_port") - @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) def test_auth_plugin(self): class TestPlugin(auth_plugins.BasePlugin):