diff --git a/pipe_io_server/pipe.py b/pipe_io_server/pipe.py new file mode 100644 index 0000000..b1b2578 --- /dev/null +++ b/pipe_io_server/pipe.py @@ -0,0 +1,26 @@ +from os import mkfifo, remove +from os.path import exists + + +class Pipe: + def __init__(self, path): + self.path = path + + def recreate(self): + self.remove() + self.create() + + def remove(self): + if exists(self.path): + remove(self.path) + + def create(self): + mkfifo(self.path) + + def write(self, data): + with open(self.path, 'wb') as pipe: + pipe.write(data) + + def read(self): + with open(self.path, 'rb') as pipe: + return pipe.read() diff --git a/pipe_io_server/pipe_io_server.py b/pipe_io_server/pipe_io_server.py index f220513..bba85ae 100644 --- a/pipe_io_server/pipe_io_server.py +++ b/pipe_io_server/pipe_io_server.py @@ -1,26 +1,11 @@ -from os import mkfifo, remove -from os.path import exists, join +from os.path import join from secrets import token_urlsafe from collections import namedtuple from abc import ABC, abstractmethod from .pipe_reader_thread import PipeReaderThread from .pipe_writer_thread import PipeWriterThread - - -class PipeHandler: - def __init__(self, *pipe_paths): - self._pipe_paths = pipe_paths - - def recreate(self): - self.remove() - for pipe_path in self._pipe_paths: - mkfifo(pipe_path) - - def remove(self): - for pipe_path in self._pipe_paths: - if exists(pipe_path): - remove(pipe_path) +from .pipe import Pipe class PipeIOServer(ABC): @@ -35,7 +20,8 @@ class PipeIOServer(ABC): pipe_id = token_urlsafe(6) self.in_pipe = join('/tmp', f'in_pipe_{pipe_id}') self.out_pipe = join('/tmp', f'out_pipe_{pipe_id}') - PipeHandler(self.in_pipe, self.out_pipe).recreate() + Pipe(self.in_pipe).recreate() + Pipe(self.out_pipe).recreate() def _init_io_threads(self): io_threads_dict = { @@ -60,4 +46,5 @@ class PipeIOServer(ABC): for thread in self._io_threads: if thread.isAlive(): thread.stop() - PipeHandler(self.in_pipe, self.out_pipe).remove() + Pipe(self.in_pipe).remove() + Pipe(self.out_pipe).remove() diff --git a/pipe_io_server/pipe_reader_thread.py b/pipe_io_server/pipe_reader_thread.py index 8d2c32c..f5bdcf6 100644 --- a/pipe_io_server/pipe_reader_thread.py +++ b/pipe_io_server/pipe_reader_thread.py @@ -1,6 +1,7 @@ from threading import Thread from .terminate_process_on_failure import terminate_process_on_failure +from .pipe import Pipe class PipeReaderThread(Thread): @@ -14,13 +15,11 @@ class PipeReaderThread(Thread): @terminate_process_on_failure def run(self): while True: - with open(self._pipe_path, 'rb') as pipe: - message = pipe.read() + message = Pipe(self._pipe_path).read() if message == self._stop_sequence: break self._message_handler(message) def stop(self): - with open(self._pipe_path, 'wb') as pipe: - pipe.write(self._stop_sequence) + Pipe(self._pipe_path).write(self._stop_sequence) self.join() diff --git a/pipe_io_server/pipe_writer_thread.py b/pipe_io_server/pipe_writer_thread.py index 7d9bbe8..3fa96d0 100644 --- a/pipe_io_server/pipe_writer_thread.py +++ b/pipe_io_server/pipe_writer_thread.py @@ -2,6 +2,7 @@ from threading import Thread from queue import Queue from .terminate_process_on_failure import terminate_process_on_failure +from .pipe import Pipe class PipeWriterThread(Thread): @@ -19,8 +20,7 @@ class PipeWriterThread(Thread): message = self._write_queue.get(block=True) if message is None: break - with open(self._pipe_path, 'wb') as pipe: - pipe.write(message) + Pipe(self._pipe_path).write(message) def stop(self): self._write_queue.put(None)