mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-14 05:17:17 +00:00
133 lines
3.8 KiB
Python
133 lines
3.8 KiB
Python
# pylint: disable=bad-whitespace
|
|
from datetime import datetime
|
|
from typing import TextIO, Union
|
|
from dataclasses import dataclass
|
|
from traceback import format_exception
|
|
from logging import DEBUG, getLogger, Handler, Formatter, Filter
|
|
|
|
|
|
class Color:
|
|
RED = '\033[31m'
|
|
GREEN = '\033[32m'
|
|
YELLOW = '\033[33m'
|
|
BLUE = '\033[34m'
|
|
CYAN = '\033[36m'
|
|
WHITE = '\033[37m'
|
|
RESET = '\033[0m'
|
|
|
|
|
|
@dataclass
|
|
class Log:
|
|
stream: Union[str, TextIO]
|
|
formatter: Formatter
|
|
|
|
|
|
class Logger:
|
|
def __init__(self, logs, level=DEBUG):
|
|
self.root_logger = getLogger()
|
|
self.old_level = self.root_logger.level
|
|
self.new_level = level
|
|
self.handlers = []
|
|
for log in logs:
|
|
handler = LogHandler(log.stream)
|
|
handler.setFormatter(log.formatter)
|
|
self.handlers.append(handler)
|
|
|
|
def start(self):
|
|
self.root_logger.setLevel(self.new_level)
|
|
for handler in self.handlers:
|
|
self.root_logger.addHandler(handler)
|
|
|
|
def stop(self):
|
|
self.root_logger.setLevel(self.old_level)
|
|
for handler in self.handlers:
|
|
handler.close()
|
|
self.root_logger.removeHandler(handler)
|
|
|
|
|
|
class LogHandler(Handler):
|
|
def __init__(self, stream):
|
|
if isinstance(stream, str):
|
|
self.stream = open(stream, 'a+')
|
|
self.close_stream = True
|
|
else:
|
|
self.stream = stream
|
|
self.close_stream = False
|
|
super().__init__()
|
|
|
|
def emit(self, record):
|
|
entry = self.format(record)
|
|
self.stream.write(entry+'\n')
|
|
self.stream.flush()
|
|
|
|
def close(self):
|
|
if self.close_stream:
|
|
self.stream.close()
|
|
|
|
class LogFormatter(Formatter):
|
|
severity_to_color = {
|
|
'CRITICAL' : Color.RED,
|
|
'ERROR' : Color.RED,
|
|
'WARNING' : Color.YELLOW,
|
|
'INFO' : Color.GREEN,
|
|
'DEBUG' : Color.BLUE,
|
|
'NOTSET' : Color.CYAN
|
|
}
|
|
|
|
def __init__(self, limit):
|
|
self.limit = limit
|
|
super().__init__()
|
|
|
|
def format(self, record):
|
|
time = datetime.utcfromtimestamp(record.created).strftime('%H:%M:%S')
|
|
if record.args:
|
|
tuple_args = (record.args,) if isinstance(record.args, dict) else record.args
|
|
clean_args = tuple((self.trim(arg) for arg in tuple_args))
|
|
message = record.msg % clean_args
|
|
else:
|
|
message = record.msg
|
|
trace = '\n'+''.join(format_exception(*record.exc_info)) if record.exc_info else ''
|
|
|
|
return (f'[{Color.WHITE}{time}{Color.RESET}|>'
|
|
f'{self.severity_to_color[record.levelname]}{record.module}:'
|
|
f'{record.levelname.lower()}{Color.RESET}] {message}{trace}')
|
|
|
|
def trim(self, value):
|
|
if isinstance(value, dict):
|
|
return {k: self.trim(v) for k, v in value.items()}
|
|
if isinstance(value, str):
|
|
value_str = str(value)
|
|
return value_str if len(value_str) <= self.limit else f'{value_str[:self.limit]}...'
|
|
return value
|
|
|
|
|
|
class VerboseLogFormatter(Formatter):
|
|
def format(self, record):
|
|
date = datetime.utcfromtimestamp(record.created).strftime('%H:%M:%S')
|
|
if record.args:
|
|
message = record.msg % record.args
|
|
else:
|
|
message = record.msg
|
|
trace = '\n'+''.join(format_exception(*record.exc_info)) if record.exc_info else ''
|
|
|
|
return (f'[{date}|>{record.module}:{record.levelname.lower()}] '
|
|
f'{message}{trace}')
|
|
|
|
|
|
class WhitelistFilter(Filter):
|
|
def __init__(self, names):
|
|
self.names = names
|
|
super().__init__()
|
|
|
|
def filter(self, record):
|
|
return record.module in self.names
|
|
|
|
|
|
class BlacklistFilter(Filter):
|
|
def __init__(self, names):
|
|
self.names = names
|
|
super().__init__()
|
|
|
|
def filter(self, record):
|
|
return record.module not in self.names
|