baseimage-tutorial-framework/tfw/internals/inotify/test_inotify.py

180 lines
6.2 KiB
Python

# pylint: disable=redefined-outer-name
from queue import Empty, Queue
from secrets import token_urlsafe
from pathlib import Path
from shutil import rmtree
from os.path import join
from os import mkdir, remove, rename
from tempfile import TemporaryDirectory
from contextlib import suppress
import watchdog
import pytest
from .inotify import InotifyObserver
from .inotify import (
InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent,
InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent,
InotifyDirMovedEvent, InotifyDirDeletedEvent
)
with suppress(AttributeError):
watchdog.observers.inotify_buffer.InotifyBuffer.delay = 0
class InotifyContext:
def __init__(self, workdir, subdir, subfile, observer):
self.missing_events = 0
self.workdir = workdir
self.subdir = subdir
self.subfile = subfile
self.observer = observer
self.event_to_queue = {
InotifyFileCreatedEvent : self.observer.create_queue,
InotifyFileModifiedEvent : self.observer.modify_queue,
InotifyFileMovedEvent : self.observer.move_queue,
InotifyFileDeletedEvent : self.observer.delete_queue,
InotifyDirCreatedEvent : self.observer.create_queue,
InotifyDirModifiedEvent : self.observer.modify_queue,
InotifyDirMovedEvent : self.observer.move_queue,
InotifyDirDeletedEvent : self.observer.delete_queue
}
def create_random_file(self, dirname, extension):
filename = self.join(f'{dirname}/{generate_name()}{extension}')
Path(filename).touch()
return filename
def create_random_folder(self, basepath):
dirname = self.join(f'{basepath}/{generate_name()}')
mkdir(dirname)
return dirname
def join(self, path):
return join(self.workdir, path)
def check_event(self, event_type, path):
self.missing_events += 1
event = self.event_to_queue[event_type].get(timeout=0.1)
assert isinstance(event, event_type)
assert event.src_path == path
return event
def check_empty(self, event_type):
with pytest.raises(Empty):
self.event_to_queue[event_type].get(timeout=0.1)
def check_any(self):
attrs = self.observer.__dict__.values()
total = sum([q.qsize() for q in attrs if isinstance(q, Queue)])
return total+self.missing_events == len(self.observer.any_list)
class InotifyTestObserver(InotifyObserver):
def __init__(self, paths, patterns=None, exclude=None, recursive=False):
self.any_list = []
self.create_queue, self.modify_queue, self.move_queue, self.delete_queue = [Queue() for _ in range(4)]
super().__init__(paths, patterns, exclude, recursive)
def on_any_event(self, event):
self.any_list.append(event)
def on_created(self, event):
self.create_queue.put(event)
def on_modified(self, event):
self.modify_queue.put(event)
def on_moved(self, event):
self.move_queue.put(event)
def on_deleted(self, event):
self.delete_queue.put(event)
def generate_name():
return token_urlsafe(16)
@pytest.fixture()
def context():
with TemporaryDirectory() as workdir:
subdir = join(workdir, generate_name())
subfile = join(subdir, generate_name()+'.txt')
mkdir(subdir)
Path(subfile).touch()
monitor = InotifyTestObserver(workdir, recursive=True)
monitor.start()
yield InotifyContext(workdir, subdir, subfile, monitor)
def test_create(context):
newfile = context.create_random_file(context.workdir, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
newdir = context.create_random_folder(context.workdir)
context.check_event(InotifyDirCreatedEvent, newdir)
assert context.check_any()
def test_modify(context):
with open(context.subfile, 'wb', buffering=0) as ofile:
ofile.write(b'text')
context.check_event(InotifyFileModifiedEvent, context.subfile)
while True:
try:
context.observer.modify_queue.get(timeout=0.1)
context.missing_events += 1
except Empty:
break
rename(context.subfile, context.subfile+'_new')
context.check_event(InotifyDirModifiedEvent, context.subdir)
assert context.check_any()
def test_move(context):
rename(context.subdir, context.subdir+'_new')
context.check_event(InotifyDirMovedEvent, context.subdir)
context.check_event(InotifyFileMovedEvent, context.subfile)
assert context.check_any()
def test_delete(context):
rmtree(context.subdir)
context.check_event(InotifyFileDeletedEvent, context.subfile)
context.check_event(InotifyDirDeletedEvent, context.subdir)
assert context.check_any()
def test_paths(context):
context.observer.paths = context.subdir
newdir = context.create_random_folder(context.workdir)
newfile = context.create_random_file(context.subdir, '.txt')
context.check_event(InotifyDirModifiedEvent, context.subdir)
context.check_event(InotifyFileCreatedEvent, newfile)
context.observer.paths = [newdir, newfile]
remove(newfile)
context.check_event(InotifyFileDeletedEvent, newfile)
assert context.check_any()
context.observer.paths = context.workdir
def test_patterns(context):
context.observer.patterns = ['*.txt']
context.create_random_file(context.subdir, '.bin')
newfile = context.create_random_file(context.subdir, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any()
context.observer.patterns = None
def test_exclude(context):
context.observer.exclude = ['*.txt']
context.create_random_file(context.subdir, '.txt')
newfile = context.create_random_file(context.subdir, '.bin')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any()
context.observer.exclude = None
def test_stress(context):
newfile = []
for i in range(1024):
newfile.append(context.create_random_file(context.subdir, '.txt'))
for i in range(1024):
context.check_event(InotifyFileCreatedEvent, newfile[i])
assert context.check_any()