baseimage-tutorial-framework/lib/tfw/components/history_monitor.py
2018-03-30 18:11:38 +02:00

81 lines
2.3 KiB
Python

from os.path import dirname
from re import findall
from re import compile as compileregex
from abc import ABC, abstractmethod
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from tfw.components.mixins.callback_mixin import CallbackMixin
from tfw.components.decorators.rate_limiter import RateLimiter
class CallbackEventHandler(PatternMatchingEventHandler, ABC):
def __init__(self, files, *callbacks):
super().__init__(files)
self.callbacks = callbacks
@RateLimiter(rate_per_second=2)
def on_modified(self, event):
for callback in self.callbacks:
callback()
class HistoryMonitor(CallbackMixin, ABC):
def __init__(self, histfile):
CallbackMixin.__init__(self)
self.histfile = histfile
self._history = []
self._last_length = len(self._history)
self.observer = Observer()
self.observer.schedule(CallbackEventHandler([self.histfile],
self._fetch_history,
self._invoke_callbacks),
dirname(self.histfile))
@property
def history(self):
return self._history
def _fetch_history(self):
self._last_length = len(self._history)
with open(self.histfile, 'r') as ifile:
pattern = compileregex(self.command_pattern)
data = ifile.read()
self._history = [self.sanitize_command(command) for command in findall(pattern, data)]
@property
@abstractmethod
def command_pattern(self):
raise NotImplementedError
def sanitize_command(self, command):
# pylint: disable=no-self-use
return command
def _invoke_callbacks(self):
if self._last_length < len(self._history):
self._execute_callbacks(self.history)
def watch(self):
self.observer.start()
def stop(self):
self.observer.stop()
self.observer.join()
class BashMonitor(HistoryMonitor):
@property
def command_pattern(self):
return r'.+'
def sanitize_command(self, command):
return command.strip()
class GDBMonitor(HistoryMonitor):
@property
def command_pattern(self):
return r'(?<=\n)\+(.+)\n'