From b22a6b0ce04a51bb21ba7b13252b233cd31a2671 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Fri, 29 Jan 2021 13:11:07 +0100 Subject: [PATCH] Add unit tests for jwt token time checks --- tests/test_websocketproxy.py | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/test_websocketproxy.py b/tests/test_websocketproxy.py index 43511fd..ffdecb5 100644 --- a/tests/test_websocketproxy.py +++ b/tests/test_websocketproxy.py @@ -143,6 +143,54 @@ class ProxyRequestHandlerTestCase(unittest.TestCase): self.handler.validate_connection() + @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) + @patch('time.time') + def test_jwt_valid_time(self, mock_time): + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) + jwt_token.make_signed_token(key) + self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) + mock_time.return_value = 150 + + self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem") + self.handler.validate_connection() + + self.assertEqual(self.handler.server.target_host, "remote_host") + self.assertEqual(self.handler.server.target_port, "remote_port") + + @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) + @patch('time.time') + def test_jwt_early_time(self, mock_time): + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) + jwt_token.make_signed_token(key) + self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) + mock_time.return_value = 50 + + self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem") + with self.assertRaises(self.handler.server.EClose): + self.handler.validate_connection() + + @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) + @patch('time.time') + def test_jwt_late_time(self, mock_time): + key = jwt.JWK() + private_key = open("./tests/fixtures/private.pem", "rb").read() + key.import_from_pem(private_key) + jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 }) + jwt_token.make_signed_token(key) + self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize()) + mock_time.return_value = 250 + + self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem") + with self.assertRaises(self.handler.server.EClose): + self.handler.validate_connection() + + @patch('websockify.websocketproxy.ProxyRequestHandler.send_auth_error', MagicMock()) def test_symmetric_jws_token_plugin(self): secret = open("./tests/fixtures/symmetric.key").read()