From ddadf31ff339a43a4ea70287e7db6c24443d4154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krist=C3=B3f=20T=C3=B3th?= Date: Mon, 4 May 2020 21:51:41 +0200 Subject: [PATCH] Add quick & dirty prototype for streamer side --- connection/connect.go | 51 +++++++++++++++++++++++++ connection/connection.go | 22 +++++++++++ connection/listen.go | 27 +++---------- main.go | 82 ++++++++++++++++++++++++++-------------- 4 files changed, 131 insertions(+), 51 deletions(-) create mode 100644 connection/connect.go create mode 100644 connection/connection.go diff --git a/connection/connect.go b/connection/connect.go new file mode 100644 index 0000000..a6367f8 --- /dev/null +++ b/connection/connect.go @@ -0,0 +1,51 @@ +package connection + +import ( + "net" + "fmt" +) + + +func NewConnect(address string) (*Connection, error) { + conn, err := connectTCP(address) + if err != nil { + return nil, err + } + return &Connection{ + conn: conn, + BufSize: defaultBufSize, + stopFlag: false, + }, nil +} + +func connectTCP(address string) (*net.TCPConn, error) { + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, err + } + conn, err := net.DialTCP("tcp", nil, addr) + if err != nil { + return nil, err + } + return conn, nil +} + +func (c *Connection) SendChan() (chan<- []byte) { + c.dataChan = make(chan []byte) + go c.sendLoop() + return c.dataChan +} + +func (c *Connection) sendLoop() { + for !c.stopFlag { + data, open := <- c.dataChan + if !open { + c.stopFlag = true + } + _, err := c.conn.Write(data) + if err != nil { + fmt.Println(err) + c.stopFlag = true + } + } +} diff --git a/connection/connection.go b/connection/connection.go new file mode 100644 index 0000000..03cd7cf --- /dev/null +++ b/connection/connection.go @@ -0,0 +1,22 @@ +package connection + +import ( + "net" +) + +const defaultBufSize = 1 * 1024 + + +// Connection represents a recv-only network connection. +type Connection struct { + BufSize int + conn *net.TCPConn + dataChan chan []byte + stopFlag bool +} + +// Close closes the connection +func (c *Connection) Close() { + c.stopFlag = true + c.conn.Close() +} diff --git a/connection/listen.go b/connection/listen.go index 256a5af..d33e1f1 100644 --- a/connection/listen.go +++ b/connection/listen.go @@ -2,19 +2,8 @@ package connection import ( "net" - "io" ) -const defaultBufSize = 1 * 1024 - - -// Connection represents a recv-only network connection. -type Connection struct { - BufSize int - conn *net.TCPConn - recvChan chan []byte - stopFlag bool -} // NewListen starts listening for a single connection on the given address. // This method blocks until this is done. @@ -53,15 +42,15 @@ func listenAndAcceptTCP(address string) (*net.TCPConn, error) { // Use this to read bytes from the network connection. // The data read is owned by the reader, it is thread-safe to modify. func (c *Connection) RecvChan() <-chan []byte { - c.recvChan = make(chan []byte) + c.dataChan = make(chan []byte) go c.recvLoop() - return c.recvChan + return c.dataChan } func (c *Connection) recvLoop() { buf := make([]byte, c.BufSize) for !c.stopFlag { - n, err := io.ReadFull(c.conn, buf) + n, err := c.conn.Read(buf) if err != nil { c.stopFlag = true } @@ -70,14 +59,8 @@ func (c *Connection) recvLoop() { // must be copied for thread safety sendBuf := make([]byte, n) copy(sendBuf, buf) - c.recvChan <- sendBuf + c.dataChan <- sendBuf } } - close(c.recvChan) -} - -// Close closes the connection -func (c *Connection) Close() { - c.stopFlag = true - c.conn.Close() + close(c.dataChan) } diff --git a/main.go b/main.go index f25c1e3..6421918 100644 --- a/main.go +++ b/main.go @@ -10,33 +10,6 @@ import ( ) -func socketExample() { - fmt.Print("Listening for connection on port 8080... ") - conn, err := connection.NewListen(":8080") - if err != nil { - panic(err) - } - defer conn.Close() - fmt.Println("Connection accepted!") - - recvChan := conn.RecvChan() - go func() { - time.Sleep(10 * time.Second) - conn.Close() - }() - for { - buf, open := <-recvChan - if !open { - break - } - fmt.Printf("Bytes read: %d\n", len(buf)) - fmt.Printf("Buf: %v\n", buf) - fmt.Printf("%v\n", string(buf)) - } - - fmt.Println("Exiting.") -} - func pulsectlExample() { pulsectl := audio.NewPulsectl() err := pulsectl.LoadPipeSourceModule() @@ -63,6 +36,57 @@ func micStreamExample() { } } -func main() { - socketExample() +func usage() { + fmt.Println("Usage: remote-mic ") + os.Exit(1) +} + +func listen() { + fmt.Print("Listening for connection on port 8080... ") + conn, err := connection.NewListen(":8080") + if err != nil { + panic(err) + } + fmt.Println("Connection accepted!") + + recvChan := conn.RecvChan() + for { + buf, open := <-recvChan + if !open { + break + } + fmt.Printf("Bytes read: %d\n", len(buf)) + fmt.Printf("Buf: %v\n", buf) + fmt.Printf("%v\n", string(buf)) + } +} + +func stream() { + conn2, err := connection.NewConnect("localhost:8080") + if err != nil { + fmt.Println(err) + panic(err) + } + + c := conn2.SendChan() + c <- []byte("cicasajtok") + + time.Sleep(20 * time.Second) + fmt.Println("Exiting.") +} + +func main() { + if len(os.Args) < 2 { + usage() + } + cmd := os.Args[1] + + switch cmd { + case "listen": + listen() + case "stream": + stream() + default: + usage() + } }