diff --git a/lib/tfw/components/inotify/__init__.py b/lib/tfw/components/inotify/__init__.py new file mode 100644 index 0000000..1b97c47 --- /dev/null +++ b/lib/tfw/components/inotify/__init__.py @@ -0,0 +1,6 @@ +from .inotify import InotifyObserver +from .inotify import ( + InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent, + InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent, + InotifyDirMovedEvent, InotifyDirDeletedEvent +) diff --git a/lib/tfw/components/inotify/inotify.py b/lib/tfw/components/inotify/inotify.py new file mode 100644 index 0000000..33b0936 --- /dev/null +++ b/lib/tfw/components/inotify/inotify.py @@ -0,0 +1,169 @@ +# pylint: disable=too-few-public-methods + +from time import time + +from watchdog.observers import Observer +from watchdog.events import FileSystemMovedEvent, PatternMatchingEventHandler +from watchdog.events import ( + FileCreatedEvent, FileModifiedEvent, FileMovedEvent, FileDeletedEvent, + DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent +) + + +class InotifyEvent: + def __init__(self, src_path): + self.date = time() + self.src_path = src_path + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return f'{self.date}\t{self.__class__.__name__}:\t{self.src_path}' + + +class InotifyMovedEvent(InotifyEvent): + def __init__(self, src_path, dest_path): + self.dest_path = dest_path + super().__init__(src_path) + + def __repr__(self): + return super().__repr__()+f'\t-> {self.dest_path}' + + +class InotifyFileCreatedEvent(InotifyEvent): + pass + + +class InotifyFileModifiedEvent(InotifyEvent): + pass + + +class InotifyFileMovedEvent(InotifyMovedEvent): + pass + + +class InotifyFileDeletedEvent(InotifyEvent): + pass + + +class InotifyDirCreatedEvent(InotifyEvent): + pass + + +class InotifyDirModifiedEvent(InotifyEvent): + pass + + +class InotifyDirMovedEvent(InotifyMovedEvent): + pass + + +class InotifyDirDeletedEvent(InotifyEvent): + pass + + +class InotifyObserver: + def __init__(self, path, patterns=None, exclude=None, recursive=False): + self._path = path + self._patterns = patterns + self._exclude = exclude + self._recursive = recursive + self._observer = Observer() + self._reset(path, patterns, exclude) + + def _reset(self, path, patterns, exclude): + dispatch_event = self.dispatch_event + class TransformerEventHandler(PatternMatchingEventHandler): + def on_any_event(self, event): + dispatch_event(event) + self.handler = TransformerEventHandler(patterns, exclude) + self._observer.unschedule_all() + self._observer.schedule(self.handler, path, self._recursive) + + @property + def path(self): + return self._path + + @path.setter + def path(self, path): + self._path = path + self._reset(path, self._patterns, self._exclude) + + @property + def patterns(self): + return self._patterns + + @patterns.setter + def patterns(self, patterns): + self._patterns = patterns + self._reset(self._path, patterns, self._exclude) + + @property + def exclude(self): + return self._exclude + + @exclude.setter + def exclude(self, exclude): + self._exclude = exclude + self._reset(self._path, self._patterns, exclude) + + def start(self): + self._observer.start() + + def stop(self): + self._observer.stop() + self._observer.join() + + def dispatch_event(self, event): + event_to_action = { + InotifyFileCreatedEvent : self.on_created, + InotifyFileModifiedEvent : self.on_modified, + InotifyFileMovedEvent : self.on_moved, + InotifyFileDeletedEvent : self.on_deleted, + InotifyDirCreatedEvent : self.on_created, + InotifyDirModifiedEvent : self.on_modified, + InotifyDirMovedEvent : self.on_moved, + InotifyDirDeletedEvent : self.on_deleted + } + + event = self.transform_event(event) + self.on_any_event(event) + event_to_action[type(event)](event) + + @staticmethod + def transform_event(event): + watchdog_to_inotify = { + FileCreatedEvent : InotifyFileCreatedEvent, + FileModifiedEvent : InotifyFileModifiedEvent, + FileMovedEvent : InotifyFileMovedEvent, + FileDeletedEvent : InotifyFileDeletedEvent, + DirCreatedEvent : InotifyDirCreatedEvent, + DirModifiedEvent : InotifyDirModifiedEvent, + DirMovedEvent : InotifyDirMovedEvent, + DirDeletedEvent : InotifyDirDeletedEvent + } + + try: + cls = watchdog_to_inotify[type(event)] + except KeyError: + raise NameError('Watchdog API returned an unknown event.') + + if isinstance(event, FileSystemMovedEvent): + return cls(event.src_path, event.dest_path) + return cls(event.src_path) + + def on_any_event(self, event): + pass + + def on_created(self, event): + pass + + def on_modified(self, event): + pass + + def on_moved(self, event): + pass + + def on_deleted(self, event): + pass diff --git a/lib/tfw/components/inotify/test_inotify.py b/lib/tfw/components/inotify/test_inotify.py new file mode 100644 index 0000000..6b4bdb3 --- /dev/null +++ b/lib/tfw/components/inotify/test_inotify.py @@ -0,0 +1,186 @@ +# pylint: disable=redefined-outer-name + +from queue import Empty, Queue +from secrets import token_urlsafe +from pathlib import Path +from shutil import rmtree +from os.path import join +from os import chdir, mkdir, rename, walk, listdir +from tempfile import TemporaryDirectory + +import pytest + +from inotify import InotifyObserver +from inotify import ( + InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent, + InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent, + InotifyDirMovedEvent, InotifyDirDeletedEvent +) + + +class InotifyContext: + def __init__(self, folder, observer): + self.missing_events = 0 + self.folder = folder + self.observer = observer + + self.event_to_queue = { + InotifyFileCreatedEvent : self.observer.create_queue, + InotifyFileModifiedEvent : self.observer.modify_queue, + InotifyFileMovedEvent : self.observer.move_queue, + InotifyFileDeletedEvent : self.observer.delete_queue, + InotifyDirCreatedEvent : self.observer.create_queue, + InotifyDirModifiedEvent : self.observer.modify_queue, + InotifyDirMovedEvent : self.observer.move_queue, + InotifyDirDeletedEvent : self.observer.delete_queue + } + + def create_random_file(self, dirname, extension): + filename = self.join(f'{dirname}/{generate_name()}{extension}') + Path(filename).touch() + return filename + + def create_random_folder(self, basepath): + dirname = self.join(f'{basepath}/{generate_name()}') + mkdir(dirname) + return dirname + + def join(self, path): + return join(self.folder, path) + + def check_event(self, event_type, path): + self.missing_events += 1 + event = self.event_to_queue[event_type].get(timeout=1) + assert isinstance(event, event_type) + assert event.src_path == path + return event + + def check_empty(self, event_type): + with pytest.raises(Empty): + self.event_to_queue[event_type].get(timeout=1) + + def check_any(self): + attrs = self.observer.__dict__.values() + total = sum([q.qsize() for q in attrs if isinstance(q, Queue)]) + return total+self.missing_events == len(self.observer.any_list) + + +class InotifyTestObserver(InotifyObserver): + def __init__(self, path, patterns=None, exclude=None, recursive=False): + self.any_list = [] + self.create_queue, self.modify_queue, self.move_queue, self.delete_queue = [Queue() for _ in range(4)] + super().__init__(path, patterns, exclude, recursive) + + def on_any_event(self, event): + self.any_list.append(event) + + def on_created(self, event): + self.create_queue.put(event) + + def on_modified(self, event): + self.modify_queue.put(event) + + def on_moved(self, event): + self.move_queue.put(event) + + def on_deleted(self, event): + self.delete_queue.put(event) + +def generate_name(): + return token_urlsafe(16) + +@pytest.fixture() +def context(): + with TemporaryDirectory() as workdir: + chdir(workdir) + + for _ in range(5): + newdir = join(workdir, generate_name()) + mkdir(newdir) + Path(join(newdir, generate_name()+'.txt')).touch() + + monitor = InotifyTestObserver(workdir, recursive=True) + monitor.start() + yield InotifyContext(workdir, monitor) + +def test_create(context): + for _, dirs, _ in list(walk(context.folder)): + for name in dirs: + newfile = context.create_random_file(name, '.txt') + context.check_event(InotifyFileCreatedEvent, newfile) + newdir = context.create_random_folder(name) + context.check_event(InotifyDirCreatedEvent, newdir) + assert context.check_any() + +def test_modify(context): + for root, _, files in list(walk(context.folder)): + for name in files: + oldfile = join(root, name) + with open(oldfile, 'w') as ofile: + ofile.write('text') + context.check_event(InotifyFileModifiedEvent, oldfile) + rename(oldfile, oldfile+'_new') + context.check_event(InotifyDirModifiedEvent, root) + assert context.check_any() + +def test_move(context): + for root, dirs, _ in list(walk(context.folder)): + for name in dirs: + olddir = join(root, name) + oldfile = join(olddir, list(listdir(olddir))[0]) + rename(olddir, olddir+'_new') + context.check_event(InotifyDirMovedEvent, olddir) + context.check_event(InotifyFileMovedEvent, oldfile) + assert context.check_any() + +def test_delete(context): + for root, dirs, _ in list(walk(context.folder)): + for name in dirs: + olddir = join(root, name) + oldfile = join(olddir, list(listdir(olddir))[0]) + rmtree(olddir) + context.check_event(InotifyFileDeletedEvent, oldfile) + context.check_event(InotifyDirDeletedEvent, olddir) + assert context.check_any() + +def test_path(context): + for _, dirs, _ in list(walk(context.folder)): + for name in dirs: + context.observer.path = context.join(name) + context.create_random_folder('.') + newfile = context.create_random_file(name, '.txt') + context.check_event(InotifyFileCreatedEvent, newfile) + context.observer.path = context.folder + assert context.check_any() + +def test_patterns(context): + for _, dirs, _ in list(walk(context.folder)): + for name in dirs: + context.observer.patterns = ["*.txt"] + context.create_random_file(name, '.bin') + newfile = context.create_random_file(name, '.txt') + context.check_event(InotifyFileCreatedEvent, newfile) + context.check_empty(InotifyFileCreatedEvent) + assert context.check_any() + context.observer.patterns = None + +def test_exclude(context): + for _, dirs, _ in list(walk(context.folder)): + for name in dirs: + context.observer.exclude = ["*.txt"] + context.create_random_file(name, '.txt') + newfile = context.create_random_file(name, '.bin') + context.check_event(InotifyFileCreatedEvent, newfile) + context.check_empty(InotifyFileCreatedEvent) + assert context.check_any() + context.observer.exclude = None + +def test_stress(context): + for _, dirs, _ in list(walk(context.folder)): + for name in dirs: + newfile = [] + for i in range(1024): + newfile.append(context.create_random_file(name, '.txt')) + for i in range(1024): + context.check_event(InotifyFileCreatedEvent, newfile[i]) + assert context.check_any()