baseimage-tutorial-framework/lib/tfw/internals/inotify/inotify.py

190 lines
4.9 KiB
Python

# pylint: disable=too-few-public-methods
from typing import Iterable
from time import time
from os.path import abspath, dirname, isdir
from watchdog.observers import Observer
from watchdog.events import FileSystemMovedEvent, PatternMatchingEventHandler
from watchdog.events import (
FileCreatedEvent, FileModifiedEvent, FileMovedEvent, FileDeletedEvent,
DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent
)
class InotifyEvent:
    """Base event carrying the affected path and the time the event object was built."""

    def __init__(self, src_path):
        """Remember ``src_path`` and stamp the event with the current ``time()``."""
        self.src_path = src_path
        self.date = time()

    def __repr__(self):
        """Render as ``ClassName(src_path)`` using the concrete class name."""
        name = type(self).__name__
        return f'{name}({self.src_path})'

    def __str__(self):
        # Resolved through the instance's class on every call, so subclasses
        # that override __repr__ get a matching str() automatically.
        return repr(self)
class InotifyMovedEvent(InotifyEvent):
    """Event that records a destination path in addition to the source path."""

    def __init__(self, src_path, dest_path):
        """Initialise the base event from ``src_path`` and remember ``dest_path``."""
        super().__init__(src_path)
        self.dest_path = dest_path

    def __repr__(self):
        """Render as ``ClassName(src_path, dest_path)``."""
        name = type(self).__name__
        return f'{name}({self.src_path}, {self.dest_path})'
class InotifyFileCreatedEvent(InotifyEvent):
    """Event type that watchdog's ``FileCreatedEvent`` is translated into."""
class InotifyFileModifiedEvent(InotifyEvent):
    """Event type that watchdog's ``FileModifiedEvent`` is translated into."""
class InotifyFileMovedEvent(InotifyMovedEvent):
    """Event type that watchdog's ``FileMovedEvent`` is translated into."""
class InotifyFileDeletedEvent(InotifyEvent):
    """Event type that watchdog's ``FileDeletedEvent`` is translated into."""
class InotifyDirCreatedEvent(InotifyEvent):
    """Event type that watchdog's ``DirCreatedEvent`` is translated into."""
class InotifyDirModifiedEvent(InotifyEvent):
    """Event type that watchdog's ``DirModifiedEvent`` is translated into."""
class InotifyDirMovedEvent(InotifyMovedEvent):
    """Event type that watchdog's ``DirMovedEvent`` is translated into."""
class InotifyDirDeletedEvent(InotifyEvent):
    """Event type that watchdog's ``DirDeletedEvent`` is translated into."""
class InotifyObserver:
    """Watch paths through a watchdog ``Observer`` and forward Inotify* events to hooks.

    Subclasses (or instances) override ``on_any_event``, ``on_created``,
    ``on_modified``, ``on_moved`` and ``on_deleted``. Each watchdog event is
    converted to the matching ``Inotify*Event`` before the hooks are called.
    """

    def __init__(self, path, patterns=None, exclude=None, recursive=False):
        """Create the observer and schedule the initial watches.

        :param path: one path string or an iterable of path strings. A path
                     that is not an existing directory is treated as a file:
                     its parent directory is watched and the absolute file
                     path is added to the match patterns.
        :param patterns: extra match patterns handed to
                         ``PatternMatchingEventHandler``; ``None`` means none.
        :param exclude: passed as the second positional argument of
                        ``PatternMatchingEventHandler`` (presumably its
                        ignore patterns; verify against watchdog).
        :param recursive: forwarded to ``Observer.schedule``.
        :raises ValueError: if ``path`` is neither a string nor an iterable.
        """
        self._files = []
        # The paths exactly as the caller supplied them. Every reset derives
        # the watched directories and file patterns from this value, never
        # from the already-resolved ``_paths``.
        self._requested_paths = path
        self._paths = path
        self._patterns = patterns or []
        self._exclude = exclude
        self._recursive = recursive
        self._observer = Observer()
        self._reset()

    def _reset(self):
        """Rebuild the event handler and reschedule every watch.

        :raises ValueError: if the requested paths are neither a string nor
                            an iterable.
        """
        requested = self._requested_paths
        if isinstance(requested, str):
            requested = [requested]
        if not isinstance(requested, Iterable):
            raise ValueError('Expected one or more string paths.')
        # Materialise once so that a one-shot iterable survives later resets.
        self._requested_paths = list(requested)
        self._extract_files_from_paths()
        # Unpacking accepts any iterable of patterns, not only lists.
        patterns = [*self._files, *self.patterns]
        handler = PatternMatchingEventHandler(patterns if patterns else None, self.exclude)
        # Route every matched event through our own dispatcher.
        handler.on_any_event = self._dispatch_event
        self._observer.unschedule_all()
        for path in self.paths:
            self._observer.schedule(handler, path, self._recursive)

    def _extract_files_from_paths(self):
        """Split the requested paths into watched directories and file patterns.

        Directories are watched as they are. Any other path contributes its
        parent directory to ``_paths`` and its absolute path to ``_files``.
        Reading from ``_requested_paths`` keeps this idempotent: repeating the
        call (as happens when patterns or excludes change) yields the same
        ``_files`` instead of losing them once ``_paths`` holds only
        directories.
        """
        files, directories = [], []
        for path in self._requested_paths:
            path = abspath(path)
            if isdir(path):
                directories.append(path)
            else:
                directories.append(dirname(path))
                files.append(path)
        self._files, self._paths = files, directories

    @property
    def paths(self):
        """Absolute directories currently scheduled for watching."""
        return self._paths

    @paths.setter
    def paths(self, paths):
        """Replace the requested paths and reschedule the watches."""
        self._requested_paths = paths
        self._reset()

    @property
    def patterns(self):
        """Extra match patterns, excluding the per-file patterns derived from paths."""
        return self._patterns

    @patterns.setter
    def patterns(self, patterns):
        """Replace the extra match patterns (``None`` becomes ``[]``) and reschedule."""
        self._patterns = patterns or []
        self._reset()

    @property
    def exclude(self):
        """Value passed as the handler's second positional argument."""
        return self._exclude

    @exclude.setter
    def exclude(self, exclude):
        """Replace the exclude value and reschedule the watches."""
        self._exclude = exclude
        self._reset()

    def start(self):
        """Start the underlying watchdog observer."""
        self._observer.start()

    def stop(self):
        """Stop the underlying watchdog observer and wait for it to finish."""
        self._observer.stop()
        self._observer.join()

    def _dispatch_event(self, event):
        """Convert a watchdog event and call ``on_any_event`` then the specific hook."""
        # Looked up on every event so hooks replaced at runtime are honoured.
        event_to_action = {
            InotifyFileCreatedEvent: self.on_created,
            InotifyFileModifiedEvent: self.on_modified,
            InotifyFileMovedEvent: self.on_moved,
            InotifyFileDeletedEvent: self.on_deleted,
            InotifyDirCreatedEvent: self.on_created,
            InotifyDirModifiedEvent: self.on_modified,
            InotifyDirMovedEvent: self.on_moved,
            InotifyDirDeletedEvent: self.on_deleted,
        }
        event = self._transform_event(event)
        self.on_any_event(event)
        event_to_action[type(event)](event)

    @staticmethod
    def _transform_event(event):
        """Return the ``Inotify*Event`` counterpart of a watchdog event.

        :raises NameError: if the exact type of ``event`` is not one of the
                           eight watchdog event classes listed below.
        """
        watchdog_to_inotify = {
            FileCreatedEvent: InotifyFileCreatedEvent,
            FileModifiedEvent: InotifyFileModifiedEvent,
            FileMovedEvent: InotifyFileMovedEvent,
            FileDeletedEvent: InotifyFileDeletedEvent,
            DirCreatedEvent: InotifyDirCreatedEvent,
            DirModifiedEvent: InotifyDirModifiedEvent,
            DirMovedEvent: InotifyDirMovedEvent,
            DirDeletedEvent: InotifyDirDeletedEvent,
        }
        # NOTE(review): any watchdog event class outside this table ends in
        # the NameError below; confirm the installed watchdog version emits
        # no other event types for the watched paths.
        try:
            cls = watchdog_to_inotify[type(event)]
        except KeyError as err:
            raise NameError('Watchdog API returned an unknown event.') from err
        if isinstance(event, FileSystemMovedEvent):
            return cls(event.src_path, event.dest_path)
        return cls(event.src_path)

    def on_any_event(self, event):
        """Hook called for every event, before the type-specific hook. No-op by default."""

    def on_created(self, event):
        """Hook for file and directory creation events. No-op by default."""

    def on_modified(self, event):
        """Hook for file and directory modification events. No-op by default."""

    def on_moved(self, event):
        """Hook for file and directory move events. No-op by default."""

    def on_deleted(self, event):
        """Hook for file and directory deletion events. No-op by default."""