Refactor PipeIOServer implementation into several files
This commit is contained in:
		
							
								
								
									
										1
									
								
								pipe_io_server/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								pipe_io_server/__init__.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1 @@
 | 
			
		||||
from .pipe_io_server import PipeIOServer
 | 
			
		||||
@@ -1,60 +1,11 @@
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from queue import Queue
 | 
			
		||||
from os import mkfifo, remove, getpid, kill
 | 
			
		||||
from os import mkfifo, remove
 | 
			
		||||
from os.path import exists, join
 | 
			
		||||
from signal import SIGTERM
 | 
			
		||||
from secrets import token_urlsafe
 | 
			
		||||
from collections import namedtuple
 | 
			
		||||
from abc import ABC, abstractmethod
 | 
			
		||||
from traceback import print_exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeWriterThread(Thread):
 | 
			
		||||
    def __init__(self, pipe_path):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
        self._write_queue = Queue()
 | 
			
		||||
 | 
			
		||||
    def write(self, message):
 | 
			
		||||
        self._write_queue.put(message, block=True)
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            message = self._write_queue.get(block=True)
 | 
			
		||||
            if message is None:
 | 
			
		||||
                break
 | 
			
		||||
            with open(self._pipe_path, 'wb') as pipe:
 | 
			
		||||
                pipe.write(message)
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self._write_queue.put(None)
 | 
			
		||||
        self.join()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeReaderThread(Thread):
 | 
			
		||||
    _stop_sequence = b'stop_reading'
 | 
			
		||||
 | 
			
		||||
    def __init__(self, pipe_path, message_handler):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        self._message_handler = message_handler
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            with open(self._pipe_path, 'rb') as pipe:
 | 
			
		||||
                message = pipe.read()
 | 
			
		||||
            if message == self._stop_sequence:
 | 
			
		||||
                break
 | 
			
		||||
            try:
 | 
			
		||||
                self._message_handler(message)
 | 
			
		||||
            except:  # pylint: disable=bare-except
 | 
			
		||||
                print_exc()
 | 
			
		||||
                kill(getpid(), SIGTERM)
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        with open(self._pipe_path, 'wb') as pipe:
 | 
			
		||||
            pipe.write(self._stop_sequence)
 | 
			
		||||
        self.join()
 | 
			
		||||
from .pipe_reader_thread import PipeReaderThread
 | 
			
		||||
from .pipe_writer_thread import PipeWriterThread
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeHandler:
 | 
			
		||||
							
								
								
									
										30
									
								
								pipe_io_server/pipe_reader_thread.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								pipe_io_server/pipe_reader_thread.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,30 @@
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from os import kill, getpid
 | 
			
		||||
from signal import SIGTERM
 | 
			
		||||
from traceback import print_exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeReaderThread(Thread):
 | 
			
		||||
    _stop_sequence = b'stop_reading'
 | 
			
		||||
 | 
			
		||||
    def __init__(self, pipe_path, message_handler):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        self._message_handler = message_handler
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            with open(self._pipe_path, 'rb') as pipe:
 | 
			
		||||
                message = pipe.read()
 | 
			
		||||
            if message == self._stop_sequence:
 | 
			
		||||
                break
 | 
			
		||||
            try:
 | 
			
		||||
                self._message_handler(message)
 | 
			
		||||
            except:  # pylint: disable=bare-except
 | 
			
		||||
                print_exc()
 | 
			
		||||
                kill(getpid(), SIGTERM)
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        with open(self._pipe_path, 'wb') as pipe:
 | 
			
		||||
            pipe.write(self._stop_sequence)
 | 
			
		||||
        self.join()
 | 
			
		||||
							
								
								
									
										24
									
								
								pipe_io_server/pipe_writer_thread.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								pipe_io_server/pipe_writer_thread.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,24 @@
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from queue import Queue
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipeWriterThread(Thread):
 | 
			
		||||
    def __init__(self, pipe_path):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        self._pipe_path = pipe_path
 | 
			
		||||
        self._write_queue = Queue()
 | 
			
		||||
 | 
			
		||||
    def write(self, message):
 | 
			
		||||
        self._write_queue.put(message, block=True)
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            message = self._write_queue.get(block=True)
 | 
			
		||||
            if message is None:
 | 
			
		||||
                break
 | 
			
		||||
            with open(self._pipe_path, 'wb') as pipe:
 | 
			
		||||
                pipe.write(message)
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self._write_queue.put(None)
 | 
			
		||||
        self.join()
 | 
			
		||||
		Reference in New Issue
	
	Block a user