pipe-io-server/pipe_io_server.py
2018-12-13 22:06:13 +01:00

101 lines
2.8 KiB
Python

from threading import Thread
from queue import Queue
from os import mkfifo, remove
from os.path import exists, join
from signal import signal, SIGTERM, SIGINT
from secrets import token_urlsafe
class PipeWriterThread(Thread):
    """Thread that writes queued messages to a pipe.

    Every message taken from the queue is written in its own open/close
    cycle of the pipe. A ``None`` item in the queue ends the thread.
    """

    def __init__(self, to_write_queue, pipe_path):
        """Store the message source and the pipe to write to.

        Args:
            to_write_queue: Queue delivering bytes-like messages, or
                ``None`` to request shutdown.
            pipe_path: Path that is opened in binary write mode for
                every message.
        """
        super().__init__()
        self._to_write_queue = to_write_queue
        self._pipe_path = pipe_path

    def run(self):
        """Write queued messages to the pipe until ``None`` is received."""
        while True:
            message = self._to_write_queue.get(block=True)
            if message is None:
                break
            try:
                with open(self._pipe_path, 'wb') as pipe:
                    pipe.write(message)
            except BrokenPipeError:
                # The reading side went away before the message was fully
                # delivered. Drop this message and keep serving the queue
                # instead of letting the exception end the thread, which
                # would leave every later message unwritten.
                continue

    def stop(self):
        """Queue the ``None`` shutdown marker and wait for the thread to end."""
        self._to_write_queue.put(None)
        self.join()
class PipeReaderThread(Thread):
    """Thread that forwards everything read from a pipe into a queue.

    The pipe is reopened for each message and read to end-of-file. Reading
    a message equal to ``_stop_sequence`` ends the thread; that message is
    not forwarded.
    """

    # In-band marker that stop() writes into the pipe to end run().
    _stop_sequence = b'stop_reading'

    def __init__(self, results_queue, pipe_path):
        """Remember the destination queue and the pipe to read from."""
        super().__init__()
        self._pipe_path = pipe_path
        self._results_queue = results_queue

    def _read_message(self):
        """Open the pipe, read it to end-of-file and return the bytes."""
        with open(self._pipe_path, 'rb') as pipe:
            return pipe.read()

    def run(self):
        """Enqueue each message read until the stop sequence arrives."""
        # iter(callable, sentinel) keeps calling _read_message() and ends
        # as soon as a message compares equal to the stop sequence.
        for message in iter(self._read_message, self._stop_sequence):
            self._results_queue.put(message, block=True)

    def stop(self):
        """Write the stop sequence into the pipe, then join the thread."""
        with open(self._pipe_path, 'wb') as pipe:
            pipe.write(self._stop_sequence)
        self.join()
class PipeHandler:
    """Creates and removes the FIFO at one filesystem path."""

    def __init__(self, pipe_path):
        """Remember the path this handler manages."""
        self._pipe_path = pipe_path

    def recreate(self):
        """Delete whatever is at the path, then create a new FIFO there."""
        self.remove()
        mkfifo(self._pipe_path)

    def remove(self):
        """Delete the path if it is present; a missing path is not an error."""
        # Inside this method the bare name `remove` is the module-level
        # os.remove import, not this method.
        #
        # Attempt the removal directly instead of checking exists() first:
        # exists() follows symlinks, so it reports a dangling symlink as
        # absent, the entry would be left in place and the mkfifo() call in
        # recreate() would then fail with FileExistsError. Removing
        # directly also avoids the gap between the check and the removal.
        try:
            remove(self._pipe_path)
        except FileNotFoundError:
            pass
class PipeIOServer:
    """Couples a pipe reader thread and a pipe writer thread via one queue.

    The reader thread is given ``in_pipe`` and the writer thread
    ``out_pipe``; both are handed the same queue object, so what the
    reader enqueues is what the writer consumes.
    """

    def __init__(self, in_pipe=None, out_pipe=None):
        """Create the pipes and build (but do not start) the I/O threads.

        Args:
            in_pipe: Path of the pipe to read from, or ``None``.
            out_pipe: Path of the pipe to write to, or ``None``.
        """
        self.in_pipe = in_pipe
        self.out_pipe = out_pipe
        self._create_pipes()
        self._message_queue = Queue()
        reader = PipeReaderThread(self._message_queue, self.in_pipe)
        writer = PipeWriterThread(self._message_queue, self.out_pipe)
        self._io_threads = {'reader': reader, 'writer': writer}

    def _create_pipes(self):
        """Pick pipe paths if needed and (re)create both pipes."""
        # NOTE(review): if only one of the two paths was supplied, it is
        # replaced by a generated one as well - confirm this is intended.
        if not (self.in_pipe and self.out_pipe):
            pipe_id = token_urlsafe(6)
            self.in_pipe = join('/tmp', f'in_pipe_{pipe_id}')
            self.out_pipe = join('/tmp', f'out_pipe_{pipe_id}')
        for pipe_path in (self.in_pipe, self.out_pipe):
            PipeHandler(pipe_path).recreate()

    def run(self):
        """Start the reader thread, then the writer thread."""
        for role in self._io_threads:
            self._io_threads[role].start()

    def stop(self):
        """Stop both threads in start order, then remove both pipes."""
        for role in self._io_threads:
            self._io_threads[role].stop()
        for pipe_path in (self.in_pipe, self.out_pipe):
            PipeHandler(pipe_path).remove()
if __name__ == "__main__":
    pipe_io = PipeIOServer()

    def _stop_server(signum, frame):
        """Signal handler: stop the server's threads and remove its pipes."""
        pipe_io.stop()

    # Register the handler for both signals before the threads are started.
    for _signum in (SIGTERM, SIGINT):
        signal(_signum, _stop_server)

    pipe_io.run()
    print('Running pipe IO server with named pipes:')
    print(f'Input: {pipe_io.in_pipe}')
    print(f'Output: {pipe_io.out_pipe}')