commit bbeb7d85a2680366c06164acd978195c4a1cc1b0 Author: Michał Sałaban Date: Fri Nov 24 03:05:16 2017 +0100 Add address diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f485bca --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.* +!.gitignore +*.py[co] +*~ +*.bak +*.swp diff --git a/monero/__init__.py b/monero/__init__.py new file mode 100644 index 0000000..0627fe6 --- /dev/null +++ b/monero/__init__.py @@ -0,0 +1 @@ +from .address import Address diff --git a/monero/address.py b/monero/address.py new file mode 100644 index 0000000..186f7b3 --- /dev/null +++ b/monero/address.py @@ -0,0 +1,69 @@ +from binascii import hexlify, unhexlify +from sha3 import keccak_256 + +from . import base58 + +class Address(object): + def __init__(self, address): + if len(address) != 95: + raise ValueError("Address must be 95 characters long, is %d" % len(address)) + self._decode(address) + + def _decode(self, address): + self._decoded = unhexlify(base58.decode(address)) + checksum = self._decoded[-4:] + if checksum != keccak_256(self._decoded[:-4]).digest()[:4]: + raise ValueError("Invalid checksum") + + def is_testnet(self): + return self._decoded[0] in bytes([53, 54]) + + def get_view_key(self): + return hexlify(self._decoded[33:65]).decode() + + def get_spend_key(self): + return hexlify(self._decoded[1:33]).decode() + + def with_payment_id(self, payment_id=0): + if isinstance(payment_id, (bytes, str)): + payment_id = int(payment_id, 16) + elif not isinstance(payment_id, int): + raise TypeError("payment_id must be either int or hexadecimal str or bytes") + prefix = 54 if self.is_testnet() else 19 + data = bytes([prefix]) + self._decoded[1:65] + payment_id.to_bytes(8, byteorder='big') + checksum = keccak_256(data).digest()[:4] + return IntegratedAddress(base58.encode(hexlify(data + checksum))) + + def __repr__(self): + return base58.encode(hexlify(self._decoded)) + + def __eq__(self, other): + if isinstance(other, Address): + return str(self) == str(other) + if isinstance(other, str): + return str(self) == other + return super() + + +class IntegratedAddress(Address): + def __init__(self, address): + if len(address) != 106: + raise ValueError("Integrated address must be 106 characters long, is %d" % len(address)) + self._decode(address) + + def get_payment_id(self): + return hexlify(self._decoded[65:-4]).decode() + + def get_base_address(self): + prefix = 53 if self.is_testnet() else 18 + data = bytes([prefix]) + self._decoded[1:65] + checksum = keccak_256(data).digest()[:4] + return Address(base58.encode(hexlify(data + checksum))) + + +def address(addr): + if len(addr) == 95: + return Address(addr) + elif len(addr) == 106: + return IntegratedAddress(addr) + raise ValueError("Address must be either 95 or 106 characters long") diff --git a/monero/base58.py b/monero/base58.py new file mode 100644 index 0000000..83424de --- /dev/null +++ b/monero/base58.py @@ -0,0 +1,168 @@ +# MoneroPy - A python toolbox for Monero +# Copyright (C) 2016 The MoneroPy Developers. +# +# MoneroPy is released under the BSD 3-Clause license. Use and redistribution of +# this software is subject to the license terms in the LICENSE file found in the +# top-level directory of this distribution. + +__alphabet = [ord(s) for s in '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'] +__b58base = 58 +__UINT64MAX = 2**64 +__encodedBlockSizes = [0, 2, 3, 5, 6, 7, 9, 10, 11] +__fullBlockSize = 8 +__fullEncodedBlockSize = 11 + +def _hexToBin(hex): + if len(hex) % 2 != 0: + return "Hex string has invalid length!" + return [int(hex[i*2:i*2+2], 16) for i in range(len(hex)//2)] + +def _binToHex(bin): + return "".join([("0" + hex(int(bin[i])).split('x')[1])[-2:] for i in range(len(bin))]) + +def _strToBin(a): + return [ord(s) for s in a] + +def _binToStr(bin): + return ''.join([chr(bin[i]) for i in range(len(bin))]) + +def _uint8be_to_64(data): + l_data = len(data) + + if l_data < 1 or l_data > 8: + return "Invalid input length" + + res = 0 + switch = 9 - l_data + for i in range(l_data): + if switch == 1: + res = res << 8 | data[i] + elif switch == 2: + res = res << 8 | data[i] + elif switch == 3: + res = res << 8 | data[i] + elif switch == 4: + res = res << 8 | data[i] + elif switch == 5: + res = res << 8 | data[i] + elif switch == 6: + res = res << 8 | data[i] + elif switch == 7: + res = res << 8 | data[i] + elif switch == 8: + res = res << 8 | data[i] + else: + return "Impossible condition" + return res + +def _uint64_to_8be(num, size): + res = [0] * size; + if size < 1 or size > 8: + return "Invalid input length" + + twopow8 = 2**8 + for i in range(size-1,-1,-1): + res[i] = num % twopow8 + num = num // twopow8 + + return res + +def encode_block(data, buf, index): + l_data = len(data) + + if l_data < 1 or l_data > __fullEncodedBlockSize: + return "Invalid block length: " + str(l_data) + + num = _uint8be_to_64(data) + i = __encodedBlockSizes[l_data] - 1 + + while num > 0: + remainder = num % __b58base + num = num // __b58base + buf[index+i] = __alphabet[remainder]; + i -= 1 + + return buf + +def encode(hex): + '''Encode hexadecimal string as base58 (ex: encoding a Monero address).''' + data = _hexToBin(hex) + l_data = len(data) + + if l_data == 0: + return "" + + full_block_count = l_data // __fullBlockSize + last_block_size = l_data % __fullBlockSize + res_size = full_block_count * __fullEncodedBlockSize + __encodedBlockSizes[last_block_size] + + res = [0] * res_size + for i in range(res_size): + res[i] = __alphabet[0] + + for i in range(full_block_count): + res = encode_block(data[(i*__fullBlockSize):(i*__fullBlockSize+__fullBlockSize)], res, i * __fullEncodedBlockSize) + + if last_block_size > 0: + res = encode_block(data[(full_block_count*__fullBlockSize):(full_block_count*__fullBlockSize+last_block_size)], res, full_block_count * __fullEncodedBlockSize) + + return _binToStr(res) + +def decode_block(data, buf, index): + l_data = len(data) + + if l_data < 1 or l_data > __fullEncodedBlockSize: + return "Invalid block length: " + l_data + + res_size = __encodedBlockSizes.index(l_data) + if res_size <= 0: + return "Invalid block size" + + res_num = 0 + order = 1 + for i in range(l_data-1, -1, -1): + digit = __alphabet.index(data[i]) + if digit < 0: + return "Invalid symbol" + + product = order * digit + res_num + if product > __UINT64MAX: + return "Overflow" + + res_num = product + order = order * __b58base + + if res_size < __fullBlockSize and 2**(8 * res_size) <= res_num: + return "Overflow 2" + + tmp_buf = _uint64_to_8be(res_num, res_size) + for i in range(len(tmp_buf)): + buf[i+index] = tmp_buf[i] + + return buf + +def decode(enc): + '''Decode a base58 string (ex: a Monero address) into hexidecimal form.''' + enc = _strToBin(enc) + l_enc = len(enc) + + if l_enc == 0: + return "" + + full_block_count = l_enc // __fullEncodedBlockSize + last_block_size = l_enc % __fullEncodedBlockSize + last_block_decoded_size = __encodedBlockSizes.index(last_block_size) + + if last_block_decoded_size < 0: + return "Invalid encoded length" + + data_size = full_block_count * __fullBlockSize + last_block_decoded_size + + data = [0] * data_size + for i in range(full_block_count): + data = decode_block(enc[(i*__fullEncodedBlockSize):(i*__fullEncodedBlockSize+__fullEncodedBlockSize)], data, i * __fullBlockSize) + + if last_block_size > 0: + data = decode_block(enc[(full_block_count*__fullEncodedBlockSize):(full_block_count*__fullEncodedBlockSize+last_block_size)], data, full_block_count * __fullBlockSize) + + return _binToHex(data) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..72fe1a8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pysha3 diff --git a/test.py b/test.py new file mode 100644 index 0000000..88b1921 --- /dev/null +++ b/test.py @@ -0,0 +1,62 @@ +import unittest + +from monero.address import Address, IntegratedAddress, address + +class Tests(object): + def test_from_and_to_string(self): + a = Address(self.addr) + self.assertEqual(str(a), self.addr) + self.assertEqual(a.get_spend_key(), self.psk) + self.assertEqual(a.get_view_key(), self.pvk) + + ia = IntegratedAddress(self.iaddr) + self.assertEqual(ia.get_payment_id(), self.pid) + self.assertEqual(str(ia), self.iaddr) + self.assertEqual(ia.get_spend_key(), self.psk) + self.assertEqual(ia.get_view_key(), self.pvk) + self.assertEqual(ia.get_base_address(), a) + + def test_payment_id(self): + a = Address(self.addr) + ia = a.with_payment_id(self.pid) + self.assertIsInstance(ia, IntegratedAddress) + self.assertEqual(ia.get_payment_id(), self.pid) + self.assertEqual(str(ia), self.iaddr) + + def test_recognition_and_comparisons(self): + a = Address(self.addr) + a2 = address(self.addr) + self.assertIsInstance(a2, Address) + self.assertEqual(a, a2) + self.assertEqual(a, self.addr) + self.assertEqual(self.addr, a) + self.assertEqual(a.is_testnet(), self.testnet) + self.assertEqual(a2.is_testnet(), self.testnet) + + ia = IntegratedAddress(self.iaddr) + ia2 = address(self.iaddr) + self.assertIsInstance(ia, IntegratedAddress) + self.assertEqual(ia, ia2) + self.assertEqual(ia, self.iaddr) + self.assertEqual(self.iaddr, ia) + self.assertEqual(ia.is_testnet(), self.testnet) + self.assertEqual(ia2.is_testnet(), self.testnet) + self.assertEqual(ia2.get_base_address(), a) + + +class AddressTestCase(unittest.TestCase, Tests): + addr = '43aeKax1ts4BoEbSyzKVbbDRmc8nsnpZLUpQBYvhUxs3KVrodnaFaBEQMDp69u4VaiEG3LSQXA6M61mXPrztCLuh7PFUAmd' + psk = '33a7ceb933b793408d49e82c0a34664a4be7117243cb77a64ef280b866d8aa6e' + pvk = '96f70d63d9d3558b97a5dd200a170b4f45b3177a274aa90496ea683896ff6438' + pid = '4a6f686e47616c74' + iaddr = '4DHKLPmWW8aBoEbSyzKVbbDRmc8nsnpZLUpQBYvhUxs3KVrodnaFaBEQMDp69u4VaiEG3LSQXA6M61mXPrztCLuhAR6GpL18QNwE8h3TuF' + testnet = False + + +class TestnetAddressTestCase(AddressTestCase, Tests): + addr = '9u9j6xG1GNu4ghrdUL35m5PQcJV69YF8731DSTDoh7pDgkBWz2LWNzncq7M5s1ARjPRhvGPX4dBUeC3xNj4wzfrjV6SY3e9' + psk = '345b201b8d1ba216074e3c45ca606c85f68563f60d0b8c0bfab5123f80692aed' + pvk = '9deb70cc7e1e23d635de2d5a3086a293b4580dc2b9133b4211bc09f22fadc4f9' + pid = '4a6f686e47616c74' + iaddr = 'A4rQ7m5VseR4ghrdUL35m5PQcJV69YF8731DSTDoh7pDgkBWz2LWNzncq7M5s1ARjPRhvGPX4dBUeC3xNj4wzfrjihS6W83Km1mE7W3kMa' + testnet = True