From 86662b14d8ba38eab48a582e49ed7f5750da7af6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Fri, 14 Dec 2018 17:36:53 +0100 Subject: [PATCH] Refactor PipeIOServer implementation into several files --- pipe_io_server/__init__.py | 1 + .../pipe_io_server.py | 55 +------------------ pipe_io_server/pipe_reader_thread.py | 30 ++++++++++ pipe_io_server/pipe_writer_thread.py | 24 ++++++++ 4 files changed, 58 insertions(+), 52 deletions(-) create mode 100644 pipe_io_server/__init__.py rename pipe_io_server.py => pipe_io_server/pipe_io_server.py (53%) create mode 100644 pipe_io_server/pipe_reader_thread.py create mode 100644 pipe_io_server/pipe_writer_thread.py diff --git a/pipe_io_server/__init__.py b/pipe_io_server/__init__.py new file mode 100644 index 0000000..234d387 --- /dev/null +++ b/pipe_io_server/__init__.py @@ -0,0 +1 @@ +from .pipe_io_server import PipeIOServer diff --git a/pipe_io_server.py b/pipe_io_server/pipe_io_server.py similarity index 53% rename from pipe_io_server.py rename to pipe_io_server/pipe_io_server.py index ebe3ba4..b68e4f8 100644 --- a/pipe_io_server.py +++ b/pipe_io_server/pipe_io_server.py @@ -1,60 +1,11 @@ -from threading import Thread -from queue import Queue -from os import mkfifo, remove, getpid, kill +from os import mkfifo, remove from os.path import exists, join -from signal import SIGTERM from secrets import token_urlsafe from collections import namedtuple from abc import ABC, abstractmethod -from traceback import print_exc - -class PipeWriterThread(Thread): - def __init__(self, pipe_path): - super().__init__() - self._pipe_path = pipe_path - self._write_queue = Queue() - - def write(self, message): - self._write_queue.put(message, block=True) - - def run(self): - while True: - message = self._write_queue.get(block=True) - if message is None: - break - with open(self._pipe_path, 'wb') as pipe: - pipe.write(message) - - def stop(self): - self._write_queue.put(None) - self.join() - - -class PipeReaderThread(Thread): - _stop_sequence = b'stop_reading' - - def __init__(self, pipe_path, message_handler): - super().__init__() - self._message_handler = message_handler - self._pipe_path = pipe_path - - def run(self): - while True: - with open(self._pipe_path, 'rb') as pipe: - message = pipe.read() - if message == self._stop_sequence: - break - try: - self._message_handler(message) - except: # pylint: disable=bare-except - print_exc() - kill(getpid(), SIGTERM) - - def stop(self): - with open(self._pipe_path, 'wb') as pipe: - pipe.write(self._stop_sequence) - self.join() +from .pipe_reader_thread import PipeReaderThread +from .pipe_writer_thread import PipeWriterThread class PipeHandler: diff --git a/pipe_io_server/pipe_reader_thread.py b/pipe_io_server/pipe_reader_thread.py new file mode 100644 index 0000000..c9488fd --- /dev/null +++ b/pipe_io_server/pipe_reader_thread.py @@ -0,0 +1,30 @@ +from threading import Thread +from os import kill, getpid +from signal import SIGTERM +from traceback import print_exc + + +class PipeReaderThread(Thread): + _stop_sequence = b'stop_reading' + + def __init__(self, pipe_path, message_handler): + super().__init__() + self._message_handler = message_handler + self._pipe_path = pipe_path + + def run(self): + while True: + with open(self._pipe_path, 'rb') as pipe: + message = pipe.read() + if message == self._stop_sequence: + break + try: + self._message_handler(message) + except: # pylint: disable=bare-except + print_exc() + kill(getpid(), SIGTERM) + + def stop(self): + with open(self._pipe_path, 'wb') as pipe: + pipe.write(self._stop_sequence) + self.join() diff --git a/pipe_io_server/pipe_writer_thread.py b/pipe_io_server/pipe_writer_thread.py new file mode 100644 index 0000000..39a8161 --- /dev/null +++ b/pipe_io_server/pipe_writer_thread.py @@ -0,0 +1,24 @@ +from threading import Thread +from queue import Queue + + +class PipeWriterThread(Thread): + def __init__(self, pipe_path): + super().__init__() + self._pipe_path = pipe_path + self._write_queue = Queue() + + def write(self, message): + self._write_queue.put(message, block=True) + + def run(self): + while True: + message = self._write_queue.get(block=True) + if message is None: + break + with open(self._pipe_path, 'wb') as pipe: + pipe.write(message) + + def stop(self): + self._write_queue.put(None) + self.join()