diff --git a/connection/connect.go b/connection/connect.go deleted file mode 100644 index a6367f8..0000000 --- a/connection/connect.go +++ /dev/null @@ -1,51 +0,0 @@ -package connection - -import ( - "net" - "fmt" -) - - -func NewConnect(address string) (*Connection, error) { - conn, err := connectTCP(address) - if err != nil { - return nil, err - } - return &Connection{ - conn: conn, - BufSize: defaultBufSize, - stopFlag: false, - }, nil -} - -func connectTCP(address string) (*net.TCPConn, error) { - addr, err := net.ResolveTCPAddr("tcp", address) - if err != nil { - return nil, err - } - conn, err := net.DialTCP("tcp", nil, addr) - if err != nil { - return nil, err - } - return conn, nil -} - -func (c *Connection) SendChan() (chan<- []byte) { - c.dataChan = make(chan []byte) - go c.sendLoop() - return c.dataChan -} - -func (c *Connection) sendLoop() { - for !c.stopFlag { - data, open := <- c.dataChan - if !open { - c.stopFlag = true - } - _, err := c.conn.Write(data) - if err != nil { - fmt.Println(err) - c.stopFlag = true - } - } -} diff --git a/connection/connection.go b/connection/connection.go deleted file mode 100644 index 03cd7cf..0000000 --- a/connection/connection.go +++ /dev/null @@ -1,22 +0,0 @@ -package connection - -import ( - "net" -) - -const defaultBufSize = 1 * 1024 - - -// Connection represents a recv-only network connection. -type Connection struct { - BufSize int - conn *net.TCPConn - dataChan chan []byte - stopFlag bool -} - -// Close closes the connection -func (c *Connection) Close() { - c.stopFlag = true - c.conn.Close() -} diff --git a/connection/listen.go b/connection/listen.go deleted file mode 100644 index d33e1f1..0000000 --- a/connection/listen.go +++ /dev/null @@ -1,66 +0,0 @@ -package connection - -import ( - "net" -) - - -// NewListen starts listening for a single connection on the given address. -// This method blocks until this is done. -func NewListen(address string) (*Connection, error) { - conn, err := listenAndAcceptTCP(address) - if err != nil { - return nil, err - } - return &Connection{ - conn: conn, - BufSize: defaultBufSize, - stopFlag: false, - }, nil -} - -func listenAndAcceptTCP(address string) (*net.TCPConn, error) { - addr, err := net.ResolveTCPAddr("tcp", address) - if err != nil { - return nil, err - } - sock, err := net.ListenTCP("tcp", addr) - if err != nil { - return nil, err - } - conn, err := sock.AcceptTCP() - if err != nil { - return nil, err - } - conn.SetNoDelay(true) - return conn, nil -} - -// RecvChan returns a read-only channel of byte slices and starts -// receiving and pushing data into it asynchronously. -// The channel is closed when the sender closes the connection. -// Use this to read bytes from the network connection. -// The data read is owned by the reader, it is thread-safe to modify. -func (c *Connection) RecvChan() <-chan []byte { - c.dataChan = make(chan []byte) - go c.recvLoop() - return c.dataChan -} - -func (c *Connection) recvLoop() { - buf := make([]byte, c.BufSize) - for !c.stopFlag { - n, err := c.conn.Read(buf) - if err != nil { - c.stopFlag = true - } - if n > 0 { - // the underlying memory of the buffer - // must be copied for thread safety - sendBuf := make([]byte, n) - copy(sendBuf, buf) - c.dataChan <- sendBuf - } - } - close(c.dataChan) -} diff --git a/main.go b/main.go index 6421918..b4cff70 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "io" "os" "remote-mic/audio" - "remote-mic/connection" + "remote-mic/networking" "time" ) @@ -43,13 +43,15 @@ func usage() { func listen() { fmt.Print("Listening for connection on port 8080... ") - conn, err := connection.NewListen(":8080") + conn, err := networking.ListenAndAcceptTCP(":8080") if err != nil { panic(err) } + reader := networking.NewAsyncReader(conn) + defer reader.Close() fmt.Println("Connection accepted!") - recvChan := conn.RecvChan() + recvChan := reader.RecvChan() for { buf, open := <-recvChan if !open { @@ -62,16 +64,16 @@ func listen() { } func stream() { - conn2, err := connection.NewConnect("localhost:8080") + conn, err := networking.ConnectTCP("localhost:8080") if err != nil { - fmt.Println(err) panic(err) } + writer := networking.NewAsyncWriter(conn) + defer writer.Close() - c := conn2.SendChan() + c := writer.SendChan() c <- []byte("cicasajtok") - time.Sleep(20 * time.Second) fmt.Println("Exiting.") } diff --git a/networking/io.go b/networking/io.go new file mode 100644 index 0000000..0179574 --- /dev/null +++ b/networking/io.go @@ -0,0 +1,38 @@ +package networking + +import ( + "net" +) + +const defaultBufSize = 1 * 1024 + + +func ListenAndAcceptTCP(address string) (*net.TCPConn, error) { + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, err + } + sock, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, err + } + conn, err := sock.AcceptTCP() + if err != nil { + return nil, err + } + conn.SetNoDelay(true) + return conn, nil +} + +func ConnectTCP(address string) (*net.TCPConn, error) { + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, err + } + conn, err := net.DialTCP("tcp", nil, addr) + if err != nil { + return nil, err + } + conn.SetNoDelay(true) + return conn, nil +} diff --git a/networking/reader.go b/networking/reader.go new file mode 100644 index 0000000..6a5f858 --- /dev/null +++ b/networking/reader.go @@ -0,0 +1,57 @@ +package networking + +import ( + "io" +) + + +type AsyncReader struct { + BufSize int + stream io.ReadCloser + dataChan chan []byte + stopFlag bool +} + +// NewListen starts listening for a single connection on the given address. +// This method blocks until this is done. +func NewAsyncReader(reader io.ReadCloser) *AsyncReader { + return &AsyncReader{ + BufSize: defaultBufSize, + stream: reader, + stopFlag: false, + } +} + +// RecvChan returns a read-only channel of byte slices and starts +// receiving and pushing data into it asynchronously. +// The channel is closed when the sender closes the connection. +// Use this to read bytes from the network connection. +// The data read is owned by the reader, it is thread-safe to modify. +func (ar *AsyncReader) RecvChan() <-chan []byte { + ar.dataChan = make(chan []byte) + go ar.recvLoop() + return ar.dataChan +} + +func (ar *AsyncReader) recvLoop() { + buf := make([]byte, ar.BufSize) + for !ar.stopFlag { + n, err := ar.stream.Read(buf) + if err != nil { + ar.stopFlag = true + } + if n > 0 { + // the underlying memory of the buffer + // must be copied for thread safety + sendBuf := make([]byte, n) + copy(sendBuf, buf) + ar.dataChan <- sendBuf + } + } + close(ar.dataChan) +} + +func (ar *AsyncReader) Close() { + ar.stopFlag = true + ar.stream.Close() +} diff --git a/networking/writer.go b/networking/writer.go new file mode 100644 index 0000000..33f995a --- /dev/null +++ b/networking/writer.go @@ -0,0 +1,48 @@ +package networking + +import ( + "fmt" + "io" +) + + +type AsyncWriter struct { + BufSize int + stream io.WriteCloser + dataChan chan []byte + stopFlag bool +} + +func NewAsyncWriter(writer io.WriteCloser) *AsyncWriter { + return &AsyncWriter{ + BufSize: defaultBufSize, + stream: writer, + stopFlag: false, + } +} + +func (aw *AsyncWriter) SendChan() (chan<- []byte) { + aw.dataChan = make(chan []byte) + go aw.sendLoop() + return aw.dataChan +} + +func (aw *AsyncWriter) sendLoop() { + for !aw.stopFlag { + data, open := <- aw.dataChan + if !open { + aw.stopFlag = true + return + } + _, err := aw.stream.Write(data) + if err != nil { + fmt.Println("ouchie") + aw.stopFlag = true + } + } +} + +func (aw *AsyncWriter) Close() { + aw.stopFlag = true + close(aw.dataChan) +} \ No newline at end of file