from threading import Thread
from queue import Queue
from os import mkfifo, remove
from os.path import exists, join
from signal import signal, SIGTERM, SIGINT
from secrets import token_urlsafe


class PipeWriterThread(Thread):
    """Thread that writes every message taken from a queue to a named pipe.

    The pipe is opened and closed once per message. Putting ``None`` on the
    queue tells the thread to finish.
    """

    def __init__(self, to_write_queue, pipe_path):
        """Store the queue to consume and the path of the pipe to write to."""
        super().__init__()
        self._to_write_queue = to_write_queue
        self._pipe_path = pipe_path

    def run(self):
        """Write queued messages to the pipe until ``None`` is received."""
        while True:
            message = self._to_write_queue.get(block=True)
            if message is None:
                break
            with open(self._pipe_path, 'wb') as pipe:
                pipe.write(message)

    def stop(self):
        """Ask the thread to finish and wait for it to exit.

        Does nothing when the thread is not running: joining a thread that
        was never started raises ``RuntimeError``.

        NOTE: opening a FIFO for writing blocks until a reader opens it, so
        if the thread is waiting inside ``open()`` this call waits as well.
        """
        if not self.is_alive():
            return
        self._to_write_queue.put(None)
        self.join()


class PipeReaderThread(Thread):
    """Thread that reads whole messages from a named pipe into a queue.

    One message is everything read between the pipe being opened and the
    writing side closing it. A message equal to ``_stop_sequence`` ends the
    thread instead of being queued.
    """

    _stop_sequence = b'stop_reading'

    def __init__(self, results_queue, pipe_path):
        """Store the queue to fill and the path of the pipe to read from."""
        super().__init__()
        self._results_queue = results_queue
        self._pipe_path = pipe_path

    def run(self):
        """Queue every message read until the stop sequence arrives."""
        while True:
            with open(self._pipe_path, 'rb') as pipe:
                message = pipe.read()
            if message == self._stop_sequence:
                break
            self._results_queue.put(message, block=True)

    def stop(self):
        """Send the stop sequence through the pipe and wait for the thread.

        Does nothing when the thread is not running: with no reader attached
        the ``open()`` below would block forever on a FIFO, or create a
        regular file if the pipe has already been removed.
        """
        if not self.is_alive():
            return
        with open(self._pipe_path, 'wb') as pipe:
            pipe.write(self._stop_sequence)
        # Join only after the pipe is closed: the reader's read() returns
        # once the writing end is closed.
        self.join()


class PipeHandler:
    """Creates and removes a fixed set of named pipes."""

    def __init__(self, *pipe_paths):
        """Remember the paths of the pipes to manage."""
        self._pipe_paths = pipe_paths

    def recreate(self):
        """Remove whatever is at the managed paths, then create fresh FIFOs."""
        self.remove()
        for pipe_path in self._pipe_paths:
            mkfifo(pipe_path)

    def remove(self):
        """Delete the managed paths, ignoring the ones that do not exist."""
        for pipe_path in self._pipe_paths:
            # EAFP instead of exists()+remove(): no window between the check
            # and the removal, and dangling symlinks are removed too.
            try:
                remove(pipe_path)
            except FileNotFoundError:
                pass


class PipeIOServer:
    """Forwards every message written to an input pipe to an output pipe.

    The reader and the writer thread share one queue, so whatever is read
    from ``in_pipe`` is written unchanged to ``out_pipe``.
    """

    def __init__(self, in_pipe=None, out_pipe=None):
        """Create the pipes and the (not yet started) I/O threads.

        Args:
            in_pipe: Path of the input pipe; generated under ``/tmp`` when
                omitted.
            out_pipe: Path of the output pipe; generated under ``/tmp`` when
                omitted.
        """
        self.in_pipe, self.out_pipe = in_pipe, out_pipe
        self._create_pipes()
        self._message_queue = Queue()
        self._io_threads = {
            'reader': PipeReaderThread(self._message_queue, self.in_pipe),
            'writer': PipeWriterThread(self._message_queue, self.out_pipe),
        }

    def _create_pipes(self):
        """Fill in any missing pipe path and (re)create both pipes."""
        if not self.in_pipe or not self.out_pipe:
            pipe_id = token_urlsafe(6)
            # Only generate the path that is missing so that an explicitly
            # supplied one is not silently discarded.
            self.in_pipe = self.in_pipe or join('/tmp', f'in_pipe_{pipe_id}')
            self.out_pipe = self.out_pipe or join('/tmp', f'out_pipe_{pipe_id}')
        PipeHandler(self.in_pipe, self.out_pipe).recreate()

    def run(self):
        """Start the reader and writer threads."""
        for thread in self._io_threads.values():
            thread.start()

    def stop(self):
        """Stop the I/O threads and remove both pipes.

        Safe to call before ``run()`` and more than once: threads that are
        not running are skipped and missing pipes are ignored.
        """
        for thread in self._io_threads.values():
            thread.stop()
        PipeHandler(self.in_pipe, self.out_pipe).remove()


if __name__ == "__main__":
    pipe_io = PipeIOServer()

    def _shutdown(signum, frame):
        """Signal handler: stop the server so the process can exit."""
        pipe_io.stop()

    signal(SIGTERM, _shutdown)
    signal(SIGINT, _shutdown)
    pipe_io.run()
    print('Running pipe IO server with named pipes:')
    print(f'Input: {pipe_io.in_pipe}')
    print(f'Output: {pipe_io.out_pipe}')