baseimage-tutorial-framework/lib/tfw/components/inotify/test_inotify.py
2019-06-06 13:41:13 +02:00

187 lines
6.6 KiB
Python

# pylint: disable=redefined-outer-name
from queue import Empty, Queue
from secrets import token_urlsafe
from pathlib import Path
from shutil import rmtree
from os.path import join
from os import chdir, mkdir, rename, walk, listdir
from tempfile import TemporaryDirectory
import pytest
from inotify import InotifyObserver
from inotify import (
InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent,
InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent,
InotifyDirMovedEvent, InotifyDirDeletedEvent
)
class InotifyContext:
def __init__(self, folder, observer):
self.missing_events = 0
self.folder = folder
self.observer = observer
self.event_to_queue = {
InotifyFileCreatedEvent : self.observer.create_queue,
InotifyFileModifiedEvent : self.observer.modify_queue,
InotifyFileMovedEvent : self.observer.move_queue,
InotifyFileDeletedEvent : self.observer.delete_queue,
InotifyDirCreatedEvent : self.observer.create_queue,
InotifyDirModifiedEvent : self.observer.modify_queue,
InotifyDirMovedEvent : self.observer.move_queue,
InotifyDirDeletedEvent : self.observer.delete_queue
}
def create_random_file(self, dirname, extension):
filename = self.join(f'{dirname}/{generate_name()}{extension}')
Path(filename).touch()
return filename
def create_random_folder(self, basepath):
dirname = self.join(f'{basepath}/{generate_name()}')
mkdir(dirname)
return dirname
def join(self, path):
return join(self.folder, path)
def check_event(self, event_type, path):
self.missing_events += 1
event = self.event_to_queue[event_type].get(timeout=1)
assert isinstance(event, event_type)
assert event.src_path == path
return event
def check_empty(self, event_type):
with pytest.raises(Empty):
self.event_to_queue[event_type].get(timeout=1)
def check_any(self):
attrs = self.observer.__dict__.values()
total = sum([q.qsize() for q in attrs if isinstance(q, Queue)])
return total+self.missing_events == len(self.observer.any_list)
class InotifyTestObserver(InotifyObserver):
def __init__(self, path, patterns=None, exclude=None):
self.any_list = []
self.create_queue, self.modify_queue, self.move_queue, self.delete_queue = [Queue() for _ in range(4)]
super().__init__(path, patterns, exclude)
def on_any_event(self, event):
self.any_list.append(event)
def on_created(self, event):
self.create_queue.put(event)
def on_modified(self, event):
self.modify_queue.put(event)
def on_moved(self, event):
self.move_queue.put(event)
def on_deleted(self, event):
self.delete_queue.put(event)
def generate_name():
return token_urlsafe(16)
@pytest.fixture()
def context():
with TemporaryDirectory() as workdir:
chdir(workdir)
for _ in range(5):
newdir = join(workdir, generate_name())
mkdir(newdir)
Path(join(newdir, generate_name()+'.txt')).touch()
monitor = InotifyTestObserver(workdir)
monitor.start()
yield InotifyContext(workdir, monitor)
def test_create(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
newfile = context.create_random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
newdir = context.create_random_folder(name)
context.check_event(InotifyDirCreatedEvent, newdir)
assert context.check_any()
def test_modify(context):
for root, _, files in list(walk(context.folder)):
for name in files:
oldfile = join(root, name)
with open(oldfile, 'w') as ofile:
ofile.write('text')
context.check_event(InotifyFileModifiedEvent, oldfile)
rename(oldfile, oldfile+'_new')
context.check_event(InotifyDirModifiedEvent, root)
assert context.check_any()
def test_move(context):
for root, dirs, _ in list(walk(context.folder)):
for name in dirs:
olddir = join(root, name)
oldfile = join(olddir, list(listdir(olddir))[0])
rename(olddir, olddir+'_new')
context.check_event(InotifyDirMovedEvent, olddir)
context.check_event(InotifyFileMovedEvent, oldfile)
assert context.check_any()
def test_delete(context):
for root, dirs, _ in list(walk(context.folder)):
for name in dirs:
olddir = join(root, name)
oldfile = join(olddir, list(listdir(olddir))[0])
rmtree(olddir)
context.check_event(InotifyFileDeletedEvent, oldfile)
context.check_event(InotifyDirDeletedEvent, olddir)
assert context.check_any()
def test_path(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.path = context.join(name)
context.create_random_folder('.')
newfile = context.create_random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
context.observer.path = context.folder
assert context.check_any()
def test_patterns(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.patterns = ["*.txt"]
context.create_random_file(name, '.bin')
newfile = context.create_random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any()
context.observer.patterns = None
def test_exclude(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.exclude = ["*.txt"]
context.create_random_file(name, '.txt')
newfile = context.create_random_file(name, '.bin')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any()
context.observer.exclude = None
def test_stress(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
newfile = []
for i in range(1024):
newfile.append(context.create_random_file(name, '.txt'))
for i in range(1024):
context.check_event(InotifyFileCreatedEvent, newfile[i])
assert context.check_any()