From f16b53278bf6d470d68061cef4f880473d546262 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Krzy=C5=BCanowski?= Date: Fri, 29 Mar 2024 15:05:42 +0100 Subject: [PATCH] Enabled two-way communication between client and server Server may now send requests to client and client will respond to them. Mechanics similar to how server handled requests was added to client. Also server can now receive responses from client :) --- .gitignore | 1 + client/client.go | 207 +++++++++++++++++++++++++++++---------------- common/common.go | 60 +++++++------ server/server.go | 213 +++++++++++++++++++++++++++++++---------------- 4 files changed, 303 insertions(+), 178 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f11b75 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea/ diff --git a/client/client.go b/client/client.go index a9d778e..5cd3818 100644 --- a/client/client.go +++ b/client/client.go @@ -1,6 +1,7 @@ package client import ( + "math/rand" "net/url" "os" "time" @@ -10,6 +11,95 @@ import ( cm "krzyzanowski.dev/p2pchat/common" ) +type Context struct { + conn *websocket.Conn + // Assumption: size of 1 is enough, because first response read will be response for the last request + // no need to buffer + resFromServer chan cm.RFrame + reqFromServer chan cm.RFrame + rToServer chan cm.RFrame +} + +func NewClientContext(conn *websocket.Conn) *Context { + return &Context{ + conn: conn, + resFromServer: make(chan cm.RFrame), + reqFromServer: make(chan cm.RFrame), + rToServer: make(chan cm.RFrame), + } +} + +func (cliCtx *Context) serverHandler() error { + for { + reqFrame := <-cliCtx.reqFromServer + + logger.Debug("got request from server", "id", reqFrame.ID) + + if reqFrame.ID == cm.EchoReqID { + echoReq, err := cm.RequestFromFrame[cm.EchoRequest](reqFrame) + if err != nil { + return err + } + + resFrame, err := cm.ResponseFrameFrom(cm.EchoResponse(echoReq)) + if err != nil { + return err + } + + cliCtx.rToServer <- resFrame + } else { + logger.Fatal("can't handle it!") + } + } +} + +func (cliCtx *Context) serverWriter() error { + for { + logger.Debug("waiting for a frame to write") + frameToWrite := <-cliCtx.rToServer + err := cliCtx.conn.WriteJSON(frameToWrite) + if err != nil { + return err + } + logger.Debug("frame written", "id", frameToWrite.ID) + } +} + +func (cliCtx *Context) serverReader() error { + for { + logger.Debug("waiting for a frame to read") + var rFrame cm.RFrame + err := cliCtx.conn.ReadJSON(&rFrame) + if err != nil { + return err + } + + logger.Debug("frame read", "id", rFrame.ID) + + if rFrame.ID > 128 { + cliCtx.resFromServer <- rFrame + } else { + cliCtx.reqFromServer <- rFrame + } + + logger.Debug("frame pushed", "id", rFrame.ID) + } +} + +func (cliCtx *Context) sendRequest(req cm.Request) error { + rf, err := cm.RequestFrameFrom(req) + if err != nil { + return err + } + + cliCtx.rToServer <- rf + return nil +} + +func (cliCtx *Context) getResponseFrame() cm.RFrame { + return <-cliCtx.resFromServer +} + var logger = log.NewWithOptions(os.Stdout, log.Options{ ReportTimestamp: true, TimeFormat: time.TimeOnly, @@ -24,6 +114,43 @@ func init() { } } +func testAuth(ctx *Context) { + logger.Info("Trying to authenticate as krzmaciek...") + ctx.sendRequest(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) + logger.Debug("Request sent, waiting for response...") + arf := ctx.getResponseFrame() + ar, err := cm.ResponseFromFrame[cm.AuthResponse](arf) + if err != nil { + logger.Error(err) + } + logger.Infof("Authenticated?: %t", ar.IsSuccess) +} + +func testEcho(ctx *Context) { + echoByte := rand.Intn(32) + logger.Info("Testing echo...", "echoByte", echoByte) + ctx.sendRequest(cm.EchoRequest{EchoByte: byte(echoByte)}) + logger.Debug("Request sent, waiting for response...") + ereqf := ctx.getResponseFrame() + ereq, err := cm.ResponseFromFrame[cm.EchoResponse](ereqf) + if err != nil { + logger.Error(err) + } + logger.Info("Got response", "echoByte", ereq.EchoByte) +} + +func testListPeers(ctx *Context) { + logger.Info("Trying to get list of peers...") + ctx.sendRequest(cm.ListPeersRequest{}) + logger.Debug("Request sent, waiting for response...") + lpreqf := ctx.getResponseFrame() + lpreq, err := cm.ResponseFromFrame[cm.ListPeersResponse](lpreqf) + if err != nil { + logger.Error(err) + } + logger.Info("Got that list", "peersList", lpreq.PeersInfo) +} + func RunClient() { u := url.URL{Scheme: "ws", Host: ":8080", Path: "/wsapi"} c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) @@ -35,80 +162,14 @@ func RunClient() { defer c.Close() - logger.Info("authenticating...") - rf, _ := cm.RequestFrameFrom(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) - err = c.WriteJSON(rf) - if err != nil { - logger.Fatal(err) - } + ctx := NewClientContext(c) + go ctx.serverHandler() + go ctx.serverReader() + go ctx.serverWriter() - var authResFrame cm.ResponseFrame - err = c.ReadJSON(&authResFrame) - if err != nil { - logger.Fatal(err) - } - - authRes, err := cm.ResponseFromFrame[cm.AuthResponse](authResFrame) - if err != nil { - logger.Fatal(err) - } - - logger.Infof("authentication result: %t", authRes.IsSuccess) - time.Sleep(time.Second * 1) - - logger.Info("sending echo...") - echoByte := 123 - rf, err = cm.RequestFrameFrom(cm.EchoRequest{EchoByte: byte(echoByte)}) - if err != nil { - logger.Fatal(err) - } - - err = c.WriteJSON(rf) - if err != nil { - logger.Fatal(err) - } - - var echoResFrame cm.ResponseFrame - err = c.ReadJSON(&echoResFrame) - if err != nil { - logger.Fatal(err) - } - - echoRes, err := cm.ResponseFromFrame[cm.EchoResponse](echoResFrame) - if err != nil { - logger.Fatal(err) - } - - logger.Infof("sent echo of %d, got %d in return", echoByte, echoRes.EchoByte) - time.Sleep(time.Second) - - logger.Infof("i want list of peers...") - rf, err = cm.RequestFrameFrom(cm.ListPeersRequest{}) - if err != nil { - logger.Fatal(err) - } - - err = c.WriteJSON(rf) - if err != nil { - logger.Fatal(err) - } - - var listPeersResFrame cm.ResponseFrame - err = c.ReadJSON(&listPeersResFrame) - if err != nil { - logger.Fatal(err) - } - - listPeersRes, err := cm.ResponseFromFrame[cm.ListPeersResponse](listPeersResFrame) - if err != nil { - logger.Fatal(err) - } - - logger.Info("printing list of peers:") - - for _, p := range listPeersRes.PeersInfo { - logger.Infof("%+v", p) - } + testAuth(ctx) + testEcho(ctx) + testListPeers(ctx) time.Sleep(time.Second * 5) logger.Info("closing connection...") diff --git a/common/common.go b/common/common.go index 283787e..a07a564 100644 --- a/common/common.go +++ b/common/common.go @@ -7,9 +7,12 @@ import ( // Constants const ( - EchoRID = 1 - ListPeersRID = 2 - AuthRID = 3 + EchoReqID = 1 + EchoResID = 128 + EchoReqID + ListPeersReqID = 2 + ListPeersResID = 128 + ListPeersReqID + AuthReqID = 3 + AuthResID = 128 + AuthReqID ) // Requests & responses subtypes @@ -23,22 +26,22 @@ type PeerInfo struct { // Requests & responses: -type RequestFrame struct { +type RFrame struct { ID int `json:"id"` - Rest json.RawMessage `json:"request"` + Rest json.RawMessage `json:"r"` } -func RequestFrameFrom(req Request) (RequestFrame, error) { +func RequestFrameFrom(req Request) (RFrame, error) { jsonBytes, err := json.Marshal(req) if err != nil { - return *new(RequestFrame), err + return *new(RFrame), err } - return RequestFrame{req.RID(), jsonBytes}, nil + return RFrame{req.ID(), jsonBytes}, nil } -func RequestFromFrame[T Request](reqFrame RequestFrame) (T, error) { +func RequestFromFrame[T Request](reqFrame RFrame) (T, error) { var req T err := json.Unmarshal(reqFrame.Rest, &req) @@ -49,22 +52,17 @@ func RequestFromFrame[T Request](reqFrame RequestFrame) (T, error) { return req, nil } -type ResponseFrame struct { - ID int `json:"id"` - Rest json.RawMessage `json:"response"` -} - -func ResponseFrameFrom(res Response) (ResponseFrame, error) { +func ResponseFrameFrom(res Response) (RFrame, error) { jsonBytes, err := json.Marshal(res) if err != nil { - return *new(ResponseFrame), err + return *new(RFrame), err } - return ResponseFrame{res.RID(), jsonBytes}, nil + return RFrame{res.ID(), jsonBytes}, nil } -func ResponseFromFrame[T Response](resFrame ResponseFrame) (T, error) { +func ResponseFromFrame[T Response](resFrame RFrame) (T, error) { var res T err := json.Unmarshal(resFrame.Rest, &res) @@ -76,7 +74,7 @@ func ResponseFromFrame[T Response](resFrame ResponseFrame) (T, error) { } type Request interface { - RID() int + ID() int } type Response Request @@ -85,31 +83,31 @@ type EchoRequest struct { EchoByte byte `json:"echoByte"` } -func (EchoRequest) RID() int { - return EchoRID +func (EchoRequest) ID() int { + return EchoReqID } type EchoResponse struct { EchoByte byte `json:"echoByte"` } -func (EchoResponse) RID() int { - return EchoRID +func (EchoResponse) ID() int { + return EchoResID } type ListPeersRequest struct { } -func (ListPeersRequest) RID() int { - return ListPeersRID +func (ListPeersRequest) ID() int { + return ListPeersReqID } type ListPeersResponse struct { PeersInfo []PeerInfo `json:"peers"` } -func (ListPeersResponse) RID() int { - return ListPeersRID +func (ListPeersResponse) ID() int { + return ListPeersResID } type AuthRequest struct { @@ -117,14 +115,14 @@ type AuthRequest struct { Password string `json:"password"` } -func (AuthRequest) RID() int { - return AuthRID +func (AuthRequest) ID() int { + return AuthReqID } type AuthResponse struct { IsSuccess bool } -func (AuthResponse) RID() int { - return AuthRID +func (AuthResponse) ID() int { + return AuthResID } diff --git a/server/server.go b/server/server.go index 1067d26..14bd9fc 100644 --- a/server/server.go +++ b/server/server.go @@ -19,7 +19,7 @@ type Account struct { passHash []byte } -type ServerContext struct { +type Context struct { idCounter int idCounterLock sync.RWMutex peersList []*Peer @@ -30,7 +30,117 @@ type ServerContext struct { type HandlerContext struct { peer *Peer - *ServerContext + *Context + resFromClient chan common.RFrame + reqFromClient chan common.RFrame + rToClient chan common.RFrame +} + +func NewHandlerContext(peer *Peer, srvCtx *Context) *HandlerContext { + return &HandlerContext{ + peer, + srvCtx, + make(chan common.RFrame), + make(chan common.RFrame), + make(chan common.RFrame), + } +} + +func (hdlCtx *HandlerContext) clientHandler(hdlWg *sync.WaitGroup) error { + defer hdlWg.Done() + + for { + reqFrame := <-hdlCtx.reqFromClient + var res common.Response + var err error + + if reqFrame.ID == common.AuthReqID { + res, err = hdlCtx.handleAuth(&reqFrame) + } else if reqFrame.ID == common.ListPeersReqID { + res, err = hdlCtx.handleListPeers(&reqFrame) + } else if reqFrame.ID == common.EchoReqID { + res, err = hdlCtx.handleEcho(&reqFrame) + } + + if err != nil { + logger.Errorf("could not handle request ID=%d", reqFrame.ID) + return err + } + + resFrame, err := common.ResponseFrameFrom(res) + if err != nil { + logger.Errorf("could not create frame from response") + return err + } + + hdlCtx.rToClient <- resFrame + } +} + +func (hdlCtx *HandlerContext) clientWriter(hdlWg *sync.WaitGroup) error { + defer hdlWg.Done() + + for { + rFrame := <-hdlCtx.rToClient + resJsonBytes, err := json.Marshal(rFrame) + if err != nil { + logger.Errorf("error marshalling frame to json") + return err + } + + logger.Debugf("sending %s", string(resJsonBytes)) + err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes) + if err != nil { + logger.Errorf("error writing rframe") + return err + } + } +} + +func (hdlCtx *HandlerContext) clientReader(hdlWg *sync.WaitGroup) error { + defer hdlWg.Done() + + for { + messType, messBytes, err := hdlCtx.peer.conn.ReadMessage() + if err != nil { + return err + } + + if messType != 1 { + err := hdlCtx.peer.conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported")) + if err != nil { + logger.Debugf("[Server] error sending unsupported data close message") + return err + } + } + + logger.Debugf("got message text: %s", strings.Trim(string(messBytes), "\n")) + var rFrame common.RFrame + json.Unmarshal(messBytes, &rFrame) + logger.Debugf("unmarshalled request frame (ID=%d)", rFrame.ID) + + if rFrame.ID > 128 { + logger.Debug("it is response frame", "id", rFrame.ID) + hdlCtx.resFromClient <- rFrame + } else { + logger.Debug("it is request frame", "id", rFrame.ID) + hdlCtx.reqFromClient <- rFrame + } + } +} + +func (hdlCtx *HandlerContext) sendRequest(req common.Request) error { + rf, err := common.RequestFrameFrom(req) + if err != nil { + return err + } + + hdlCtx.rToClient <- rf + return nil +} + +func (hdlCtx *HandlerContext) getResponseFrame() common.RFrame { + return <-hdlCtx.resFromClient } type Peer struct { @@ -74,7 +184,7 @@ func peerSliceRemove(s *[]*Peer, i int) { *s = (*s)[:len(*s)-1] } -func (srvCtx *ServerContext) removePeer(peer *Peer) { +func (srvCtx *Context) removePeer(peer *Peer) { srvCtx.peersListLock.Lock() peerSliceRemove(&srvCtx.peersList, peerSliceIndexOf(srvCtx.peersList, peer.id)) srvCtx.peersListLock.Unlock() @@ -85,7 +195,7 @@ func handleDisconnection(handlerCtx *HandlerContext) { logger.Infof("%s disconnected", handlerCtx.peer.conn.RemoteAddr()) } -func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RequestFrame) (res common.Response, err error) { +func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RFrame) (res common.Response, err error) { echoReq, err := common.RequestFromFrame[common.EchoRequest](*reqFrame) if err != nil { logger.Error("could not read request from frame") @@ -96,7 +206,7 @@ func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RequestFrame) (res com return echoRes, nil } -func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RequestFrame) (res common.Response, err error) { +func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res common.Response, err error) { // Currently list peers request is empty, so we can ignore it - we won't use it _, err = common.RequestFromFrame[common.ListPeersRequest](*reqFrame) if err != nil { @@ -125,7 +235,7 @@ func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RequestFrame) (re return listPeersRes, nil } -func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RequestFrame) (res common.Response, err error) { +func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RFrame) (res common.Response, err error) { authReq, err := common.RequestFromFrame[common.AuthRequest](*reqFrame) if err != nil { logger.Error("could not read request from frame") @@ -170,7 +280,7 @@ func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RequestFrame) (res com return authRes, nil } -func (srvCtx *ServerContext) printConnectedPeers() { +func (srvCtx *Context) printConnectedPeers() { srvCtx.peersListLock.RLock() logger.Debug("displaying all connections:") @@ -181,56 +291,13 @@ func (srvCtx *ServerContext) printConnectedPeers() { nick = p.account.nickname } - log.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick) + logger.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick) } srvCtx.peersListLock.RUnlock() } -func (hdlCtx *HandlerContext) handleRequest(reqJsonBytes []byte) error { - logger.Debugf("got message text: %s", strings.Trim(string(reqJsonBytes), "\n")) - var reqFrame common.RequestFrame - json.Unmarshal(reqJsonBytes, &reqFrame) - log.Debugf("unmarshalled request frame (ID=%d)", reqFrame.ID) - var res common.Response - var err error - - if reqFrame.ID == common.AuthRID { - res, err = hdlCtx.handleAuth(&reqFrame) - } else if reqFrame.ID == common.ListPeersRID { - res, err = hdlCtx.handleListPeers(&reqFrame) - } else if reqFrame.ID == common.EchoRID { - res, err = hdlCtx.handleEcho(&reqFrame) - } - - if err != nil { - logger.Errorf("could not handle request ID=%d", reqFrame.ID) - return err - } - - resFrame, err := common.ResponseFrameFrom(res) - if err != nil { - logger.Errorf("could not create frame from response") - return err - } - - resJsonBytes, err := json.Marshal(resFrame) - if err != nil { - logger.Errorf("error marshalling frame to json") - return err - } - - logger.Debugf("sending %s", string(resJsonBytes)) - err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes) - if err != nil { - logger.Errorf("error writing response frame") - return err - } - - return nil -} - -func (srvCtx *ServerContext) addPeer(peer *Peer) { +func (srvCtx *Context) addPeer(peer *Peer) { srvCtx.idCounterLock.Lock() srvCtx.idCounter++ peer.id = srvCtx.idCounter @@ -240,7 +307,7 @@ func (srvCtx *ServerContext) addPeer(peer *Peer) { srvCtx.peersListLock.Unlock() } -func (srvCtx *ServerContext) wsapiHandler(w http.ResponseWriter, r *http.Request) { +func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{} conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -250,36 +317,34 @@ func (srvCtx *ServerContext) wsapiHandler(w http.ResponseWriter, r *http.Request peer := NewPeer(conn) srvCtx.addPeer(peer) - handlerCtx := &HandlerContext{peer, srvCtx} + handlerCtx := NewHandlerContext(peer, srvCtx) defer handleDisconnection(handlerCtx) defer conn.Close() logger.Infof("%s connected", conn.RemoteAddr()) - for { - messType, messBytes, err := conn.ReadMessage() - if err != nil { - break - } + var handlerWg sync.WaitGroup + handlerWg.Add(3) + go handlerCtx.clientWriter(&handlerWg) + go handlerCtx.clientHandler(&handlerWg) + go handlerCtx.clientReader(&handlerWg) - if messType != 1 { - err := conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported")) - if err != nil { - logger.Debugf("[Server] error sending close message due to unsupported data") - } - - return - } - - err = handlerCtx.handleRequest(messBytes) - if err != nil { - logger.Debug(err) - break - } + logger.Debug("sending echo request...") + handlerCtx.sendRequest(common.EchoRequest{EchoByte: 123}) + logger.Debug("sent") + echoResF := handlerCtx.getResponseFrame() + logger.Debug("got response") + echoRes, err := common.ResponseFromFrame[common.EchoResponse](echoResF) + if err != nil { + logger.Error(err) + return } + logger.Debug("test echo done", "byteSent", 123, "byteReceived", echoRes.EchoByte) + + handlerWg.Wait() } func RunServer() { - srvCtx := &ServerContext{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)} + srvCtx := &Context{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)} go func() { for {