Refactor networking code

This commit is contained in:
Kristóf Tóth 2020-05-05 01:10:30 +02:00
parent ddadf31ff3
commit aeb974a820
7 changed files with 152 additions and 146 deletions

View File

@ -1,51 +0,0 @@
package connection
import (
"net"
"fmt"
)
func NewConnect(address string) (*Connection, error) {
conn, err := connectTCP(address)
if err != nil {
return nil, err
}
return &Connection{
conn: conn,
BufSize: defaultBufSize,
stopFlag: false,
}, nil
}
func connectTCP(address string) (*net.TCPConn, error) {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return nil, err
}
return conn, nil
}
func (c *Connection) SendChan() (chan<- []byte) {
c.dataChan = make(chan []byte)
go c.sendLoop()
return c.dataChan
}
func (c *Connection) sendLoop() {
for !c.stopFlag {
data, open := <- c.dataChan
if !open {
c.stopFlag = true
}
_, err := c.conn.Write(data)
if err != nil {
fmt.Println(err)
c.stopFlag = true
}
}
}

View File

@ -1,22 +0,0 @@
package connection
import (
"net"
)
const defaultBufSize = 1 * 1024
// Connection represents a recv-only network connection.
type Connection struct {
BufSize int
conn *net.TCPConn
dataChan chan []byte
stopFlag bool
}
// Close closes the connection
func (c *Connection) Close() {
c.stopFlag = true
c.conn.Close()
}

View File

@ -1,66 +0,0 @@
package connection
import (
"net"
)
// NewListen starts listening for a single connection on the given address.
// This method blocks until this is done.
func NewListen(address string) (*Connection, error) {
conn, err := listenAndAcceptTCP(address)
if err != nil {
return nil, err
}
return &Connection{
conn: conn,
BufSize: defaultBufSize,
stopFlag: false,
}, nil
}
func listenAndAcceptTCP(address string) (*net.TCPConn, error) {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
sock, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}
conn, err := sock.AcceptTCP()
if err != nil {
return nil, err
}
conn.SetNoDelay(true)
return conn, nil
}
// RecvChan returns a read-only channel of byte slices and starts
// receiving and pushing data into it asynchronously.
// The channel is closed when the sender closes the connection.
// Use this to read bytes from the network connection.
// The data read is owned by the reader, it is thread-safe to modify.
func (c *Connection) RecvChan() <-chan []byte {
c.dataChan = make(chan []byte)
go c.recvLoop()
return c.dataChan
}
func (c *Connection) recvLoop() {
buf := make([]byte, c.BufSize)
for !c.stopFlag {
n, err := c.conn.Read(buf)
if err != nil {
c.stopFlag = true
}
if n > 0 {
// the underlying memory of the buffer
// must be copied for thread safety
sendBuf := make([]byte, n)
copy(sendBuf, buf)
c.dataChan <- sendBuf
}
}
close(c.dataChan)
}

16
main.go
View File

@ -5,7 +5,7 @@ import (
"io"
"os"
"remote-mic/audio"
"remote-mic/connection"
"remote-mic/networking"
"time"
)
@ -43,13 +43,15 @@ func usage() {
func listen() {
fmt.Print("Listening for connection on port 8080... ")
conn, err := connection.NewListen(":8080")
conn, err := networking.ListenAndAcceptTCP(":8080")
if err != nil {
panic(err)
}
reader := networking.NewAsyncReader(conn)
defer reader.Close()
fmt.Println("Connection accepted!")
recvChan := conn.RecvChan()
recvChan := reader.RecvChan()
for {
buf, open := <-recvChan
if !open {
@ -62,16 +64,16 @@ func listen() {
}
func stream() {
conn2, err := connection.NewConnect("localhost:8080")
conn, err := networking.ConnectTCP("localhost:8080")
if err != nil {
fmt.Println(err)
panic(err)
}
writer := networking.NewAsyncWriter(conn)
defer writer.Close()
c := conn2.SendChan()
c := writer.SendChan()
c <- []byte("cicasajtok")
time.Sleep(20 * time.Second)
fmt.Println("Exiting.")
}

38
networking/io.go Normal file
View File

@ -0,0 +1,38 @@
package networking
import (
"net"
)
const defaultBufSize = 1 * 1024
func ListenAndAcceptTCP(address string) (*net.TCPConn, error) {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
sock, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}
conn, err := sock.AcceptTCP()
if err != nil {
return nil, err
}
conn.SetNoDelay(true)
return conn, nil
}
func ConnectTCP(address string) (*net.TCPConn, error) {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return nil, err
}
conn.SetNoDelay(true)
return conn, nil
}

57
networking/reader.go Normal file
View File

@ -0,0 +1,57 @@
package networking
import (
"io"
)
type AsyncReader struct {
BufSize int
stream io.ReadCloser
dataChan chan []byte
stopFlag bool
}
// NewListen starts listening for a single connection on the given address.
// This method blocks until this is done.
func NewAsyncReader(reader io.ReadCloser) *AsyncReader {
return &AsyncReader{
BufSize: defaultBufSize,
stream: reader,
stopFlag: false,
}
}
// RecvChan returns a read-only channel of byte slices and starts
// receiving and pushing data into it asynchronously.
// The channel is closed when the sender closes the connection.
// Use this to read bytes from the network connection.
// The data read is owned by the reader, it is thread-safe to modify.
func (ar *AsyncReader) RecvChan() <-chan []byte {
ar.dataChan = make(chan []byte)
go ar.recvLoop()
return ar.dataChan
}
func (ar *AsyncReader) recvLoop() {
buf := make([]byte, ar.BufSize)
for !ar.stopFlag {
n, err := ar.stream.Read(buf)
if err != nil {
ar.stopFlag = true
}
if n > 0 {
// the underlying memory of the buffer
// must be copied for thread safety
sendBuf := make([]byte, n)
copy(sendBuf, buf)
ar.dataChan <- sendBuf
}
}
close(ar.dataChan)
}
func (ar *AsyncReader) Close() {
ar.stopFlag = true
ar.stream.Close()
}

48
networking/writer.go Normal file
View File

@ -0,0 +1,48 @@
package networking
import (
"fmt"
"io"
)
type AsyncWriter struct {
BufSize int
stream io.WriteCloser
dataChan chan []byte
stopFlag bool
}
func NewAsyncWriter(writer io.WriteCloser) *AsyncWriter {
return &AsyncWriter{
BufSize: defaultBufSize,
stream: writer,
stopFlag: false,
}
}
func (aw *AsyncWriter) SendChan() (chan<- []byte) {
aw.dataChan = make(chan []byte)
go aw.sendLoop()
return aw.dataChan
}
func (aw *AsyncWriter) sendLoop() {
for !aw.stopFlag {
data, open := <- aw.dataChan
if !open {
aw.stopFlag = true
return
}
_, err := aw.stream.Write(data)
if err != nil {
fmt.Println("ouchie")
aw.stopFlag = true
}
}
}
func (aw *AsyncWriter) Close() {
aw.stopFlag = true
close(aw.dataChan)
}