pipe-io-server/pipe_io_server.py

109 lines
3.1 KiB
Python

from threading import Thread
from queue import Queue
from os import mkfifo, remove, getpid, kill
from os.path import exists, join
from signal import SIGTERM
from secrets import token_urlsafe
from collections import namedtuple
from abc import ABC, abstractmethod
from traceback import print_exc
class PipeWriterThread(Thread):
    """Background thread that writes queued messages to a pipe.

    Messages passed to :meth:`write` are put on an internal queue and written
    by :meth:`run` one at a time. The pipe is opened in binary write mode for
    each message and closed again once that message has been written.
    """

    def __init__(self, pipe_path):
        """Create the writer thread (it is not started here).

        Args:
            pipe_path: Path of the pipe the messages are written to.
        """
        super().__init__()
        self._pipe_path = pipe_path
        self._write_queue = Queue()

    def write(self, message):
        """Queue ``message`` to be written by the thread.

        Args:
            message: Bytes-like payload; the pipe is opened in binary mode.
                ``None`` is reserved as the internal stop signal, so passing
                it here terminates the thread instead of writing anything.
        """
        self._write_queue.put(message, block=True)

    def run(self):
        """Write queued messages until the ``None`` stop signal is dequeued."""
        while True:
            message = self._write_queue.get(block=True)
            if message is None:
                break
            # Re-open per message so that closing the pipe marks the end of
            # the message for whoever reads the other end.
            with open(self._pipe_path, 'wb') as pipe:
                pipe.write(message)

    def stop(self):
        """Queue the stop signal and wait for the thread to finish.

        Messages queued before this call are still written first. Calling
        this on a thread that was never started is safe: the stop signal
        stays queued (so a later ``start()`` exits immediately) and no join
        is attempted, because joining an unstarted thread raises
        ``RuntimeError``.
        """
        self._write_queue.put(None)
        # ``ident`` is None until the thread has been started.
        # NOTE(review): if the pipe is a FIFO nobody reads from, the open()
        # in run() blocks and so does this join - confirm callers always
        # have a reader attached before stopping.
        if self.ident is not None:
            self.join()
class PipeReaderThread(Thread):
    """Background thread that reads messages from a pipe and dispatches them.

    Each iteration opens the pipe in binary read mode, reads until
    end-of-file and hands the bytes to the message handler. Reading the exact
    stop sequence ends the thread.
    """

    # Payload that makes run() exit instead of calling the handler.
    _stop_sequence = b'stop_reading'

    def __init__(self, pipe_path, message_handler):
        """Create the reader thread (it is not started here).

        Args:
            pipe_path: Path of the pipe to read messages from.
            message_handler: Callable invoked with the bytes of each message.
        """
        super().__init__()
        self._message_handler = message_handler
        self._pipe_path = pipe_path

    def run(self):
        """Read and dispatch messages until the stop sequence is received."""
        while True:
            with open(self._pipe_path, 'rb') as pipe:
                message = pipe.read()
            if message == self._stop_sequence:
                break
            try:
                self._message_handler(message)
            except BaseException:
                # Deliberately catch everything: a failing handler must not
                # silently kill only this thread. Report the error and ask
                # the whole process to terminate.
                print_exc()
                kill(getpid(), SIGTERM)

    def stop(self):
        """Send the stop sequence through the pipe and wait for the thread.

        Does nothing when the thread is not running. Without that guard,
        opening a FIFO for writing would block forever because nobody is
        reading it, and joining a never-started thread raises
        ``RuntimeError``.
        """
        if not self.is_alive():
            return
        with open(self._pipe_path, 'wb') as pipe:
            pipe.write(self._stop_sequence)
        self.join()
class PipeHandler:
    """Create and delete a fixed set of named pipes (FIFOs)."""

    def __init__(self, *pipe_paths):
        """Remember the paths to manage.

        Args:
            *pipe_paths: Filesystem paths of the pipes handled together.
        """
        self._pipe_paths = pipe_paths

    def recreate(self):
        """Delete whatever is at each path, then create a fresh FIFO there.

        Raises:
            OSError: If an existing entry cannot be removed or ``mkfifo``
                fails.
        """
        self.remove()
        for pipe_path in self._pipe_paths:
            mkfifo(pipe_path)

    def remove(self):
        """Delete every managed path; paths that do not exist are skipped.

        The path is removed directly instead of checking for existence
        first. That avoids a race between the check and the removal, and it
        also deletes dangling symlinks, which an existence check reports as
        missing and which would otherwise make ``recreate`` fail.
        """
        for pipe_path in self._pipe_paths:
            try:
                remove(pipe_path)
            except FileNotFoundError:
                # Already gone: nothing to clean up for this path.
                pass
class PipeIOServer(ABC):
    """Abstract server that receives messages on one pipe and sends on another.

    Subclasses implement :meth:`_handle_message`, which is called with every
    message read from ``in_pipe``. :meth:`send` queues a message for
    ``out_pipe``.
    """

    def __init__(self, in_pipe=None, out_pipe=None):
        """Create the pipes and the (not yet started) reader/writer threads.

        Args:
            in_pipe: Path of the pipe to read from.
            out_pipe: Path of the pipe to write to.

        If either path is missing or empty, BOTH are replaced by freshly
        generated paths under ``/tmp``.
        """
        self.in_pipe = in_pipe
        self.out_pipe = out_pipe
        self._create_pipes()
        # Field order (reader, writer) is also the iteration order used by
        # run() and stop().
        IOThreadsTuple = namedtuple('IOThreadsTuple', ['reader', 'writer'])
        self._io_threads = IOThreadsTuple(
            reader=PipeReaderThread(self.in_pipe, self._handle_message),
            writer=PipeWriterThread(self.out_pipe),
        )

    def _create_pipes(self):
        """Pick pipe paths if needed and (re)create both pipes on disk."""
        if not (self.in_pipe and self.out_pipe):
            # One random id shared by both names keeps the pair recognisable.
            pipe_id = token_urlsafe(6)
            self.in_pipe = join('/tmp', f'in_pipe_{pipe_id}')
            self.out_pipe = join('/tmp', f'out_pipe_{pipe_id}')
        pipes = PipeHandler(self.in_pipe, self.out_pipe)
        pipes.recreate()

    @abstractmethod
    def _handle_message(self, message):
        """Process one message read from the input pipe (subclass hook)."""
        raise NotImplementedError()

    def send(self, message):
        """Queue ``message`` to be written to the output pipe."""
        writer = self._io_threads.writer
        writer.write(message)

    def run(self):
        """Start the reader thread, then the writer thread."""
        reader, writer = self._io_threads
        reader.start()
        writer.start()

    def stop(self):
        """Stop the reader, then the writer, and delete both pipes."""
        reader, writer = self._io_threads
        reader.stop()
        writer.stop()
        pipes = PipeHandler(self.in_pipe, self.out_pipe)
        pipes.remove()