baseimage-tutorial-framework/lib/tfw/components/inotify/test_inotify.py

186 lines
6.5 KiB
Python
Raw Normal View History

2019-05-31 07:36:19 +00:00
# pylint: disable=redefined-outer-name
from queue import Empty, Queue
from secrets import token_urlsafe as random_name
from pathlib import Path
from shutil import rmtree
from os.path import join
from os import chdir, mkdir, rename, walk, listdir
from tempfile import TemporaryDirectory
import pytest
from inotify import InotifyObserver
from inotify import (
InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent,
InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent,
InotifyDirMovedEvent, InotifyDirDeletedEvent
)
class InotifyContext:
def __init__(self, folder, observer):
self.missing = 0
self.folder = folder
self.observer = observer
self.event_to_queue = {
InotifyFileCreatedEvent : self.observer.create_queue,
InotifyFileModifiedEvent : self.observer.modify_queue,
InotifyFileMovedEvent : self.observer.move_queue,
InotifyFileDeletedEvent : self.observer.delete_queue,
InotifyDirCreatedEvent : self.observer.create_queue,
InotifyDirModifiedEvent : self.observer.modify_queue,
InotifyDirMovedEvent : self.observer.move_queue,
InotifyDirDeletedEvent : self.observer.delete_queue
}
def random_file(self, dirname, extension):
filename = self.join(f'{dirname}/{random_name(16)}{extension}')
Path(filename).touch()
return filename
def random_folder(self, basepath):
dirname = self.join(f'{basepath}/{random_name(16)}')
mkdir(dirname)
return dirname
def join(self, path):
return join(self.folder, path)
def check_event(self, event_type, path):
event = self.event_to_queue[event_type].get(timeout=1)
assert isinstance(event, event_type)
assert event.src_path == path
return event
def check_empty(self, event_type):
with pytest.raises(Empty):
self.event_to_queue[event_type].get(timeout=1)
def check_any(self, missing):
self.missing += missing
attrs = self.observer.__dict__.values()
total = sum([q.qsize() for q in attrs if isinstance(q, Queue)])
return total+self.missing == len(self.observer.any_list)
class InotifyTestObserver(InotifyObserver):
def __init__(self, path, patterns=None, exclude=None):
self.any_list = []
(self.create_queue, self.modify_queue, self.move_queue,
self.delete_queue) = [Queue() for _ in range(4)]
super().__init__(path, patterns, exclude)
def on_any_event(self, event):
self.any_list.append(event)
def on_created(self, event):
self.create_queue.put(event)
def on_modified(self, event):
self.modify_queue.put(event)
def on_moved(self, event):
self.move_queue.put(event)
def on_deleted(self, event):
self.delete_queue.put(event)
@pytest.fixture()
def context():
with TemporaryDirectory() as workdir:
chdir(workdir)
for _ in range(5):
newdir = join(workdir, random_name(16))
mkdir(newdir)
Path(join(newdir, random_name(16)+'.txt')).touch()
monitor = InotifyTestObserver(workdir)
monitor.start()
yield InotifyContext(workdir, monitor)
def test_create(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
newfile = context.random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
newdir = context.random_folder(name)
context.check_event(InotifyDirCreatedEvent, newdir)
assert context.check_any(2)
def test_modify(context):
for root, _, files in list(walk(context.folder)):
for name in files:
oldfile = join(root, name)
with open(oldfile, 'w') as ofile:
ofile.write('text')
context.check_event(InotifyFileModifiedEvent, oldfile)
rename(oldfile, oldfile+'_new')
context.check_event(InotifyDirModifiedEvent, root)
assert context.check_any(2)
def test_move(context):
for root, dirs, _ in list(walk(context.folder)):
for name in dirs:
olddir = join(root, name)
oldfile = join(olddir, list(listdir(olddir))[0])
rename(olddir, olddir+'_new')
context.check_event(InotifyDirMovedEvent, olddir)
context.check_event(InotifyFileMovedEvent, oldfile)
assert context.check_any(2)
def test_delete(context):
for root, dirs, _ in list(walk(context.folder)):
for name in dirs:
olddir = join(root, name)
oldfile = join(olddir, list(listdir(olddir))[0])
rmtree(olddir)
context.check_event(InotifyFileDeletedEvent, oldfile)
context.check_event(InotifyDirDeletedEvent, olddir)
assert context.check_any(2)
def test_path(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.path = context.join(name)
context.random_folder('.')
newfile = context.random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
context.observer.path = context.folder
assert context.check_any(1)
def test_patterns(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.patterns = ["*.txt"]
context.random_file(name, '.bin')
newfile = context.random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any(1)
context.observer.patterns = None
def test_exclude(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.exclude = ["*.txt"]
context.random_file(name, '.txt')
newfile = context.random_file(name, '.bin')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any(1)
context.observer.exclude = None
def test_stress(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
newfile = []
for i in range(1024):
newfile.append(context.random_file(name, '.txt'))
for i in range(1024):
context.check_event(InotifyFileCreatedEvent, newfile[i])
assert context.check_any(1024)