109 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			109 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from threading import Thread
 | 
						|
from queue import Queue
 | 
						|
from os import mkfifo, remove, getpid, kill
 | 
						|
from os.path import exists, join
 | 
						|
from signal import SIGTERM
 | 
						|
from secrets import token_urlsafe
 | 
						|
from collections import namedtuple
 | 
						|
from abc import ABC, abstractmethod
 | 
						|
from traceback import print_exc
 | 
						|
 | 
						|
 | 
						|
class PipeWriterThread(Thread):
    """Background thread that writes queued messages to a pipe.

    Messages are handed over with ``write()`` and written one at a time by
    ``run()``. The pipe is opened anew for every message and closed right
    after it has been written.
    """

    def __init__(self, pipe_path):
        """Prepare a writer for the pipe at *pipe_path*; the thread is not started."""
        super().__init__()
        self._pipe_path = pipe_path
        self._write_queue = Queue()

    def write(self, message):
        """Queue *message* for writing.

        The pipe is opened in binary mode, so *message* must be bytes-like.
        ``None`` is reserved: it is the shutdown marker used by ``stop()``.
        """
        self._write_queue.put(message, block=True)

    def run(self):
        """Write queued messages until the ``None`` shutdown marker is dequeued."""
        while True:
            message = self._write_queue.get(block=True)
            if message is None:
                break
            # One open/close cycle per message.
            with open(self._pipe_path, 'wb') as pipe:
                pipe.write(message)

    def stop(self):
        """Ask the thread to finish and wait for it.

        Messages queued before this call are still written first, because
        the shutdown marker is placed behind them in the queue.
        """
        self._write_queue.put(None)
        # Thread.join() raises RuntimeError on a thread that was never
        # started; there is nothing to wait for in that case.
        if self.is_alive():
            self.join()
 | 
						|
 | 
						|
 | 
						|
class PipeReaderThread(Thread):
    """Background thread that reads messages from a named pipe.

    Each loop iteration opens the pipe, reads until end-of-file and passes
    the bytes read to the message handler. Reading exactly
    ``_stop_sequence`` ends the loop instead.
    """

    # Payload that tells run() to exit; written to the pipe by stop().
    _stop_sequence = b'stop_reading'

    def __init__(self, pipe_path, message_handler):
        """Prepare a reader for *pipe_path*; the thread is not started.

        *message_handler* is called with the bytes of every message read.
        """
        super().__init__()
        self._message_handler = message_handler
        self._pipe_path = pipe_path

    def run(self):
        """Read and dispatch messages until the stop sequence arrives.

        Any exception raised by the handler is printed and the whole
        process is sent SIGTERM.
        """
        # NOTE(review): a message is "everything read until EOF"; writers
        # that open the pipe back to back could presumably have their
        # payloads read as a single message - confirm callers serialise.
        while True:
            with open(self._pipe_path, 'rb') as pipe:
                message = pipe.read()
            if message == self._stop_sequence:
                break
            try:
                self._message_handler(message)
            except:  # pylint: disable=bare-except
                # Deliberately broad: any handler failure is reported and
                # then terminates the process.
                print_exc()
                kill(getpid(), SIGTERM)

    def stop(self):
        """Make the reading loop exit and wait for the thread to finish.

        Does nothing when the thread is not running.
        """
        # Opening a FIFO for writing blocks until something opens it for
        # reading; without a running reader thread that would hang forever.
        if not self.is_alive():
            return
        with open(self._pipe_path, 'wb') as pipe:
            pipe.write(self._stop_sequence)
        self.join()
 | 
						|
 | 
						|
 | 
						|
class PipeHandler:
    """Create and delete a fixed set of named pipes (FIFOs)."""

    def __init__(self, *pipe_paths):
        """Remember the filesystem paths of the pipes to manage."""
        self._pipe_paths = pipe_paths

    def recreate(self):
        """Delete whatever exists at each path, then create a fresh FIFO there."""
        self.remove()
        for pipe_path in self._pipe_paths:
            mkfifo(pipe_path)

    def remove(self):
        """Delete every managed path; paths that do not exist are skipped."""
        for pipe_path in self._pipe_paths:
            # Attempt the unlink instead of testing for existence first:
            # this avoids a check-then-act race and also clears dangling
            # symlinks, which an existence test reports as absent.
            try:
                remove(pipe_path)
            except FileNotFoundError:
                pass
 | 
						|
 | 
						|
 | 
						|
class PipeIOServer(ABC):
    """Base class for servers that talk over a pair of named pipes.

    Incoming messages are read from ``in_pipe`` by a reader thread and
    passed to ``_handle_message()``; outgoing messages passed to ``send()``
    are written to ``out_pipe`` by a writer thread.
    """

    # Container for the two I/O threads; iteration order is reader, writer.
    _IOThreadsTuple = namedtuple('IOThreadsTuple', ('reader', 'writer'))

    def __init__(self, in_pipe=None, out_pipe=None):
        """Create the pipes and the (not yet started) I/O threads.

        A path that is not supplied is generated under ``/tmp``; the paths
        actually used are available as ``self.in_pipe`` / ``self.out_pipe``.
        """
        self.in_pipe, self.out_pipe = in_pipe, out_pipe
        self._create_pipes()

        self._io_threads = self._IOThreadsTuple(
            reader=PipeReaderThread(self.in_pipe, self._handle_message),
            writer=PipeWriterThread(self.out_pipe),
        )

    def _create_pipes(self):
        """Fill in missing pipe paths and (re)create both pipes on disk."""
        if not self.in_pipe or not self.out_pipe:
            # Generate only the missing path(s) so that a path supplied by
            # the caller is never silently replaced; a shared random id
            # keeps generated names recognisable as a pair.
            pipe_id = token_urlsafe(6)
            self.in_pipe = self.in_pipe or join('/tmp', f'in_pipe_{pipe_id}')
            self.out_pipe = self.out_pipe or join('/tmp', f'out_pipe_{pipe_id}')
        PipeHandler(self.in_pipe, self.out_pipe).recreate()

    @abstractmethod
    def _handle_message(self, message):
        """Process one *message* (bytes) read from the input pipe."""
        raise NotImplementedError()

    def send(self, message):
        """Queue *message* for writing to the output pipe."""
        self._io_threads.writer.write(message)

    def run(self):
        """Start the reader and writer threads."""
        for thread in self._io_threads:
            thread.start()

    def stop(self):
        """Stop both I/O threads and delete the pipes; call after ``run()``."""
        try:
            for thread in self._io_threads:
                thread.stop()
        finally:
            # Remove the pipes even if stopping a thread raised.
            PipeHandler(self.in_pipe, self.out_pipe).remove()
 |