package pipeio import ( "os" "io" "bufio" ) const defaultBufSize = 65536 type pipeReader struct { pipe *os.File scanner *bufio.Scanner messageHandler messageHandlerFunc } type messageHandlerFunc func([]byte) func NewPipeReader(pipePath string) (*pipeReader, error) { pipe, err := os.Open(pipePath) scanner := bufio.NewScanner(pipe) scanner.Split(bufio.ScanLines) if err != nil { return nil, err } scanner.Buffer(make([]byte, defaultBufSize), defaultBufSize) return &pipeReader{ pipe: pipe, scanner: scanner, messageHandler: func([]byte){}, }, nil } func (preader *pipeReader) Close() { preader.pipe.Close() } func (preader *pipeReader) SetMessageHandler(fun messageHandlerFunc) { preader.messageHandler = fun } func (preader *pipeReader) Run() { for { msg := preader.RecvMessage() if msg == nil { break } preader.messageHandler(msg) } } func (preader *pipeReader) RecvMessage() []byte { if preader.scanner.Scan() { return preader.scanner.Bytes() } if err := preader.scanner.Err(); err != io.EOF { panic(err) } return nil } func (preader *pipeReader) BufSize(size int) { preader.scanner.Buffer(make([]byte, size), size) } type pipeWriter struct { pipe *os.File } func NewPipeWriter(pipePath string) (*pipeWriter, error) { pipe, err := os.OpenFile(pipePath, os.O_WRONLY, 0644) if err != nil { return nil, err } return &pipeWriter{pipe: pipe}, nil } func (pwriter *pipeWriter) Close() { pwriter.pipe.Close() } func (pwriter *pipeWriter) SendMessage(msg []byte) { pwriter.pipe.Write(append(msg, '\n')) } type pipeIO struct { Reader pipeReader Writer pipeWriter } func NewPipeIO(inPipePath, outPipePath string) (*pipeIO, error) { reader, err := NewPipeReader(inPipePath) if err != nil { return nil, err } writer, err := NewPipeWriter(outPipePath) if err != nil { return nil, err } return &pipeIO{ Reader: *reader, Writer: *writer, }, nil } func (pipeio *pipeIO) Close() { pipeio.Reader.Close() pipeio.Writer.Close() }