diff --git a/client/client.go b/client/client.go index 5cd3818..00230d1 100644 --- a/client/client.go +++ b/client/client.go @@ -1,6 +1,7 @@ package client import ( + "golang.org/x/sync/errgroup" "math/rand" "net/url" "os" @@ -76,7 +77,7 @@ func (cliCtx *Context) serverReader() error { logger.Debug("frame read", "id", rFrame.ID) - if rFrame.ID > 128 { + if rFrame.IsResponse() { cliCtx.resFromServer <- rFrame } else { cliCtx.reqFromServer <- rFrame @@ -116,38 +117,65 @@ func init() { func testAuth(ctx *Context) { logger.Info("Trying to authenticate as krzmaciek...") - ctx.sendRequest(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) + err := ctx.sendRequest(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) + + if err != nil { + logger.Error(err) + return + } + logger.Debug("Request sent, waiting for response...") arf := ctx.getResponseFrame() ar, err := cm.ResponseFromFrame[cm.AuthResponse](arf) + if err != nil { logger.Error(err) + return } + logger.Infof("Authenticated?: %t", ar.IsSuccess) } func testEcho(ctx *Context) { echoByte := rand.Intn(32) logger.Info("Testing echo...", "echoByte", echoByte) - ctx.sendRequest(cm.EchoRequest{EchoByte: byte(echoByte)}) + err := ctx.sendRequest(cm.EchoRequest{EchoByte: byte(echoByte)}) + + if err != nil { + logger.Error(err) + return + } + logger.Debug("Request sent, waiting for response...") ereqf := ctx.getResponseFrame() ereq, err := cm.ResponseFromFrame[cm.EchoResponse](ereqf) + if err != nil { logger.Error(err) + return } + logger.Info("Got response", "echoByte", ereq.EchoByte) } func testListPeers(ctx *Context) { logger.Info("Trying to get list of peers...") - ctx.sendRequest(cm.ListPeersRequest{}) + err := ctx.sendRequest(cm.ListPeersRequest{}) + + if err != nil { + logger.Error(err) + return + } + logger.Debug("Request sent, waiting for response...") lpreqf := ctx.getResponseFrame() lpreq, err := cm.ResponseFromFrame[cm.ListPeersResponse](lpreqf) + if err != nil { logger.Error(err) + return } + logger.Info("Got that list", "peersList", lpreq.PeersInfo) } @@ -160,18 +188,32 @@ func RunClient() { return } - defer c.Close() + defer func(c *websocket.Conn) { + err := c.Close() + if err != nil { + logger.Error(err) + } + }(c) ctx := NewClientContext(c) - go ctx.serverHandler() - go ctx.serverReader() - go ctx.serverWriter() + errGroup := new(errgroup.Group) + errGroup.Go(ctx.serverHandler) + errGroup.Go(ctx.serverReader) + errGroup.Go(ctx.serverWriter) testAuth(ctx) testEcho(ctx) testListPeers(ctx) + err = errGroup.Wait() + + if err != nil { + logger.Error(err) + } - time.Sleep(time.Second * 5) logger.Info("closing connection...") - c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + + if err != nil { + logger.Error(err) + } } diff --git a/common/common.go b/common/common.go index a07a564..e005441 100644 --- a/common/common.go +++ b/common/common.go @@ -18,10 +18,10 @@ const ( // Requests & responses subtypes type PeerInfo struct { - ID int `json:"id"` - Addr string `json:"addr"` - HasNickaname bool `json:"hasNickname"` - Nickname string `json:"nickname"` + ID int `json:"id"` + Addr string `json:"addr"` + HasNickname bool `json:"hasNickname"` + Nickname string `json:"nickname"` } // Requests & responses: @@ -31,6 +31,14 @@ type RFrame struct { Rest json.RawMessage `json:"r"` } +func (rf RFrame) IsRequest() bool { + return rf.ID <= 128 +} + +func (rf RFrame) IsResponse() bool { + return rf.ID > 128 +} + func RequestFrameFrom(req Request) (RFrame, error) { jsonBytes, err := json.Marshal(req) diff --git a/go.mod b/go.mod index 2ee3c42..2d883cd 100644 --- a/go.mod +++ b/go.mod @@ -20,5 +20,6 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.21.0 // indirect + golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.18.0 // indirect ) diff --git a/go.sum b/go.sum index bd66e3d..af665a2 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,8 @@ golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/server/server.go b/server/server.go index 14bd9fc..7ef7b98 100644 --- a/server/server.go +++ b/server/server.go @@ -2,6 +2,7 @@ package server import ( "encoding/json" + "golang.org/x/sync/errgroup" "net/http" "os" "strings" @@ -46,9 +47,7 @@ func NewHandlerContext(peer *Peer, srvCtx *Context) *HandlerContext { } } -func (hdlCtx *HandlerContext) clientHandler(hdlWg *sync.WaitGroup) error { - defer hdlWg.Done() - +func (hdlCtx *HandlerContext) clientHandler() error { for { reqFrame := <-hdlCtx.reqFromClient var res common.Response @@ -68,6 +67,7 @@ func (hdlCtx *HandlerContext) clientHandler(hdlWg *sync.WaitGroup) error { } resFrame, err := common.ResponseFrameFrom(res) + if err != nil { logger.Errorf("could not create frame from response") return err @@ -77,12 +77,11 @@ func (hdlCtx *HandlerContext) clientHandler(hdlWg *sync.WaitGroup) error { } } -func (hdlCtx *HandlerContext) clientWriter(hdlWg *sync.WaitGroup) error { - defer hdlWg.Done() - +func (hdlCtx *HandlerContext) clientWriter() error { for { rFrame := <-hdlCtx.rToClient resJsonBytes, err := json.Marshal(rFrame) + if err != nil { logger.Errorf("error marshalling frame to json") return err @@ -90,6 +89,7 @@ func (hdlCtx *HandlerContext) clientWriter(hdlWg *sync.WaitGroup) error { logger.Debugf("sending %s", string(resJsonBytes)) err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes) + if err != nil { logger.Errorf("error writing rframe") return err @@ -97,11 +97,10 @@ func (hdlCtx *HandlerContext) clientWriter(hdlWg *sync.WaitGroup) error { } } -func (hdlCtx *HandlerContext) clientReader(hdlWg *sync.WaitGroup) error { - defer hdlWg.Done() - +func (hdlCtx *HandlerContext) clientReader() error { for { messType, messBytes, err := hdlCtx.peer.conn.ReadMessage() + if err != nil { return err } @@ -116,10 +115,15 @@ func (hdlCtx *HandlerContext) clientReader(hdlWg *sync.WaitGroup) error { logger.Debugf("got message text: %s", strings.Trim(string(messBytes), "\n")) var rFrame common.RFrame - json.Unmarshal(messBytes, &rFrame) + err = json.Unmarshal(messBytes, &rFrame) + + if err != nil { + return err + } + logger.Debugf("unmarshalled request frame (ID=%d)", rFrame.ID) - if rFrame.ID > 128 { + if rFrame.IsResponse() { logger.Debug("it is response frame", "id", rFrame.ID) hdlCtx.resFromClient <- rFrame } else { @@ -131,6 +135,7 @@ func (hdlCtx *HandlerContext) clientReader(hdlWg *sync.WaitGroup) error { func (hdlCtx *HandlerContext) sendRequest(req common.Request) error { rf, err := common.RequestFrameFrom(req) + if err != nil { return err } @@ -171,11 +176,13 @@ func NewPeer(conn *websocket.Conn) *Peer { func peerSliceIndexOf(s []*Peer, id int) int { i := 0 var p *Peer + for i, p = range s { if p.id == id { break } } + return i } @@ -197,6 +204,7 @@ func handleDisconnection(handlerCtx *HandlerContext) { func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RFrame) (res common.Response, err error) { echoReq, err := common.RequestFromFrame[common.EchoRequest](*reqFrame) + if err != nil { logger.Error("could not read request from frame") return nil, err @@ -209,6 +217,7 @@ func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RFrame) (res common.Re func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res common.Response, err error) { // Currently list peers request is empty, so we can ignore it - we won't use it _, err = common.RequestFromFrame[common.ListPeersRequest](*reqFrame) + if err != nil { logger.Error("could not read request from frame") return nil, err @@ -224,10 +233,10 @@ func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res comm listPeersRes.PeersInfo = append( listPeersRes.PeersInfo, common.PeerInfo{ - ID: peer.id, - Addr: peer.conn.RemoteAddr().String(), - HasNickaname: peer.hasAccount, - Nickname: peer.account.nickname, + ID: peer.id, + Addr: peer.conn.RemoteAddr().String(), + HasNickname: peer.hasAccount, + Nickname: peer.account.nickname, }, ) } @@ -237,6 +246,7 @@ func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res comm func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RFrame) (res common.Response, err error) { authReq, err := common.RequestFromFrame[common.AuthRequest](*reqFrame) + if err != nil { logger.Error("could not read request from frame") return nil, err @@ -307,9 +317,26 @@ func (srvCtx *Context) addPeer(peer *Peer) { srvCtx.peersListLock.Unlock() } +func testEcho(hdlCtx *HandlerContext) { + logger.Debug("sending echo request...") + _ = hdlCtx.sendRequest(common.EchoRequest{EchoByte: 123}) + logger.Debug("sent") + echoResF := hdlCtx.getResponseFrame() + logger.Debug("got response") + echoRes, err := common.ResponseFromFrame[common.EchoResponse](echoResF) + + if err != nil { + logger.Error(err) + return + } + + logger.Debug("test echo done", "byteSent", 123, "byteReceived", echoRes.EchoByte) +} + func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{} conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { logger.Errorf("upgrade failed") return @@ -319,28 +346,26 @@ func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { srvCtx.addPeer(peer) handlerCtx := NewHandlerContext(peer, srvCtx) defer handleDisconnection(handlerCtx) - defer conn.Close() + + defer func(conn *websocket.Conn) { + err := conn.Close() + if err != nil { + logger.Error(err) + } + }(conn) + logger.Infof("%s connected", conn.RemoteAddr()) + errGroup := new(errgroup.Group) + errGroup.Go(handlerCtx.clientHandler) + errGroup.Go(handlerCtx.clientWriter) + errGroup.Go(handlerCtx.clientReader) + testEcho(handlerCtx) + err = errGroup.Wait() - var handlerWg sync.WaitGroup - handlerWg.Add(3) - go handlerCtx.clientWriter(&handlerWg) - go handlerCtx.clientHandler(&handlerWg) - go handlerCtx.clientReader(&handlerWg) - - logger.Debug("sending echo request...") - handlerCtx.sendRequest(common.EchoRequest{EchoByte: 123}) - logger.Debug("sent") - echoResF := handlerCtx.getResponseFrame() - logger.Debug("got response") - echoRes, err := common.ResponseFromFrame[common.EchoResponse](echoResF) if err != nil { logger.Error(err) return } - logger.Debug("test echo done", "byteSent", 123, "byteReceived", echoRes.EchoByte) - - handlerWg.Wait() } func RunServer() { @@ -355,5 +380,9 @@ func RunServer() { http.HandleFunc("/wsapi", srvCtx.wsapiHandler) logger.Info("Starting server...") - http.ListenAndServe(":8080", nil) + err := http.ListenAndServe(":8080", nil) + + if err != nil { + logger.Error(err) + } }