# pylint: disable=bad-whitespace from sys import stderr from collections import deque from datetime import datetime from traceback import format_exception, walk_tb from logging import DEBUG, getLogger, Handler, Formatter, Filter from .envvars import TFWENV class COLOR: BLACK = '\033[30m' GREY = '\033[30;1m' RED = '\033[31m' BOLDRED = '\033[31;1m' GREEN = '\033[32m' BOLDGREEN = '\033[32;1m' ORANGE = '\033[33m' YELLOW = '\033[33;1m' BLUE = '\033[34m' BOLDBLUE = '\033[34;1m' MAGENTA = '\033[35m' BOLDMAGENTA = '\033[35;1m' CYAN = '\033[36m' BOLDCYAN = '\033[36;1m' WHITE = '\033[37m' BOLDWHITE = '\033[37;1m' RESET = '\033[0m' class TFWLog: def __init__(self, path=TFWENV.LOGFILE, level=DEBUG): self.log = getLogger() self.old_level = self.log.level self.new_level = level self.handler = TFWLogHandler(path) self.handler.setFormatter(TFWLogFormatter(20)) def start(self): self.log.setLevel(self.new_level) self.log.addHandler(self.handler) def stop(self): self.log.setLevel(self.old_level) self.handler.close() self.log.removeHandler(self.handler) class TFWLogHandler(Handler): def __init__(self, path): self.logfile = open(path, 'a+') super().__init__() def emit(self, record): short_entry, long_entry = self.format(record) stderr.write(short_entry+'\n') self.logfile.write(long_entry+'\n') stderr.flush() self.logfile.flush() def close(self): self.logfile.close() class TFWLogFormatter(Formatter): severity_to_color = { 'CRITICAL' : COLOR.BOLDRED, 'ERROR' : COLOR.RED, 'WARNING' : COLOR.YELLOW, 'INFO' : COLOR.BOLDGREEN, 'DEBUG' : COLOR.BOLDWHITE, 'NOTSET' : COLOR.CYAN } def __init__(self, limit): self.limit = limit self.last_trace = None super().__init__() def format(self, record): date = datetime.utcfromtimestamp(record.created).strftime('%H:%M:%S') if record.args: tuple_args = (record.args,) if isinstance(record.args, dict) else record.args clean_args = tuple((self.trim(arg) for arg in tuple_args)) short_message = record.msg % clean_args long_message = record.msg % record.args else: short_message = record.msg long_message = record.msg if record.exc_info: current_trace = self.fetch_exception_origin(record.exc_info[2]) if current_trace != self.last_trace: self.last_trace = current_trace trace = '\n'+''.join(format_exception(*record.exc_info)) else: trace = (f'\nSee previous traceback...\n' f'{record.exc_info[0].__name__}: {record.exc_info[1]}') else: trace = '' short_entry = (f'[{COLOR.GREY}{date}{COLOR.RESET}|>' f'{self.severity_to_color[record.levelname]}{record.module}:' f'{record.levelname.lower()}{COLOR.RESET}] {short_message}' f'{trace}') long_entry = (f'[{date}|>{record.module}:{record.levelname.lower()}] ' f'{long_message}{trace}') return short_entry, long_entry def trim(self, value): if isinstance(value, dict): return {k: self.trim(v, True) for k, v in value.items()} if isinstance(value, (int, float)): return value value_str = str(value) return value_str if len(value_str) <= self.limit else f'{value_str[:self.limit]}...' @staticmethod def fetch_exception_origin(trace): return deque(walk_tb(trace), maxlen=1).pop() class TFWLogWhitelistFilter(Filter): def __init__(self, names): self.names = names super().__init__() def filter(self, record): return record.module in self.names class TFWLogBlacklistFilter(Filter): def __init__(self, names): self.names = names super().__init__() def filter(self, record): return record.module not in self.names