# pylint: disable=redefined-outer-name from queue import Empty, Queue from secrets import token_urlsafe from pathlib import Path from shutil import rmtree from os.path import join from os import mkdir, rename from tempfile import TemporaryDirectory import pytest import watchdog from inotify import InotifyObserver from inotify import ( InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent, InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent, InotifyDirMovedEvent, InotifyDirDeletedEvent ) watchdog.observers.inotify_buffer.InotifyBuffer.delay = 0 class InotifyContext: def __init__(self, workdir, subdir, subfile, observer): self.missing_events = 0 self.workdir = workdir self.subdir = subdir self.subfile = subfile self.observer = observer self.event_to_queue = { InotifyFileCreatedEvent : self.observer.create_queue, InotifyFileModifiedEvent : self.observer.modify_queue, InotifyFileMovedEvent : self.observer.move_queue, InotifyFileDeletedEvent : self.observer.delete_queue, InotifyDirCreatedEvent : self.observer.create_queue, InotifyDirModifiedEvent : self.observer.modify_queue, InotifyDirMovedEvent : self.observer.move_queue, InotifyDirDeletedEvent : self.observer.delete_queue } def create_random_file(self, dirname, extension): filename = self.join(f'{dirname}/{generate_name()}{extension}') Path(filename).touch() return filename def create_random_folder(self, basepath): dirname = self.join(f'{basepath}/{generate_name()}') mkdir(dirname) return dirname def join(self, path): return join(self.workdir, path) def check_event(self, event_type, path): self.missing_events += 1 event = self.event_to_queue[event_type].get(timeout=0.1) assert isinstance(event, event_type) assert event.src_path == path return event def check_empty(self, event_type): with pytest.raises(Empty): self.event_to_queue[event_type].get(timeout=0.1) def check_any(self): attrs = self.observer.__dict__.values() total = sum([q.qsize() for q in attrs if isinstance(q, Queue)]) return total+self.missing_events == len(self.observer.any_list) class InotifyTestObserver(InotifyObserver): def __init__(self, path, patterns=None, exclude=None, recursive=False): self.any_list = [] self.create_queue, self.modify_queue, self.move_queue, self.delete_queue = [Queue() for _ in range(4)] super().__init__(path, patterns, exclude, recursive) def on_any_event(self, event): self.any_list.append(event) def on_created(self, event): self.create_queue.put(event) def on_modified(self, event): self.modify_queue.put(event) def on_moved(self, event): self.move_queue.put(event) def on_deleted(self, event): self.delete_queue.put(event) def generate_name(): return token_urlsafe(16) @pytest.fixture() def context(): with TemporaryDirectory() as workdir: subdir = join(workdir, generate_name()) subfile = join(subdir, generate_name()+'.txt') mkdir(subdir) Path(subfile).touch() monitor = InotifyTestObserver(workdir, recursive=True) monitor.start() yield InotifyContext(workdir, subdir, subfile, monitor) def test_create(context): newfile = context.create_random_file(context.workdir, '.txt') context.check_event(InotifyFileCreatedEvent, newfile) newdir = context.create_random_folder(context.workdir) context.check_event(InotifyDirCreatedEvent, newdir) assert context.check_any() def test_modify(context): with open(context.subfile, 'w') as ofile: ofile.write('text') context.check_event(InotifyFileModifiedEvent, context.subfile) while True: try: context.observer.modify_queue.get(timeout=0.1) context.missing_events += 1 except Empty: break rename(context.subfile, context.subfile+'_new') context.check_event(InotifyDirModifiedEvent, context.subdir) assert context.check_any() def test_move(context): rename(context.subdir, context.subdir+'_new') context.check_event(InotifyDirMovedEvent, context.subdir) context.check_event(InotifyFileMovedEvent, context.subfile) assert context.check_any() def test_delete(context): rmtree(context.subdir) context.check_event(InotifyFileDeletedEvent, context.subfile) context.check_event(InotifyDirDeletedEvent, context.subdir) assert context.check_any() def test_path(context): context.observer.path = context.subdir context.create_random_folder(context.workdir) newfile = context.create_random_file(context.subdir, '.txt') context.check_event(InotifyFileCreatedEvent, newfile) context.observer.path = context.subdir assert context.check_any() def test_patterns(context): context.observer.patterns = ['*.txt'] context.create_random_file(context.subdir, '.bin') newfile = context.create_random_file(context.subdir, '.txt') context.check_event(InotifyFileCreatedEvent, newfile) context.check_empty(InotifyFileCreatedEvent) assert context.check_any() context.observer.patterns = None def test_exclude(context): context.observer.exclude = ['*.txt'] context.create_random_file(context.subdir, '.txt') newfile = context.create_random_file(context.subdir, '.bin') context.check_event(InotifyFileCreatedEvent, newfile) context.check_empty(InotifyFileCreatedEvent) assert context.check_any() context.observer.exclude = None def test_stress(context): newfile = [] for i in range(1024): newfile.append(context.create_random_file(context.subdir, '.txt')) for i in range(1024): context.check_event(InotifyFileCreatedEvent, newfile[i]) assert context.check_any()