From 1985c0df4f3f55a2499700b3934d5e13fc0fab18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Thu, 30 Apr 2020 02:14:51 +0200 Subject: [PATCH] Refactor existing networking bits --- connection/listen.go | 80 ++++++++++++++++++++++++++++++++++++++++++ main.go | 29 ++++++++++----- socketops/socketops.go | 51 --------------------------- 3 files changed, 100 insertions(+), 60 deletions(-) create mode 100644 connection/listen.go delete mode 100644 socketops/socketops.go diff --git a/connection/listen.go b/connection/listen.go new file mode 100644 index 0000000..a7a8376 --- /dev/null +++ b/connection/listen.go @@ -0,0 +1,80 @@ +package connection + +import ( + "net" + "io" +) + +const defaultBufSize = 1 * 1024 + + +// Connection represents a recv-only network connection. +type Connection struct { + BufSize int + conn *net.TCPConn + recvChan chan []byte + stopFlag bool +} + +// NewListen starts listening for a single connection on the given address. +// This method blocks until this is done. +func NewListen(address string) (*Connection, error) { + conn, err := listenAndAcceptTCP(address) + if err != nil { + return nil, err + } + return &Connection{ + conn: conn, + BufSize: defaultBufSize, + stopFlag: false, + }, nil +} + +func listenAndAcceptTCP(address string) (*net.TCPConn, error) { + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, err + } + sock, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, err + } + conn, err := sock.AcceptTCP() + if err != nil { + return nil, err + } + conn.SetNoDelay(true) + return conn, nil +} + +// Recv returns a read-only channel of byte slices and starts +// receiving and pushing data into it asynchronously. +// The channel is closed when the sender closes the connection. +// Use this to read bytes from the network connection. +func (c *Connection) Recv() <-chan []byte { + c.recvChan = make(chan []byte) + go c.recvLoop() + return c.recvChan +} + +func (c *Connection) recvLoop() { + buf := make([]byte, c.BufSize) + for !c.stopFlag { + n, err := io.ReadFull(c.conn, buf) + if err != nil { + c.stopFlag = true + } + // the underlying memory of the buffer + // must be copied for thread safety + sendBuf := make([]byte, n) + copy(sendBuf, buf) + c.recvChan <- sendBuf + } + close(c.recvChan) +} + +// Close closes the connection +func (c *Connection) Close() { + c.stopFlag = true + c.conn.Close() +} diff --git a/main.go b/main.go index 242a1df..6661a4b 100644 --- a/main.go +++ b/main.go @@ -2,26 +2,37 @@ package main import ( "fmt" - "os" "io" - "time" - "remote-mic/socketops" + "os" "remote-mic/audio" + "remote-mic/connection" + "time" ) func socketExample() { - fmt.Print("Listening for TCP connection on port 8080... ") - conn, err := socketops.AcceptConn(":8080") + fmt.Print("Listening for connection on port 8080... ") + conn, err := connection.NewListen(":8080") if err != nil { panic(err) } defer conn.Close() fmt.Println("Connection accepted!") - data := make(chan []byte, 10) - go socketops.HandleConn(conn, data) - socketops.HandleData(data) + recvChan := conn.Recv() + go func() { + time.Sleep(30 * time.Second) + conn.Close() + }() + for { + buf, open := <-recvChan + if !open { + break + } + fmt.Printf("Bytes read: %d\n", len(buf)) + fmt.Printf("Buf: %v\n", buf) + fmt.Printf("%v\n", string(buf)) + } fmt.Println("Exiting.") } @@ -53,5 +64,5 @@ func micStreamExample() { } func main() { - micStreamExample() + socketExample() } diff --git a/socketops/socketops.go b/socketops/socketops.go deleted file mode 100644 index 79f3e38..0000000 --- a/socketops/socketops.go +++ /dev/null @@ -1,51 +0,0 @@ -package socketops - -import ( - "fmt" - "net" -) - -const bufSize = 1024 - - -func AcceptConn(connstr string) (net.Conn, error) { - addr, err := net.ResolveTCPAddr("tcp", connstr) - if err != nil { - return nil, err - } - sock, err := net.ListenTCP("tcp", addr) - if err != nil { - return nil, err - } - conn, err := sock.AcceptTCP() - if err != nil { - return nil, err - } - conn.SetNoDelay(true) - return conn, nil -} - -func HandleConn(conn net.Conn, out chan<- []byte) { - buf := make([]byte, bufSize) - for { - n, err := conn.Read(buf) - if err != nil { - close(out) - break - } - if n > 0 { - out <- buf[:n] - fmt.Printf("Bytes read: %d\n", n) - } - } -} - -func HandleData(data <-chan []byte) { - for { - buf, open := <-data - if !open { - break - } - fmt.Printf("%v\n", string(buf)) - } -}