43 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			43 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from contextlib import suppress
 | |
| from os import O_NONBLOCK, O_RDONLY, close
 | |
| from os import open as osopen
 | |
| from queue import Queue
 | |
| from threading import Thread
 | |
| 
 | |
| from .terminate_process_on_failure import terminate_process_on_failure
 | |
| 
 | |
| 
 | |
class PipeWriterThread(Thread):
    """Thread that writes queued messages to the pipe at ``pipe_path``.

    Messages handed to :meth:`write` are placed on an internal queue; the
    thread body (:meth:`run`) takes them off one at a time and writes each
    to the pipe followed by a newline byte. A ``None`` item on the queue is
    the shutdown sentinel.
    """

    def __init__(self, pipe_path, stop_event):
        """Store the pipe path and stop event and create the message queue.

        Args:
            pipe_path: Path that ``run`` opens for binary writing.
            stop_event: Object whose ``set()`` is called when the writer
                loop ends (sentinel received or pipe broken).
        """
        super().__init__()
        self._write_queue = Queue()
        self._stop_event = stop_event
        self._pipe_path = pipe_path

    def write(self, message):
        """Enqueue ``message`` to be written to the pipe by the thread.

        ``run`` appends ``b'\\n'`` to each message, so bytes are expected.
        """
        self._write_queue.put(message)

    @terminate_process_on_failure
    def run(self):
        """Open the pipe and write queued messages until told to stop.

        Every message is written with a trailing newline and flushed right
        away. The loop ends when ``None`` is dequeued; the stop event is set
        at that point, before the pipe is closed. A ``BrokenPipeError``
        raised while writing or closing also sets the stop event.
        """
        next_message = self._write_queue.get
        try:
            with open(self._pipe_path, 'wb') as pipe:
                message = next_message()
                while message is not None:
                    pipe.write(message + b'\n')
                    pipe.flush()
                    message = next_message()
                # Sentinel received: signal shutdown while the pipe is still open.
                self._stop_event.set()
        except BrokenPipeError:
            self._stop_event.set()

    def stop(self):
        """Ask the writer loop to finish and wait for the thread to exit."""
        self.unblock()
        self.join()

    def unblock(self):
        """Enqueue the stop sentinel and briefly open the pipe for reading.

        The pipe is opened read-only in non-blocking mode and closed again
        immediately; any ``OSError`` from doing so is ignored.
        """
        self._write_queue.put(None)
        # NOTE(review): presumably this lets a run() that is blocked opening
        # a FIFO with no reader proceed -- confirm against the pipe setup.
        try:
            close(osopen(self._pipe_path, O_RDONLY | O_NONBLOCK))
        except OSError:
            pass
 |