From 95673cb2c203d464e2dbb8749f0f9eec26b57965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Thu, 13 Dec 2018 22:06:13 +0100 Subject: [PATCH] Initial commit --- pipe_io_server.py | 100 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 pipe_io_server.py diff --git a/pipe_io_server.py b/pipe_io_server.py new file mode 100644 index 0000000..934811f --- /dev/null +++ b/pipe_io_server.py @@ -0,0 +1,100 @@ +from threading import Thread +from queue import Queue +from os import mkfifo, remove +from os.path import exists, join +from signal import signal, SIGTERM, SIGINT +from secrets import token_urlsafe + + +class PipeWriterThread(Thread): + def __init__(self, to_write_queue, pipe_path): + super().__init__() + self._to_write_queue = to_write_queue + self._pipe_path = pipe_path + + def run(self): + while True: + message = self._to_write_queue.get(block=True) + if message is None: + break + with open(self._pipe_path, 'wb') as pipe: + pipe.write(message) + + def stop(self): + self._to_write_queue.put(None) + self.join() + + +class PipeReaderThread(Thread): + _stop_sequence = b'stop_reading' + + def __init__(self, results_queue, pipe_path): + super().__init__() + self._results_queue = results_queue + self._pipe_path = pipe_path + + def run(self): + while True: + with open(self._pipe_path, 'rb') as pipe: + message = pipe.read() + if message == self._stop_sequence: + break + self._results_queue.put(message, block=True) + + def stop(self): + with open(self._pipe_path, 'wb') as pipe: + pipe.write(self._stop_sequence) + self.join() + + +class PipeHandler: + def __init__(self, pipe_path): + self._pipe_path = pipe_path + + def recreate(self): + self.remove() + mkfifo(self._pipe_path) + + def remove(self): + if exists(self._pipe_path): + remove(self._pipe_path) + + +class PipeIOServer: + def __init__(self, in_pipe=None, out_pipe=None): + self.in_pipe, self.out_pipe = in_pipe, out_pipe + self._create_pipes() + + self._message_queue = Queue() + self._io_threads = { + 'reader': PipeReaderThread(self._message_queue, self.in_pipe), + 'writer': PipeWriterThread(self._message_queue, self.out_pipe) + } + + def _create_pipes(self): + if not self.in_pipe or not self.out_pipe: + pipe_id = token_urlsafe(6) + self.in_pipe = join('/tmp', f'in_pipe_{pipe_id}') + self.out_pipe = join('/tmp', f'out_pipe_{pipe_id}') + PipeHandler(self.in_pipe).recreate() + PipeHandler(self.out_pipe).recreate() + + def run(self): + for thread in self._io_threads.values(): + thread.start() + + def stop(self): + for thread in self._io_threads.values(): + thread.stop() + PipeHandler(self.in_pipe).remove() + PipeHandler(self.out_pipe).remove() + + +if __name__ == "__main__": + pipe_io = PipeIOServer() + signal(SIGTERM, lambda a, b: pipe_io.stop()) + signal(SIGINT, lambda a, b: pipe_io.stop()) + pipe_io.run() + print('Running pipe IO server with named pipes:') + print(f'Input: {pipe_io.in_pipe}') + print(f'Output: {pipe_io.out_pipe}')