pipe-io-server/clients/go/pipeio/pipeio.go

117 lines
2.1 KiB
Go
Raw Normal View History

package pipeio
import (
"os"
"io"
"bufio"
"errors"
)
// defaultBufSize is the size in bytes of the scanner buffer that
// NewPipeReader installs, used both as the initial allocation and as the
// maximum buffer size.
const defaultBufSize = 1 << 16

// messageHandlerFunc is the callback type invoked with each message read
// from the pipe.
type messageHandlerFunc func([]byte)

// pipeReader reads line-delimited messages from a file through a
// bufio.Scanner and hands them to a configurable handler.
type pipeReader struct {
	// pipe is the opened file the scanner reads from.
	pipe *os.File
	// scanner splits the contents of pipe into messages.
	scanner *bufio.Scanner
	// messageHandler is called by Run for every received message.
	messageHandler messageHandlerFunc
}
// NewPipeReader opens the file at pipePath for reading and returns a
// pipeReader that splits its contents into newline-delimited messages.
//
// The scanner buffer is created with defaultBufSize bytes, which is also its
// maximum size; use BufSize to change the limit. The message handler starts
// out as a no-op and can be replaced with SetMessageHandler.
//
// If the file cannot be opened, the error from os.Open is returned unchanged
// and the returned reader is nil.
func NewPipeReader(pipePath string) (*pipeReader, error) {
	pipe, err := os.Open(pipePath)
	// Check the error before the file handle is used for anything else.
	if err != nil {
		return nil, err
	}

	scanner := bufio.NewScanner(pipe)
	scanner.Split(bufio.ScanLines)
	scanner.Buffer(make([]byte, defaultBufSize), defaultBufSize)

	return &pipeReader{
		pipe:           pipe,
		scanner:        scanner,
		messageHandler: func([]byte) {},
	}, nil
}
// Close closes the file the reader was opened on. The error reported by the
// underlying Close call is discarded.
func (preader *pipeReader) Close() {
	_ = preader.pipe.Close()
}
// SetMessageHandler installs fun as the callback that Run invokes for every
// received message.
//
// Passing nil restores a no-op handler (the same default NewPipeReader
// installs) so that Run never calls a nil function.
func (preader *pipeReader) SetMessageHandler(fun messageHandlerFunc) {
	if fun == nil {
		fun = func([]byte) {}
	}
	preader.messageHandler = fun
}
// Run receives messages with RecvMessage and passes each one to the
// configured message handler, returning once RecvMessage yields nil.
//
// The slice given to the handler is whatever RecvMessage returned; since
// RecvMessage hands out the scanner's own byte slice, a handler that keeps
// the data beyond its call should copy it first.
func (preader *pipeReader) Run() {
	for msg := preader.RecvMessage(); msg != nil; msg = preader.RecvMessage() {
		preader.messageHandler(msg)
	}
}
// RecvMessage advances the scanner and returns the bytes of the next message.
//
// The returned slice is the scanner's own token (scanner.Bytes), so it may be
// overwritten by the next call. nil is returned when the scanner stops
// without an error, or with io.EOF.
//
// Any other scanner error causes a panic. bufio.ErrTooLong is replaced by an
// error stating that the pipeReader buffer is too small.
func (preader *pipeReader) RecvMessage() []byte {
	if !preader.scanner.Scan() {
		scanErr := preader.scanner.Err()
		switch {
		case scanErr == nil, scanErr == io.EOF:
			// Input is exhausted; signal the end to the caller.
			return nil
		case scanErr == bufio.ErrTooLong:
			panic(errors.New("pipeReader buffer too small!"))
		default:
			panic(scanErr)
		}
	}
	return preader.scanner.Bytes()
}
// BufSize sets the maximum size in bytes that the scanner buffer may reach,
// which bounds the length of a single message.
//
// The initial buffer is defaultBufSize bytes, or size bytes when size is
// positive and smaller than that. The clamp matters because bufio.Scanner
// uses the larger of the maximum and the initial buffer's capacity as its
// effective limit; without it a smaller size would have no effect.
// Non-positive sizes keep the default initial buffer.
//
// BufSize must be called before the first message is read:
// bufio.Scanner.Buffer panics once scanning has started.
func (preader *pipeReader) BufSize(size int) {
	initial := defaultBufSize
	if size > 0 && size < initial {
		initial = size
	}
	preader.scanner.Buffer(make([]byte, initial), size)
}
// pipeWriter sends messages by writing them to an opened file.
type pipeWriter struct {
	// pipe is the opened file that messages are written to.
	pipe *os.File
}
// NewPipeWriter opens the file at pipePath write-only and wraps it in a
// pipeWriter. The file is not created if it does not exist. If the open
// fails, the error from os.OpenFile is returned unchanged together with a
// nil writer.
func NewPipeWriter(pipePath string) (*pipeWriter, error) {
	file, openErr := os.OpenFile(pipePath, os.O_WRONLY, 0644)
	if openErr != nil {
		return nil, openErr
	}

	writer := &pipeWriter{pipe: file}
	return writer, nil
}
// Close closes the file the writer was opened on. The error reported by the
// underlying Close call is discarded.
func (pwriter *pipeWriter) Close() {
	_ = pwriter.pipe.Close()
}
// SendMessage writes msg followed by a single '\n' to the pipe in one Write
// call.
//
// msg is never modified. The full slice expression caps msg's capacity at its
// length, which forces append to copy into a fresh array instead of writing
// the newline into spare capacity of the caller's backing array.
//
// The method has no return value, so the result of the Write call is
// discarded.
func (pwriter *pipeWriter) SendMessage(msg []byte) {
	line := append(msg[:len(msg):len(msg)], '\n')
	_, _ = pwriter.pipe.Write(line)
}
// pipeIO bundles a pipeReader and a pipeWriter so that a pair of pipes can be
// handled as one unit.
type pipeIO struct {
	// Reader receives messages from the input pipe.
	Reader pipeReader
	// Writer sends messages to the output pipe.
	Writer pipeWriter
}
// NewPipeIO opens inPipePath for reading with NewPipeReader and outPipePath
// for writing with NewPipeWriter, and returns both bundled in a pipeIO.
//
// If either open fails, its error is returned and the result is nil. When the
// writer cannot be opened, the reader that was already opened is closed
// first so its file handle is not leaked.
func NewPipeIO(inPipePath, outPipePath string) (*pipeIO, error) {
	reader, err := NewPipeReader(inPipePath)
	if err != nil {
		return nil, err
	}

	writer, err := NewPipeWriter(outPipePath)
	if err != nil {
		// Release the input pipe opened above before bailing out.
		reader.Close()
		return nil, err
	}

	return &pipeIO{
		Reader: *reader,
		Writer: *writer,
	}, nil
}
// Close closes the reader and then the writer.
func (pio *pipeIO) Close() {
	pio.Reader.Close()
	pio.Writer.Close()
}