Added JWT/JWS/JWE tokens capability

This commit is contained in:
UXabre 2019-01-17 08:53:01 -05:00 committed by UXabre
parent f0bdb0a621
commit f2031eff05
7 changed files with 183 additions and 1 deletions

View File

@ -5,6 +5,8 @@ python:
- 3.3
- 3.4
install: pip install -r test-requirements.txt
install:
- pip install 'setuptools>=18.5,<=39.0.0'
- pip install -r test-requirements.txt
script: python setup.py nosetests --verbosity=3

View File

@ -1,2 +1,4 @@
mox3
nose
jwcrypto;python_version>="2.7"
enum34;python_version=="3.3"

27
tests/fixtures/private.pem vendored Normal file
View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEArwNQal2INbSfoVT50dZ0s8lQ+yMhu45TDc91iuwtDjlFBJ50
E4m3/M6ESBW0S7UTP1bIOGkd/M+u38h0Aruo4qkngdguu9N3BnsU2kOeicdjxc+v
tqRc7/kbkTdT4SrpG8EFP6T2U9U1gtBpLnau02gPrzjaQzyYDLGOBq+Ozt/mN0YJ
UhJ3hlwi18dNKreTzWgJ6mmXQWS0eAmHx8TIs2Nz9x3EfRo9CIMuaaeUjRogIEg5
Tg4xC00ZtDO0/EfgpFkeHJGVQA2DgdBJsr6rq69MjhMfFxRJItxJMJzP6an2HkJ8
onUPBtjEBmk3/fnfiaflzRyEb5zdii6r2TD8TwIDAQABAoIBAGDrzu742WQUoYqx
CqDAyWR/is9px1adHTW6vHexD8qewLAsKFBhpnjkzbE2A+EhaIVdRAipfifxxAC+
fDC/SGouD2kDFe6Cz5nRM90kMXpP59s2hzL4l1d2d2PWZid+ohXysTtr2dbXbokB
bh6DL5J4QKdjLsypk/MDqYneU5IQ1k9ezWzcRgM8/V3M+t+1dLRFLIWsSLbNUgbF
px81efNw8E0voV/d7kZ+6RwUThPHqR0eyLm6djPwHE7/FarZIx4AImwV+9ex44CH
OkrTFOVYenF6jEtYoUuqYCouaWtG7jNVM/f1fksoR8SD6PTq2vn7F4wTLXG1b+K7
45PKMhECgYEA22NH8mK9ICFVd7S6caeAnki+K9cpwmiOUWEIuDInmPn5FOlv7awE
uBFN86v14PqDBtF7Aagyib0NUPo7rIw5+V5SCBZv8gQatjZUkT0vZvpxPU5jmB++
w58yfK7zgdAWCepLxIPyTA7CAT1dmiVmuosz2pJjbo4fecVG222IE10CgYEAzDg+
RVlvMYGy04UMmUoUNeeRlW6km/W6dqQ7EtcxfDv4O7boRDTBSRBzfIsRdXHZhcHN
gCeB2Uiz8IO3s0Yt0+y/6cTI60uJ4S7Mb2JvWJvDCKWhS3pE1BL+LJJC4Hn7khJH
yHYFOLOfnuCbOs8VA7IMmbdTPHirIKWTT5j5H5sCgYEAygK/KweUUlOfWVyHGUQ9
gIJG6iNzhlm0QmbxGnrET25N1t2kfNsadUsp1igPfhvuLocRltMDxiTYcCoabKWq
dF5PdrcCWX1CA2o/sIUAcvhE8UiPGHKSu5qJaJnIC05KHNMq9UbyAurL5UxWNiwe
TcMD+k01VYV0ojHvLvnKhNkCgYArkoh+xXE7D+A2zzl771lWkvz19DB88jYBoFLW
V0HArw7str7h5pui2ja5yPZFp6/woQQWptdGpAN4erIUNxIKGIZt+0WfJnPZruGB
lnAJaNp5GtXKQ+ExmofOvLo2KPCrHulf9QZyLakN/gBA0PQ74J5docbJrTld8tX2
cr4cpwKBgHqr2zybmywAmjn8wY0bUjRAyhdN8eiwYaGPtOSFt6IcWxEnNbAo5Jc2
KsywpagjFsXZsi4Obn2XsqR7VX5bNbpNXIyLaMwBOy7MixyecgPF8tu7I4zo/CWm
7gewTKBhwVPTDAOzHqIpJGrOnUgzJM3ijkCWMn3eAh4ccOjsrKq9
-----END RSA PRIVATE KEY-----

9
tests/fixtures/public.pem vendored Normal file
View File

@ -0,0 +1,9 @@
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArwNQal2INbSfoVT50dZ0
s8lQ+yMhu45TDc91iuwtDjlFBJ50E4m3/M6ESBW0S7UTP1bIOGkd/M+u38h0Aruo
4qkngdguu9N3BnsU2kOeicdjxc+vtqRc7/kbkTdT4SrpG8EFP6T2U9U1gtBpLnau
02gPrzjaQzyYDLGOBq+Ozt/mN0YJUhJ3hlwi18dNKreTzWgJ6mmXQWS0eAmHx8TI
s2Nz9x3EfRo9CIMuaaeUjRogIEg5Tg4xC00ZtDO0/EfgpFkeHJGVQA2DgdBJsr6r
q69MjhMfFxRJItxJMJzP6an2HkJ8onUPBtjEBmk3/fnfiaflzRyEb5zdii6r2TD8
TwIDAQAB
-----END PUBLIC KEY-----

1
tests/fixtures/symmetric.key vendored Normal file
View File

@ -0,0 +1 @@
secret_sauce

View File

@ -16,6 +16,7 @@
""" Unit tests for websocketproxy """
import sys
import unittest
import unittest
import socket
@ -27,6 +28,9 @@ from websockify import websocketproxy
from websockify import token_plugins
from websockify import auth_plugins
if sys.version_info >= (2,7):
from jwcrypto import jwt
try:
from StringIO import StringIO
BytesIO = StringIO
@ -125,6 +129,96 @@ class ProxyRequestHandlerTestCase(unittest.TestCase):
self.assertEqual(self.handler.server.target_host, "somehost")
self.assertEqual(self.handler.server.target_port, "blah")
if sys.version_info >= (2,7):
def test_asymmetric_jws_token_plugin(self):
key = jwt.JWK()
private_key = open("./tests/fixtures/private.pem", "rb").read()
key.import_from_pem(private_key)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem")
self.handler.validate_connection()
self.assertEqual(self.handler.server.target_host, "remote_host")
self.assertEqual(self.handler.server.target_port, "remote_port")
def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self):
key = jwt.JWK()
private_key = open("./tests/fixtures/private.pem", "rb").read()
key.import_from_pem(private_key)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong.pub")
self.assertRaises(self.handler.server.EClose,
self.handler.validate_connection)
def test_symmetric_jws_token_plugin(self):
secret = open("./tests/fixtures/symmetric.key").read()
key = jwt.JWK()
key.import_key(kty="oct",k=secret)
jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/symmetric.key")
self.handler.validate_connection()
self.assertEqual(self.handler.server.target_host, "remote_host")
self.assertEqual(self.handler.server.target_port, "remote_port")
def test_symmetric_jws_token_plugin_with_illigal_key_exception(self):
secret = open("./tests/fixtures/symmetric.key").read()
key = jwt.JWK()
key.import_key(kty="oct",k=secret)
jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong_sauce")
self.assertRaises(self.handler.server.EClose,
self.handler.validate_connection)
def test_asymmetric_jwe_token_plugin(self):
private_key = jwt.JWK()
public_key = jwt.JWK()
private_key_data = open("./tests/fixtures/private.pem", "rb").read()
public_key_data = open("./tests/fixtures/public.pem", "rb").read()
private_key.import_from_pem(private_key_data)
public_key.import_from_pem(public_key_data)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(private_key)
jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"},
claims=jwt_token.serialize())
jwe_token.make_encrypted_token(public_key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwe_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/private.pem")
self.handler.validate_connection()
self.assertEqual(self.handler.server.target_host, "remote_host")
self.assertEqual(self.handler.server.target_port, "remote_port")
def test_auth_plugin(self):
class TestPlugin(auth_plugins.BasePlugin):
def authenticate(self, headers, target_host, target_port):

View File

@ -1,3 +1,4 @@
from __future__ import print_function
import os
import sys
@ -87,3 +88,49 @@ class JSONTokenApi(BaseTokenAPI):
def process_result(self, resp):
resp_json = resp.json()
return (resp_json['host'], resp_json['port'])
class JWTTokenApi(BasePlugin):
# source is a JWT-token, with hostname and port included
# Both JWS as JWE tokens are accepted. With regards to JWE tokens, the key is re-used for both validation and decryption.
def lookup(self, token):
try:
from jwcrypto import jwt
import json
key = jwt.JWK()
try:
with open(self.source, 'rb') as key_file:
key_data = key_file.read()
except Exception as e:
print("Error loading key file: %s" % str(e), file=sys.stderr)
return None
try:
key.import_from_pem(key_data)
except:
try:
key.import_key(k=key_data.decode('utf-8'),kty='oct')
except:
print('Failed to correctly parse key data!', file=sys.stderr)
return None
try:
token = jwt.JWT(key=key, jwt=token)
parsed_header = json.loads(token.header)
if 'enc' in parsed_header:
# Token is encrypted, so we need to decrypt by passing the claims to a new instance
token = jwt.JWT(key=key, jwt=token.claims)
parsed = json.loads(token.claims)
return (parsed['host'], parsed['port'])
except Exception as e:
print("Failed to parse token: %s" % str(e), file=sys.stderr)
return None
except ImportError as e:
print("package jwcrypto not found, are you sure you've installed it correctly?", file=sys.stderr)
return None