# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

from functools import wraps
from base64 import b64encode, b64decode
from copy import deepcopy
from hashlib import md5
from os import urandom, chmod
from os.path import exists
from stat import S_IRUSR, S_IWUSR, S_IXUSR

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.hmac import HMAC as _HMAC
from cryptography.exceptions import InvalidSignature

from tfw.networking import message_bytes
from tfw.decorators import lazy_property
from tfw.config import TFWENV


def message_checksum(message):
    """Return the hexadecimal MD5 digest of ``message_bytes(message)``.

    NOTE(review): MD5 is used here as a plain checksum; nothing in this
    module relies on it for authentication (that is what the HMAC helpers
    below are for).
    """
    return md5(message_bytes(message)).hexdigest()


def sign_message(key, message):
    """Sign *message* in place.

    Computes the HMAC-SHA256 signature of the message with *key* and stores
    it, base64-encoded as ``str``, under ``message['signature']``.
    Returns ``None``.
    """
    signature = message_signature(key, message)
    message['signature'] = b64encode(signature).decode()


def message_signature(key, message):
    """Return the raw HMAC-SHA256 signature of ``message_bytes(message)``."""
    return HMAC(key, message_bytes(message)).signature


def verify_message(key, message):
    """Return ``True`` if *message* carries a valid signature for *key*.

    The caller's *message* is never modified: the check works on a deep
    copy from which the ``'signature'`` entry is removed before the
    expected signature is computed.

    ``False`` is returned when the ``'signature'`` entry is missing, is not
    decodable base64, or does not match the message contents.
    """
    message = deepcopy(message)
    try:
        signature_b64 = message.pop('signature')
        signature = b64decode(signature_b64)
    except (KeyError, TypeError, ValueError):
        # KeyError: the message is unsigned.
        # TypeError / ValueError (binascii.Error is a ValueError): the
        # signature field is not valid base64 text. The message may come
        # from an untrusted peer, so reject it instead of crashing.
        return False
    # Delegate the comparison to HMAC.verify (backed by the cryptography
    # library's verify()) rather than using ``==`` on the raw bytes, so the
    # check does not leak timing information about the expected signature.
    return HMAC(key, message_bytes(message)).verify(signature)


class KeyManager:
    """Provides access to the authentication key stored at ``TFWENV.AUTH_KEY``.

    If no file exists at that path when the manager is constructed, a new
    random key is generated and written there.
    """

    def __init__(self):
        self.keyfile = TFWENV.AUTH_KEY
        if not exists(self.keyfile):
            self._init_auth_key()

    @lazy_property
    def auth_key(self):
        """Return the raw bytes stored in the key file.

        NOTE(review): ``lazy_property`` comes from ``tfw.decorators``;
        presumably it reads the file once and caches the result - confirm.
        """
        with open(self.keyfile, 'rb') as ifile:
            return ifile.read()

    def _init_auth_key(self):
        """Generate a new key, write it to the key file and return it."""
        key = self.generate_key()
        with open(self.keyfile, 'wb') as ofile:
            ofile.write(key)
        self._chmod_700_keyfile()
        return key

    @staticmethod
    def generate_key():
        """Return 32 random bytes from ``os.urandom``."""
        return urandom(32)

    def _chmod_700_keyfile(self):
        """Restrict the key file to owner read/write/execute (mode 0o700)."""
        chmod(self.keyfile, S_IRUSR | S_IWUSR | S_IXUSR)


class HMAC:
    """Reusable HMAC-SHA256 helper bound to one key and one message.

    Wraps the ``cryptography`` HMAC primitive. ``signature`` and ``verify``
    may be used repeatedly on the same instance: the underlying context is
    rebuilt whenever an earlier decorated call has already completed.
    """

    def __init__(self, key, message):
        self.key = key
        self.message = message
        self._hmac = _HMAC(
            key=key,
            algorithm=SHA256(),
            backend=default_backend()
        )

    def _reload_if_finalized(f):  # pylint: disable=no-self-argument,not-callable
        """Decorator: rebuild the HMAC context before *f* when needed.

        After the wrapped method returns, the instance is flagged as
        finalized; the next decorated call re-runs ``__init__`` with the
        stored key and message before doing its work.
        """
        @wraps(f)
        def wrapped(instance, *args, **kwargs):
            # NOTE(review): presumably needed because the underlying
            # context cannot be reused once finalize()/verify() has been
            # called on it - confirm against the cryptography docs.
            if getattr(instance, '_finalized', False):
                instance.__init__(instance.key, instance.message)
            ret_val = f(instance, *args, **kwargs)
            setattr(instance, '_finalized', True)
            return ret_val
        return wrapped

    @property
    @_reload_if_finalized
    def signature(self):
        """Return the HMAC-SHA256 digest of the stored message."""
        self._hmac.update(self.message)
        signature = self._hmac.finalize()
        return signature

    @_reload_if_finalized
    def verify(self, signature):
        """Return ``True`` if *signature* matches the stored message's HMAC.

        Uses the library's ``verify()`` and converts ``InvalidSignature``
        into a ``False`` return value.
        """
        self._hmac.update(self.message)
        try:
            self._hmac.verify(signature)
            return True
        except InvalidSignature:
            return False