baseimage-tutorial-framework/lib/tfw/components/history_monitor.py

116 lines
3.3 KiB
Python
Raw Normal View History

# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.
from pwd import getpwnam
from grp import getgrnam
from pathlib import Path
from os import chown
from os.path import dirname
from re import findall
from re import compile as compileregex
from abc import ABC, abstractmethod
from tfw.components.inotify import InotifyObserver
from tfw.event_handlers import TFWServerUplinkConnector
class HistoryMonitor(ABC, InotifyObserver):
    """
    Abstract base for watching a history file (for instance a bash HISTFILE)
    and forwarding newly appeared commands to the uplink.

    On every modification event the whole file is re-parsed with the
    command_pattern regex; entries beyond the previously known entry count
    are sent out one message per command.

    Concrete monitors must provide the command_pattern property and may
    override sanitize_command to post-process each regex match.
    """

    def __init__(self, uplink, histfile):
        """
        :param uplink: object whose send_message() receives the history
                       messages built by this monitor
        :param histfile: path of the history file to create and monitor
        """
        self.uplink = uplink
        self.histfile = histfile
        self._domain = ''
        self.history = []
        self._last_length = len(self.history)

        # Resolve the ids first so a missing 'user'/'users' entry fails
        # before anything is created on disk.
        owner_uid = getpwnam('user').pw_uid
        owner_gid = getgrnam('users').gr_gid

        # Create the file if it is missing and hand it over to user:users
        # before passing the path on to InotifyObserver.
        Path(self.histfile).touch()
        chown(self.histfile, owner_uid, owner_gid)
        super().__init__(self.histfile)

    @property
    def domain(self):
        """Suffix used in the 'history.<domain>' message key."""
        return self._domain

    @domain.setter
    def domain(self, domain):
        self._domain = domain

    def on_modified(self, event):
        """
        Re-read the history file and send every entry that lies beyond the
        previously known entry count. The event argument itself is not used.
        """
        self._fetch_history()
        # The slice is empty when no new entries appeared, so nothing is sent.
        for command in self.history[self._last_length:]:
            self.send_message(command)

    def _fetch_history(self):
        """
        Store the current entry count in _last_length, then rebuild
        self.history from the sanitized regex matches of the whole file.
        """
        self._last_length = len(self.history)
        with open(self.histfile, 'r') as histfile:
            matcher = compileregex(self.command_pattern)
            matches = matcher.findall(histfile.read())
        self.history = [self.sanitize_command(match) for match in matches]

    @property
    @abstractmethod
    def command_pattern(self):
        """Regex (as a string) that extracts commands from the history file."""
        raise NotImplementedError

    def sanitize_command(self, command):
        """Hook for cleaning up a single regex match; returns it unchanged."""
        # pylint: disable=no-self-use
        return command

    def send_message(self, command):
        """Send one command to the uplink under the 'history.<domain>' key."""
        message = {
            'key': f'history.{self.domain}',
            'value': command
        }
        self.uplink.send_message(message)
class BashMonitor(HistoryMonitor):
    """
    HistoryMonitor specialised for bash CLI sessions.

    Bash needs the settings below for this to work
    (TFW applies them automatically):

        PROMPT_COMMAND="history -a"
        shopt -s cmdhist
        shopt -s histappend
        unset HISTCONTROL
    """

    def __init__(self, uplink, histfile):
        """
        :param uplink: forwarded unchanged to HistoryMonitor
        :param histfile: path of the bash history file to monitor
        """
        super().__init__(uplink, histfile)
        self.domain = 'bash'

    @property
    def command_pattern(self):
        """Regex matching each non-empty line of the history file."""
        return r'.+'

    def sanitize_command(self, command):
        """Return the matched line without surrounding whitespace."""
        return command.strip()
class GDBMonitor(HistoryMonitor):
    """
    HistoryMonitor to monitor GDB sessions.
    For this to work "set trace-commands on" must be set in GDB.
    """

    def __init__(self, uplink, histfile):
        """
        :param uplink: forwarded unchanged to HistoryMonitor, which uses it
                       to send the history messages
        :param histfile: path of the history file to monitor
        """
        # HistoryMonitor.__init__ requires (uplink, histfile); the previous
        # one-argument call raised TypeError on every instantiation.
        super().__init__(uplink, histfile)
        self.domain = 'gdb'

    @property
    def command_pattern(self):
        """
        Regex capturing the text after a leading '+' on a line.

        The lookbehind requires a preceding newline, so a '+' line at the
        very beginning of the file is not matched. Only the captured group
        (the command without the '+') ends up in the history.
        """
        return r'(?<=\n)\+(.+)\n'