Merge pull request #56 from avatao-content/inotify

Create inotify module with unit tests
This commit is contained in:
therealkrispet 2019-06-11 13:03:52 +02:00 committed by GitHub
commit 8a5c7b2a49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 361 additions and 0 deletions

View File

@ -0,0 +1,6 @@
from .inotify import InotifyObserver
from .inotify import (
InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent,
InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent,
InotifyDirMovedEvent, InotifyDirDeletedEvent
)

View File

@ -0,0 +1,169 @@
# pylint: disable=too-few-public-methods
from time import time
from watchdog.observers import Observer
from watchdog.events import FileSystemMovedEvent, PatternMatchingEventHandler
from watchdog.events import (
FileCreatedEvent, FileModifiedEvent, FileMovedEvent, FileDeletedEvent,
DirCreatedEvent, DirModifiedEvent, DirMovedEvent, DirDeletedEvent
)
class InotifyEvent:
def __init__(self, src_path):
self.date = time()
self.src_path = src_path
def __str__(self):
return self.__repr__()
def __repr__(self):
return f'{self.date}\t{self.__class__.__name__}:\t{self.src_path}'
class InotifyMovedEvent(InotifyEvent):
def __init__(self, src_path, dest_path):
self.dest_path = dest_path
super().__init__(src_path)
def __repr__(self):
return super().__repr__()+f'\t-> {self.dest_path}'
class InotifyFileCreatedEvent(InotifyEvent):
pass
class InotifyFileModifiedEvent(InotifyEvent):
pass
class InotifyFileMovedEvent(InotifyMovedEvent):
pass
class InotifyFileDeletedEvent(InotifyEvent):
pass
class InotifyDirCreatedEvent(InotifyEvent):
pass
class InotifyDirModifiedEvent(InotifyEvent):
pass
class InotifyDirMovedEvent(InotifyMovedEvent):
pass
class InotifyDirDeletedEvent(InotifyEvent):
pass
class InotifyObserver:
def __init__(self, path, patterns=None, exclude=None, recursive=False):
self._path = path
self._patterns = patterns
self._exclude = exclude
self._recursive = recursive
self._observer = Observer()
self._reset(path, patterns, exclude)
def _reset(self, path, patterns, exclude):
dispatch_event = self.dispatch_event
class TransformerEventHandler(PatternMatchingEventHandler):
def on_any_event(self, event):
dispatch_event(event)
self.handler = TransformerEventHandler(patterns, exclude)
self._observer.unschedule_all()
self._observer.schedule(self.handler, path, self._recursive)
@property
def path(self):
return self._path
@path.setter
def path(self, path):
self._path = path
self._reset(path, self._patterns, self._exclude)
@property
def patterns(self):
return self._patterns
@patterns.setter
def patterns(self, patterns):
self._patterns = patterns
self._reset(self._path, patterns, self._exclude)
@property
def exclude(self):
return self._exclude
@exclude.setter
def exclude(self, exclude):
self._exclude = exclude
self._reset(self._path, self._patterns, exclude)
def start(self):
self._observer.start()
def stop(self):
self._observer.stop()
self._observer.join()
def dispatch_event(self, event):
event_to_action = {
InotifyFileCreatedEvent : self.on_created,
InotifyFileModifiedEvent : self.on_modified,
InotifyFileMovedEvent : self.on_moved,
InotifyFileDeletedEvent : self.on_deleted,
InotifyDirCreatedEvent : self.on_created,
InotifyDirModifiedEvent : self.on_modified,
InotifyDirMovedEvent : self.on_moved,
InotifyDirDeletedEvent : self.on_deleted
}
event = self.transform_event(event)
self.on_any_event(event)
event_to_action[type(event)](event)
@staticmethod
def transform_event(event):
watchdog_to_inotify = {
FileCreatedEvent : InotifyFileCreatedEvent,
FileModifiedEvent : InotifyFileModifiedEvent,
FileMovedEvent : InotifyFileMovedEvent,
FileDeletedEvent : InotifyFileDeletedEvent,
DirCreatedEvent : InotifyDirCreatedEvent,
DirModifiedEvent : InotifyDirModifiedEvent,
DirMovedEvent : InotifyDirMovedEvent,
DirDeletedEvent : InotifyDirDeletedEvent
}
try:
cls = watchdog_to_inotify[type(event)]
except KeyError:
raise NameError('Watchdog API returned an unknown event.')
if isinstance(event, FileSystemMovedEvent):
return cls(event.src_path, event.dest_path)
return cls(event.src_path)
def on_any_event(self, event):
pass
def on_created(self, event):
pass
def on_modified(self, event):
pass
def on_moved(self, event):
pass
def on_deleted(self, event):
pass

View File

@ -0,0 +1,186 @@
# pylint: disable=redefined-outer-name
from queue import Empty, Queue
from secrets import token_urlsafe
from pathlib import Path
from shutil import rmtree
from os.path import join
from os import chdir, mkdir, rename, walk, listdir
from tempfile import TemporaryDirectory
import pytest
from inotify import InotifyObserver
from inotify import (
InotifyFileCreatedEvent, InotifyFileModifiedEvent, InotifyFileMovedEvent,
InotifyFileDeletedEvent, InotifyDirCreatedEvent, InotifyDirModifiedEvent,
InotifyDirMovedEvent, InotifyDirDeletedEvent
)
class InotifyContext:
def __init__(self, folder, observer):
self.missing_events = 0
self.folder = folder
self.observer = observer
self.event_to_queue = {
InotifyFileCreatedEvent : self.observer.create_queue,
InotifyFileModifiedEvent : self.observer.modify_queue,
InotifyFileMovedEvent : self.observer.move_queue,
InotifyFileDeletedEvent : self.observer.delete_queue,
InotifyDirCreatedEvent : self.observer.create_queue,
InotifyDirModifiedEvent : self.observer.modify_queue,
InotifyDirMovedEvent : self.observer.move_queue,
InotifyDirDeletedEvent : self.observer.delete_queue
}
def create_random_file(self, dirname, extension):
filename = self.join(f'{dirname}/{generate_name()}{extension}')
Path(filename).touch()
return filename
def create_random_folder(self, basepath):
dirname = self.join(f'{basepath}/{generate_name()}')
mkdir(dirname)
return dirname
def join(self, path):
return join(self.folder, path)
def check_event(self, event_type, path):
self.missing_events += 1
event = self.event_to_queue[event_type].get(timeout=1)
assert isinstance(event, event_type)
assert event.src_path == path
return event
def check_empty(self, event_type):
with pytest.raises(Empty):
self.event_to_queue[event_type].get(timeout=1)
def check_any(self):
attrs = self.observer.__dict__.values()
total = sum([q.qsize() for q in attrs if isinstance(q, Queue)])
return total+self.missing_events == len(self.observer.any_list)
class InotifyTestObserver(InotifyObserver):
def __init__(self, path, patterns=None, exclude=None, recursive=False):
self.any_list = []
self.create_queue, self.modify_queue, self.move_queue, self.delete_queue = [Queue() for _ in range(4)]
super().__init__(path, patterns, exclude, recursive)
def on_any_event(self, event):
self.any_list.append(event)
def on_created(self, event):
self.create_queue.put(event)
def on_modified(self, event):
self.modify_queue.put(event)
def on_moved(self, event):
self.move_queue.put(event)
def on_deleted(self, event):
self.delete_queue.put(event)
def generate_name():
return token_urlsafe(16)
@pytest.fixture()
def context():
with TemporaryDirectory() as workdir:
chdir(workdir)
for _ in range(5):
newdir = join(workdir, generate_name())
mkdir(newdir)
Path(join(newdir, generate_name()+'.txt')).touch()
monitor = InotifyTestObserver(workdir, recursive=True)
monitor.start()
yield InotifyContext(workdir, monitor)
def test_create(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
newfile = context.create_random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
newdir = context.create_random_folder(name)
context.check_event(InotifyDirCreatedEvent, newdir)
assert context.check_any()
def test_modify(context):
for root, _, files in list(walk(context.folder)):
for name in files:
oldfile = join(root, name)
with open(oldfile, 'w') as ofile:
ofile.write('text')
context.check_event(InotifyFileModifiedEvent, oldfile)
rename(oldfile, oldfile+'_new')
context.check_event(InotifyDirModifiedEvent, root)
assert context.check_any()
def test_move(context):
for root, dirs, _ in list(walk(context.folder)):
for name in dirs:
olddir = join(root, name)
oldfile = join(olddir, list(listdir(olddir))[0])
rename(olddir, olddir+'_new')
context.check_event(InotifyDirMovedEvent, olddir)
context.check_event(InotifyFileMovedEvent, oldfile)
assert context.check_any()
def test_delete(context):
for root, dirs, _ in list(walk(context.folder)):
for name in dirs:
olddir = join(root, name)
oldfile = join(olddir, list(listdir(olddir))[0])
rmtree(olddir)
context.check_event(InotifyFileDeletedEvent, oldfile)
context.check_event(InotifyDirDeletedEvent, olddir)
assert context.check_any()
def test_path(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.path = context.join(name)
context.create_random_folder('.')
newfile = context.create_random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
context.observer.path = context.folder
assert context.check_any()
def test_patterns(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.patterns = ["*.txt"]
context.create_random_file(name, '.bin')
newfile = context.create_random_file(name, '.txt')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any()
context.observer.patterns = None
def test_exclude(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
context.observer.exclude = ["*.txt"]
context.create_random_file(name, '.txt')
newfile = context.create_random_file(name, '.bin')
context.check_event(InotifyFileCreatedEvent, newfile)
context.check_empty(InotifyFileCreatedEvent)
assert context.check_any()
context.observer.exclude = None
def test_stress(context):
for _, dirs, _ in list(walk(context.folder)):
for name in dirs:
newfile = []
for i in range(1024):
newfile.append(context.create_random_file(name, '.txt'))
for i in range(1024):
context.check_event(InotifyFileCreatedEvent, newfile[i])
assert context.check_any()