mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-08 19:27:16 +00:00
140 lines
4.3 KiB
Python
140 lines
4.3 KiB
Python
# pylint: disable=bad-whitespace
|
|
from sys import stderr
|
|
from collections import deque
|
|
from datetime import datetime
|
|
from traceback import format_exception, walk_tb
|
|
from logging import DEBUG, getLogger, Handler, Formatter, Filter
|
|
|
|
TFW_LOG_PATH = '/var/log/tfw.log'
|
|
|
|
|
|
class COLOR:
|
|
BLACK = '\033[30m'
|
|
GREY = '\033[30;1m'
|
|
RED = '\033[31m'
|
|
BOLDRED = '\033[31;1m'
|
|
GREEN = '\033[32m'
|
|
BOLDGREEN = '\033[32;1m'
|
|
ORANGE = '\033[33m'
|
|
YELLOW = '\033[33;1m'
|
|
BLUE = '\033[34m'
|
|
BOLDBLUE = '\033[34;1m'
|
|
MAGENTA = '\033[35m'
|
|
BOLDMAGENTA = '\033[35;1m'
|
|
CYAN = '\033[36m'
|
|
BOLDCYAN = '\033[36;1m'
|
|
WHITE = '\033[37m'
|
|
BOLDWHITE = '\033[37;1m'
|
|
RESET = '\033[0m'
|
|
|
|
|
|
class TFWLog:
|
|
def __init__(self, path=TFW_LOG_PATH, level=DEBUG):
|
|
self.log = getLogger()
|
|
self.old_level = self.log.level
|
|
self.new_level = level
|
|
self.handler = TFWLogHandler(path)
|
|
self.handler.setFormatter(TFWLogFormatter())
|
|
|
|
def start(self):
|
|
self.log.setLevel(self.new_level)
|
|
self.log.addHandler(self.handler)
|
|
self.log.info('Logging started.')
|
|
|
|
def stop(self):
|
|
self.log.info('Stop logging.')
|
|
self.log.setLevel(self.old_level)
|
|
self.handler.close()
|
|
self.log.removeHandler(self.handler)
|
|
|
|
|
|
class TFWLogHandler(Handler):
|
|
def __init__(self, path):
|
|
self.logfile = open(path, 'a+')
|
|
super().__init__()
|
|
|
|
def emit(self, record):
|
|
short_entry, long_entry = self.format(record)
|
|
stderr.write(short_entry+'\n')
|
|
self.logfile.write(long_entry+'\n')
|
|
stderr.flush()
|
|
self.logfile.flush()
|
|
|
|
def close(self):
|
|
self.logfile.close()
|
|
|
|
class TFWLogFormatter(Formatter):
|
|
severity = {
|
|
'CRITICAL' : COLOR.BOLDRED,
|
|
'ERROR' : COLOR.RED,
|
|
'WARNING' : COLOR.YELLOW,
|
|
'INFO' : COLOR.BOLDGREEN,
|
|
'DEBUG' : COLOR.BOLDWHITE,
|
|
'NOTSET' : COLOR.CYAN
|
|
}
|
|
|
|
def __init__(self):
|
|
self.last_trace = None
|
|
super().__init__()
|
|
|
|
def format(self, record):
|
|
date = datetime.utcfromtimestamp(record.created).strftime('%H:%M:%S')
|
|
if record.args:
|
|
tuple_args = (record.args,) if isinstance(record.args, dict) else record.args
|
|
clean_args = tuple((self.trim(arg) for arg in tuple_args))
|
|
short_message = record.msg % clean_args
|
|
long_message = record.msg % record.args
|
|
else:
|
|
short_message = record.msg
|
|
long_message = record.msg
|
|
|
|
if record.exc_info:
|
|
current_trace = self.fetch_exception_origin(record.exc_info[2])
|
|
if current_trace != self.last_trace:
|
|
self.last_trace = current_trace
|
|
trace = '\n'+''.join(format_exception(*record.exc_info))
|
|
else:
|
|
trace = (f'\nSee previous traceback...\n'
|
|
f'{record.exc_info[0].__name__}: {record.exc_info[1]}')
|
|
else:
|
|
trace = ''
|
|
|
|
short_entry = (f'[{COLOR.GREY}{date}{COLOR.RESET}|>'
|
|
f'{self.severity[record.levelname]}{record.module}:'
|
|
f'{record.levelname.lower()}{COLOR.RESET}] {short_message}'
|
|
f'{trace}')
|
|
long_entry = (f'[{date}|>{record.module}:{record.levelname.lower()}] '
|
|
f'{long_message}{trace}')
|
|
return short_entry, long_entry
|
|
|
|
def trim(self, value, in_dict=False):
|
|
if isinstance(value, dict):
|
|
trimmed = {k: self.trim(v, True) for k, v in value.items()}
|
|
return trimmed if in_dict else str(trimmed)
|
|
if isinstance(value, (int, float)):
|
|
return value
|
|
value_str = str(value)
|
|
return value_str if len(value_str) <= 20 else f'{value_str[:20]}...'
|
|
|
|
@staticmethod
|
|
def fetch_exception_origin(trace):
|
|
return deque(walk_tb(trace), maxlen=1).pop()
|
|
|
|
|
|
class TFWLogWhitelistFilter(Filter):
|
|
def __init__(self, names):
|
|
self.names = names
|
|
super().__init__()
|
|
|
|
def filter(self, record):
|
|
return record.module in self.names
|
|
|
|
|
|
class TFWLogBlacklistFilter(Filter):
|
|
def __init__(self, names):
|
|
self.names = names
|
|
super().__init__()
|
|
|
|
def filter(self, record):
|
|
return record.module not in self.names
|