mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-09 01:17:16 +00:00
110 lines
3.2 KiB
Python
110 lines
3.2 KiB
Python
# pylint: disable=bad-whitespace
|
|
from sys import stderr
|
|
from datetime import datetime
|
|
from logging import DEBUG, getLogger, Handler, Formatter, Filter
|
|
|
|
from .envvars import TFWENV
|
|
|
|
|
|
class Color:
|
|
GREY = '\033[30;1m'
|
|
RED = '\033[31m'
|
|
BOLDRED = '\033[31;1m'
|
|
BOLDGREEN = '\033[32;1m'
|
|
YELLOW = '\033[33;1m'
|
|
CYAN = '\033[36m'
|
|
BOLDWHITE = '\033[37;1m'
|
|
RESET = '\033[0m'
|
|
|
|
|
|
class TFWLog:
|
|
def __init__(self, path=TFWENV.LOGFILE, level=DEBUG):
|
|
self.log = getLogger()
|
|
self.old_level = self.log.level
|
|
self.new_level = level
|
|
self.handler = TFWLogHandler(path)
|
|
self.handler.setFormatter(TFWLogFormatter(20))
|
|
|
|
def start(self):
|
|
self.log.setLevel(self.new_level)
|
|
self.log.addHandler(self.handler)
|
|
|
|
def stop(self):
|
|
self.log.setLevel(self.old_level)
|
|
self.handler.close()
|
|
self.log.removeHandler(self.handler)
|
|
|
|
|
|
class TFWLogHandler(Handler):
|
|
def __init__(self, path):
|
|
self.logfile = open(path, 'a+')
|
|
super().__init__()
|
|
|
|
def emit(self, record):
|
|
short_entry, long_entry = self.format(record)
|
|
stderr.write(short_entry+'\n')
|
|
self.logfile.write(long_entry+'\n')
|
|
stderr.flush()
|
|
self.logfile.flush()
|
|
|
|
def close(self):
|
|
self.logfile.close()
|
|
|
|
class TFWLogFormatter(Formatter):
|
|
severity_to_color = {
|
|
'CRITICAL' : Color.BOLDRED,
|
|
'ERROR' : Color.RED,
|
|
'WARNING' : Color.YELLOW,
|
|
'INFO' : Color.BOLDGREEN,
|
|
'DEBUG' : Color.BOLDWHITE,
|
|
'NOTSET' : Color.CYAN
|
|
}
|
|
|
|
def __init__(self, limit):
|
|
self.limit = limit
|
|
super().__init__()
|
|
|
|
def format(self, record):
|
|
date = datetime.utcfromtimestamp(record.created).strftime('%H:%M:%S')
|
|
if record.args:
|
|
tuple_args = (record.args,) if isinstance(record.args, dict) else record.args
|
|
clean_args = tuple((self.trim(arg) for arg in tuple_args))
|
|
short_message = record.msg % clean_args
|
|
long_message = record.msg % record.args
|
|
else:
|
|
short_message = record.msg
|
|
long_message = record.msg
|
|
|
|
short_entry = (f'[{Color.GREY}{date}{Color.RESET}|>'
|
|
f'{self.severity_to_color[record.levelname]}{record.module}:'
|
|
f'{record.levelname.lower()}{Color.RESET}] {short_message}')
|
|
long_entry = (f'[{date}|>{record.module}:{record.levelname.lower()}] '
|
|
f'{long_message}')
|
|
return short_entry, long_entry
|
|
|
|
def trim(self, value):
|
|
if isinstance(value, dict):
|
|
return {k: self.trim(v) for k, v in value.items()}
|
|
if isinstance(value, str):
|
|
value_str = str(value)
|
|
return value_str if len(value_str) <= self.limit else f'{value_str[:self.limit]}...'
|
|
return value
|
|
|
|
|
|
class WhitelistFilter(Filter):
|
|
def __init__(self, names):
|
|
self.names = names
|
|
super().__init__()
|
|
|
|
def filter(self, record):
|
|
return record.module in self.names
|
|
|
|
|
|
class BlacklistFilter(Filter):
|
|
def __init__(self, names):
|
|
self.names = names
|
|
super().__init__()
|
|
|
|
def filter(self, record):
|
|
return record.module not in self.names
|