mirror of
https://github.com/avatao-content/baseimage-tutorial-framework
synced 2024-11-09 17:37:17 +00:00
187 lines
6.7 KiB
Python
187 lines
6.7 KiB
Python
# pylint: disable=redefined-outer-name
|
|
|
|
from queue import Empty, Queue
|
|
from secrets import token_urlsafe
|
|
from pathlib import Path
|
|
from shutil import rmtree
|
|
from os.path import join
|
|
from os import chdir, mkdir, rename, walk, listdir
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import pytest
|
|
|
|
from inotify import InotifyObserver
|
|
from inotify import (
|
|
InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent,
|
|
InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent,
|
|
InotifyDirMovedEvent, InotifyDirDeletedEvent
|
|
)
|
|
|
|
|
|
class InotifyContext:
|
|
def __init__(self, folder, observer):
|
|
self.missing_events = 0
|
|
self.folder = folder
|
|
self.observer = observer
|
|
|
|
self.event_to_queue = {
|
|
InotifyFileCreatedEvent : self.observer.create_queue,
|
|
InotifyFileModifiedEvent : self.observer.modify_queue,
|
|
InotifyFileMovedEvent : self.observer.move_queue,
|
|
InotifyFileDeletedEvent : self.observer.delete_queue,
|
|
InotifyDirCreatedEvent : self.observer.create_queue,
|
|
InotifyDirModifiedEvent : self.observer.modify_queue,
|
|
InotifyDirMovedEvent : self.observer.move_queue,
|
|
InotifyDirDeletedEvent : self.observer.delete_queue
|
|
}
|
|
|
|
def create_random_file(self, dirname, extension):
|
|
filename = self.join(f'{dirname}/{generate_name()}{extension}')
|
|
Path(filename).touch()
|
|
return filename
|
|
|
|
def create_random_folder(self, basepath):
|
|
dirname = self.join(f'{basepath}/{generate_name()}')
|
|
mkdir(dirname)
|
|
return dirname
|
|
|
|
def join(self, path):
|
|
return join(self.folder, path)
|
|
|
|
def check_event(self, event_type, path):
|
|
self.missing_events += 1
|
|
event = self.event_to_queue[event_type].get(timeout=1)
|
|
assert isinstance(event, event_type)
|
|
assert event.src_path == path
|
|
return event
|
|
|
|
def check_empty(self, event_type):
|
|
with pytest.raises(Empty):
|
|
self.event_to_queue[event_type].get(timeout=1)
|
|
|
|
def check_any(self):
|
|
attrs = self.observer.__dict__.values()
|
|
total = sum([q.qsize() for q in attrs if isinstance(q, Queue)])
|
|
return total+self.missing_events == len(self.observer.any_list)
|
|
|
|
|
|
class InotifyTestObserver(InotifyObserver):
|
|
def __init__(self, path, patterns=None, exclude=None, recursive=False):
|
|
self.any_list = []
|
|
self.create_queue, self.modify_queue, self.move_queue, self.delete_queue = [Queue() for _ in range(4)]
|
|
super().__init__(path, patterns, exclude, recursive)
|
|
|
|
def on_any_event(self, event):
|
|
self.any_list.append(event)
|
|
|
|
def on_created(self, event):
|
|
self.create_queue.put(event)
|
|
|
|
def on_modified(self, event):
|
|
self.modify_queue.put(event)
|
|
|
|
def on_moved(self, event):
|
|
self.move_queue.put(event)
|
|
|
|
def on_deleted(self, event):
|
|
self.delete_queue.put(event)
|
|
|
|
def generate_name():
|
|
return token_urlsafe(16)
|
|
|
|
@pytest.fixture()
|
|
def context():
|
|
with TemporaryDirectory() as workdir:
|
|
chdir(workdir)
|
|
|
|
for _ in range(5):
|
|
newdir = join(workdir, generate_name())
|
|
mkdir(newdir)
|
|
Path(join(newdir, generate_name()+'.txt')).touch()
|
|
|
|
monitor = InotifyTestObserver(workdir, recursive=True)
|
|
monitor.start()
|
|
yield InotifyContext(workdir, monitor)
|
|
|
|
def test_create(context):
|
|
for _, dirs, _ in list(walk(context.folder)):
|
|
for name in dirs:
|
|
newfile = context.create_random_file(name, '.txt')
|
|
context.check_event(InotifyFileCreatedEvent, newfile)
|
|
newdir = context.create_random_folder(name)
|
|
context.check_event(InotifyDirCreatedEvent, newdir)
|
|
assert context.check_any()
|
|
|
|
def test_modify(context):
|
|
for root, _, files in list(walk(context.folder)):
|
|
for name in files:
|
|
oldfile = join(root, name)
|
|
with open(oldfile, 'w') as ofile:
|
|
ofile.write('text')
|
|
context.check_event(InotifyFileModifiedEvent, oldfile)
|
|
rename(oldfile, oldfile+'_new')
|
|
context.check_event(InotifyDirModifiedEvent, root)
|
|
assert context.check_any()
|
|
|
|
def test_move(context):
|
|
for root, dirs, _ in list(walk(context.folder)):
|
|
for name in dirs:
|
|
olddir = join(root, name)
|
|
oldfile = join(olddir, list(listdir(olddir))[0])
|
|
rename(olddir, olddir+'_new')
|
|
context.check_event(InotifyDirMovedEvent, olddir)
|
|
context.check_event(InotifyFileMovedEvent, oldfile)
|
|
assert context.check_any()
|
|
|
|
def test_delete(context):
|
|
for root, dirs, _ in list(walk(context.folder)):
|
|
for name in dirs:
|
|
olddir = join(root, name)
|
|
oldfile = join(olddir, list(listdir(olddir))[0])
|
|
rmtree(olddir)
|
|
context.check_event(InotifyFileDeletedEvent, oldfile)
|
|
context.check_event(InotifyDirDeletedEvent, olddir)
|
|
assert context.check_any()
|
|
|
|
def test_path(context):
|
|
for _, dirs, _ in list(walk(context.folder)):
|
|
for name in dirs:
|
|
context.observer.path = context.join(name)
|
|
context.create_random_folder('.')
|
|
newfile = context.create_random_file(name, '.txt')
|
|
context.check_event(InotifyFileCreatedEvent, newfile)
|
|
context.observer.path = context.folder
|
|
assert context.check_any()
|
|
|
|
def test_patterns(context):
|
|
for _, dirs, _ in list(walk(context.folder)):
|
|
for name in dirs:
|
|
context.observer.patterns = ["*.txt"]
|
|
context.create_random_file(name, '.bin')
|
|
newfile = context.create_random_file(name, '.txt')
|
|
context.check_event(InotifyFileCreatedEvent, newfile)
|
|
context.check_empty(InotifyFileCreatedEvent)
|
|
assert context.check_any()
|
|
context.observer.patterns = None
|
|
|
|
def test_exclude(context):
|
|
for _, dirs, _ in list(walk(context.folder)):
|
|
for name in dirs:
|
|
context.observer.exclude = ["*.txt"]
|
|
context.create_random_file(name, '.txt')
|
|
newfile = context.create_random_file(name, '.bin')
|
|
context.check_event(InotifyFileCreatedEvent, newfile)
|
|
context.check_empty(InotifyFileCreatedEvent)
|
|
assert context.check_any()
|
|
context.observer.exclude = None
|
|
|
|
def test_stress(context):
|
|
for _, dirs, _ in list(walk(context.folder)):
|
|
for name in dirs:
|
|
newfile = []
|
|
for i in range(1024):
|
|
newfile.append(context.create_random_file(name, '.txt'))
|
|
for i in range(1024):
|
|
context.check_event(InotifyFileCreatedEvent, newfile[i])
|
|
assert context.check_any()
|