mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-15 08:37:17 +00:00
180 lines
6.2 KiB
Python
180 lines
6.2 KiB
Python
# pylint: disable=redefined-outer-name
|
|
|
|
from queue import Empty, Queue
|
|
from secrets import token_urlsafe
|
|
from pathlib import Path
|
|
from shutil import rmtree
|
|
from os.path import join
|
|
from os import mkdir, remove, rename
|
|
from tempfile import TemporaryDirectory
|
|
from contextlib import suppress
|
|
|
|
import watchdog
|
|
import pytest
|
|
|
|
from .inotify import InotifyObserver
|
|
from .inotify import (
|
|
InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent,
|
|
InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent,
|
|
InotifyDirMovedEvent, InotifyDirDeletedEvent
|
|
)
|
|
|
|
# Best-effort tweak: set the ``delay`` attribute of watchdog's InotifyBuffer
# class to zero for the whole test module.
# NOTE(review): presumably this removes watchdog's event buffering latency so
# events show up within the short queue timeouts used by the tests - confirm.
# AttributeError is suppressed on purpose, so the module still imports when
# the attribute chain cannot be resolved.
with suppress(AttributeError):
    watchdog.observers.inotify_buffer.InotifyBuffer.delay = 0
|
|
|
|
|
|
class InotifyContext:
    """Helper handed to each test: the watched tree plus its observer.

    Attributes:
        missing_events: number of events already taken off the observer's
            queues; ``check_any`` adds it back when balancing the books.
        workdir: root directory that relative paths are joined onto.
        subdir: directory path supplied by the caller.
        subfile: file path supplied by the caller.
        observer: object exposing ``create_queue``, ``modify_queue``,
            ``move_queue``, ``delete_queue`` and ``any_list``.
        event_to_queue: maps every event class to the queue it is read from.
    """

    def __init__(self, workdir, subdir, subfile, observer):
        self.missing_events = 0
        self.workdir = workdir
        self.subdir = subdir
        self.subfile = subfile
        self.observer = observer

        # File and directory variants of the same kind share one queue.
        self.event_to_queue = {
            InotifyFileCreatedEvent: self.observer.create_queue,
            InotifyFileModifiedEvent: self.observer.modify_queue,
            InotifyFileMovedEvent: self.observer.move_queue,
            InotifyFileDeletedEvent: self.observer.delete_queue,
            InotifyDirCreatedEvent: self.observer.create_queue,
            InotifyDirModifiedEvent: self.observer.modify_queue,
            InotifyDirMovedEvent: self.observer.move_queue,
            InotifyDirDeletedEvent: self.observer.delete_queue,
        }

    def create_random_file(self, dirname, extension):
        """Create an empty, randomly named file in *dirname*; return its path."""
        filename = self.join(f'{dirname}/{generate_name()}{extension}')
        Path(filename).touch()
        return filename

    def create_random_folder(self, basepath):
        """Create a randomly named directory in *basepath*; return its path."""
        dirname = self.join(f'{basepath}/{generate_name()}')
        mkdir(dirname)
        return dirname

    def join(self, path):
        """Join *path* onto ``workdir`` (``os.path.join`` semantics)."""
        return join(self.workdir, path)

    def check_event(self, event_type, path, timeout=0.1):
        """Take the next event for *event_type* and verify it.

        Args:
            event_type: event class; selects the queue and the expected type.
            path: expected ``src_path`` of the event.
            timeout: seconds to wait for the event to arrive.

        Returns:
            The event taken from the queue.

        Raises:
            queue.Empty: if no event arrives within *timeout*.
            AssertionError: if the event has the wrong type or path.
        """
        event = self.event_to_queue[event_type].get(timeout=timeout)
        # Count only events that really left the queue, so a timeout does not
        # skew the bookkeeping used by check_any().
        self.missing_events += 1
        assert isinstance(event, event_type)
        assert event.src_path == path
        return event

    def check_empty(self, event_type, timeout=0.1):
        """Assert that the queue for *event_type* stays empty for *timeout* seconds."""
        with pytest.raises(Empty):
            self.event_to_queue[event_type].get(timeout=timeout)

    def check_any(self):
        """Return True when ``any_list`` accounts for every specific event.

        Events still waiting in any Queue attribute of the observer plus the
        events already consumed must equal the length of ``any_list``.
        """
        queued = sum(
            attr.qsize()
            for attr in vars(self.observer).values()
            if isinstance(attr, Queue)
        )
        return queued + self.missing_events == len(self.observer.any_list)
|
|
|
|
|
|
class InotifyTestObserver(InotifyObserver):
    """InotifyObserver subclass that records every callback for inspection.

    ``on_any_event`` appends to ``any_list``; each specific callback puts its
    event on the queue of the matching kind.
    """

    def __init__(self, paths, patterns=None, exclude=None, recursive=False):
        # The recording containers are set up before the base initialiser is
        # invoked, in this fixed order.
        self.any_list = []
        self.create_queue = Queue()
        self.modify_queue = Queue()
        self.move_queue = Queue()
        self.delete_queue = Queue()
        super().__init__(paths, patterns, exclude, recursive)

    def on_any_event(self, event):
        """Remember *event* in ``any_list``."""
        self.any_list.append(event)

    def on_created(self, event):
        """Queue *event* on ``create_queue``."""
        self.create_queue.put(event)

    def on_modified(self, event):
        """Queue *event* on ``modify_queue``."""
        self.modify_queue.put(event)

    def on_moved(self, event):
        """Queue *event* on ``move_queue``."""
        self.move_queue.put(event)

    def on_deleted(self, event):
        """Queue *event* on ``delete_queue``."""
        self.delete_queue.put(event)
|
|
|
|
def generate_name():
    """Return a random URL-safe name built from 16 random bytes."""
    nbytes = 16
    return token_urlsafe(nbytes)
|
|
|
|
@pytest.fixture()
def context():
    """Yield an InotifyContext over a fresh temporary directory tree.

    The tree is ``<workdir>/<random dir>/<random name>.txt`` and is created
    before a recursive InotifyTestObserver on ``workdir`` is started. On
    teardown the observer is stopped first, then the temporary directory is
    removed.
    """
    with TemporaryDirectory() as workdir:
        subdir = join(workdir, generate_name())
        subfile = join(subdir, generate_name() + '.txt')
        mkdir(subdir)
        Path(subfile).touch()
        monitor = InotifyTestObserver(workdir, recursive=True)
        monitor.start()
        try:
            yield InotifyContext(workdir, subdir, subfile, monitor)
        finally:
            # Do not leave a running observer behind after each test, and do
            # not let it watch the directory while it is being deleted.
            # NOTE(review): relies on InotifyObserver providing stop() as the
            # counterpart of start() - confirm against .inotify.
            monitor.stop()
|
|
|
|
def test_create(context):
    """A new file and a new directory in the root each yield a create event."""
    created_file = context.create_random_file(context.workdir, '.txt')
    context.check_event(InotifyFileCreatedEvent, created_file)

    created_dir = context.create_random_folder(context.workdir)
    context.check_event(InotifyDirCreatedEvent, created_dir)

    assert context.check_any()
|
|
|
|
def test_modify(context):
    """Writing a file yields file-modified; renaming it modifies its directory."""
    with open(context.subfile, 'wb', buffering=0) as handle:
        handle.write(b'text')
    context.check_event(InotifyFileModifiedEvent, context.subfile)

    # Drain whatever further modify events arrive, counting each one as
    # consumed so the final bookkeeping check still balances.
    modify_queue = context.observer.modify_queue
    while True:
        try:
            modify_queue.get(timeout=0.1)
        except Empty:
            break
        context.missing_events += 1

    rename(context.subfile, f'{context.subfile}_new')
    context.check_event(InotifyDirModifiedEvent, context.subdir)
    assert context.check_any()
|
|
|
|
def test_move(context):
    """Renaming a directory yields a move event for it and for its file."""
    renamed = f'{context.subdir}_new'
    rename(context.subdir, renamed)

    # Both events are matched against the paths from before the rename.
    context.check_event(InotifyDirMovedEvent, context.subdir)
    context.check_event(InotifyFileMovedEvent, context.subfile)
    assert context.check_any()
|
|
|
|
def test_delete(context):
    """Removing a directory tree yields delete events for file and directory."""
    rmtree(context.subdir)

    expected = (
        (InotifyFileDeletedEvent, context.subfile),
        (InotifyDirDeletedEvent, context.subdir),
    )
    for event_type, path in expected:
        context.check_event(event_type, path)
    assert context.check_any()
|
|
|
|
def test_paths(context):
    """Reassigning ``observer.paths`` changes which locations produce events."""
    observer = context.observer

    # Watch only the subdirectory: the folder created in the root is expected
    # to stay silent, the file created in the subdirectory is not.
    observer.paths = context.subdir
    outside_dir = context.create_random_folder(context.workdir)
    inside_file = context.create_random_file(context.subdir, '.txt')
    context.check_event(InotifyDirModifiedEvent, context.subdir)
    context.check_event(InotifyFileCreatedEvent, inside_file)

    # A list of paths is accepted as well.
    observer.paths = [outside_dir, inside_file]
    remove(inside_file)
    context.check_event(InotifyFileDeletedEvent, inside_file)
    assert context.check_any()

    observer.paths = context.workdir
|
|
|
|
def test_patterns(context):
    """With ``patterns=['*.txt']`` only the .txt file yields a create event."""
    observer = context.observer
    observer.patterns = ['*.txt']

    context.create_random_file(context.subdir, '.bin')
    matching = context.create_random_file(context.subdir, '.txt')

    context.check_event(InotifyFileCreatedEvent, matching)
    # Nothing else may be waiting on the create queue.
    context.check_empty(InotifyFileCreatedEvent)
    assert context.check_any()

    observer.patterns = None
|
|
|
|
def test_exclude(context):
    """With ``exclude=['*.txt']`` only the .bin file yields a create event."""
    observer = context.observer
    observer.exclude = ['*.txt']

    context.create_random_file(context.subdir, '.txt')
    reported = context.create_random_file(context.subdir, '.bin')

    context.check_event(InotifyFileCreatedEvent, reported)
    # Nothing else may be waiting on the create queue.
    context.check_empty(InotifyFileCreatedEvent)
    assert context.check_any()

    observer.exclude = None
|
|
|
|
def test_stress(context):
    """Create many files in one burst and expect their create events in order."""
    file_count = 1024

    # Create everything first, then verify, so events pile up in the queue.
    created = [
        context.create_random_file(context.subdir, '.txt')
        for _ in range(file_count)
    ]
    for path in created:
        context.check_event(InotifyFileCreatedEvent, path)
    assert context.check_any()
|