Refactor existing networking bits

This commit is contained in:
Kristóf Tóth 2020-04-30 02:14:51 +02:00
parent 70227d0eff
commit 1985c0df4f
3 changed files with 100 additions and 60 deletions

80
connection/listen.go Normal file
View File

@ -0,0 +1,80 @@
package connection
import (
"net"
"io"
)
const defaultBufSize = 1 * 1024
// Connection represents a recv-only network connection.
type Connection struct {
BufSize int
conn *net.TCPConn
recvChan chan []byte
stopFlag bool
}
// NewListen starts listening for a single connection on the given address.
// This method blocks until this is done.
func NewListen(address string) (*Connection, error) {
conn, err := listenAndAcceptTCP(address)
if err != nil {
return nil, err
}
return &Connection{
conn: conn,
BufSize: defaultBufSize,
stopFlag: false,
}, nil
}
func listenAndAcceptTCP(address string) (*net.TCPConn, error) {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
sock, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}
conn, err := sock.AcceptTCP()
if err != nil {
return nil, err
}
conn.SetNoDelay(true)
return conn, nil
}
// Recv returns a read-only channel of byte slices and starts
// receiving and pushing data into it asynchronously.
// The channel is closed when the sender closes the connection.
// Use this to read bytes from the network connection.
func (c *Connection) Recv() <-chan []byte {
c.recvChan = make(chan []byte)
go c.recvLoop()
return c.recvChan
}
func (c *Connection) recvLoop() {
buf := make([]byte, c.BufSize)
for !c.stopFlag {
n, err := io.ReadFull(c.conn, buf)
if err != nil {
c.stopFlag = true
}
// the underlying memory of the buffer
// must be copied for thread safety
sendBuf := make([]byte, n)
copy(sendBuf, buf)
c.recvChan <- sendBuf
}
close(c.recvChan)
}
// Close closes the connection
func (c *Connection) Close() {
c.stopFlag = true
c.conn.Close()
}

29
main.go
View File

@ -2,26 +2,37 @@ package main
import (
"fmt"
"os"
"io"
"time"
"remote-mic/socketops"
"os"
"remote-mic/audio"
"remote-mic/connection"
"time"
)
func socketExample() {
fmt.Print("Listening for TCP connection on port 8080... ")
conn, err := socketops.AcceptConn(":8080")
fmt.Print("Listening for connection on port 8080... ")
conn, err := connection.NewListen(":8080")
if err != nil {
panic(err)
}
defer conn.Close()
fmt.Println("Connection accepted!")
data := make(chan []byte, 10)
go socketops.HandleConn(conn, data)
socketops.HandleData(data)
recvChan := conn.Recv()
go func() {
time.Sleep(30 * time.Second)
conn.Close()
}()
for {
buf, open := <-recvChan
if !open {
break
}
fmt.Printf("Bytes read: %d\n", len(buf))
fmt.Printf("Buf: %v\n", buf)
fmt.Printf("%v\n", string(buf))
}
fmt.Println("Exiting.")
}
@ -53,5 +64,5 @@ func micStreamExample() {
}
func main() {
micStreamExample()
socketExample()
}

View File

@ -1,51 +0,0 @@
package socketops
import (
"fmt"
"net"
)
const bufSize = 1024
func AcceptConn(connstr string) (net.Conn, error) {
addr, err := net.ResolveTCPAddr("tcp", connstr)
if err != nil {
return nil, err
}
sock, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}
conn, err := sock.AcceptTCP()
if err != nil {
return nil, err
}
conn.SetNoDelay(true)
return conn, nil
}
func HandleConn(conn net.Conn, out chan<- []byte) {
buf := make([]byte, bufSize)
for {
n, err := conn.Read(buf)
if err != nil {
close(out)
break
}
if n > 0 {
out <- buf[:n]
fmt.Printf("Bytes read: %d\n", n)
}
}
}
func HandleData(data <-chan []byte) {
for {
buf, open := <-data
if !open {
break
}
fmt.Printf("%v\n", string(buf))
}
}