diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f11b75 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea/ diff --git a/client/client.go b/client/client.go index a9d778e..5cd3818 100644 --- a/client/client.go +++ b/client/client.go @@ -1,6 +1,7 @@ package client import ( + "math/rand" "net/url" "os" "time" @@ -10,6 +11,95 @@ import ( cm "krzyzanowski.dev/p2pchat/common" ) +type Context struct { + conn *websocket.Conn + // Assumption: size of 1 is enough, because first response read will be response for the last request + // no need to buffer + resFromServer chan cm.RFrame + reqFromServer chan cm.RFrame + rToServer chan cm.RFrame +} + +func NewClientContext(conn *websocket.Conn) *Context { + return &Context{ + conn: conn, + resFromServer: make(chan cm.RFrame), + reqFromServer: make(chan cm.RFrame), + rToServer: make(chan cm.RFrame), + } +} + +func (cliCtx *Context) serverHandler() error { + for { + reqFrame := <-cliCtx.reqFromServer + + logger.Debug("got request from server", "id", reqFrame.ID) + + if reqFrame.ID == cm.EchoReqID { + echoReq, err := cm.RequestFromFrame[cm.EchoRequest](reqFrame) + if err != nil { + return err + } + + resFrame, err := cm.ResponseFrameFrom(cm.EchoResponse(echoReq)) + if err != nil { + return err + } + + cliCtx.rToServer <- resFrame + } else { + logger.Fatal("can't handle it!") + } + } +} + +func (cliCtx *Context) serverWriter() error { + for { + logger.Debug("waiting for a frame to write") + frameToWrite := <-cliCtx.rToServer + err := cliCtx.conn.WriteJSON(frameToWrite) + if err != nil { + return err + } + logger.Debug("frame written", "id", frameToWrite.ID) + } +} + +func (cliCtx *Context) serverReader() error { + for { + logger.Debug("waiting for a frame to read") + var rFrame cm.RFrame + err := cliCtx.conn.ReadJSON(&rFrame) + if err != nil { + return err + } + + logger.Debug("frame read", "id", rFrame.ID) + + if rFrame.ID > 128 { + cliCtx.resFromServer <- rFrame + } else { + cliCtx.reqFromServer <- rFrame + } + + logger.Debug("frame pushed", "id", rFrame.ID) + } +} + +func (cliCtx *Context) sendRequest(req cm.Request) error { + rf, err := cm.RequestFrameFrom(req) + if err != nil { + return err + } + + cliCtx.rToServer <- rf + return nil +} + +func (cliCtx *Context) getResponseFrame() cm.RFrame { + return <-cliCtx.resFromServer +} + var logger = log.NewWithOptions(os.Stdout, log.Options{ ReportTimestamp: true, TimeFormat: time.TimeOnly, @@ -24,6 +114,43 @@ func init() { } } +func testAuth(ctx *Context) { + logger.Info("Trying to authenticate as krzmaciek...") + ctx.sendRequest(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) + logger.Debug("Request sent, waiting for response...") + arf := ctx.getResponseFrame() + ar, err := cm.ResponseFromFrame[cm.AuthResponse](arf) + if err != nil { + logger.Error(err) + } + logger.Infof("Authenticated?: %t", ar.IsSuccess) +} + +func testEcho(ctx *Context) { + echoByte := rand.Intn(32) + logger.Info("Testing echo...", "echoByte", echoByte) + ctx.sendRequest(cm.EchoRequest{EchoByte: byte(echoByte)}) + logger.Debug("Request sent, waiting for response...") + ereqf := ctx.getResponseFrame() + ereq, err := cm.ResponseFromFrame[cm.EchoResponse](ereqf) + if err != nil { + logger.Error(err) + } + logger.Info("Got response", "echoByte", ereq.EchoByte) +} + +func testListPeers(ctx *Context) { + logger.Info("Trying to get list of peers...") + ctx.sendRequest(cm.ListPeersRequest{}) + logger.Debug("Request sent, waiting for response...") + lpreqf := ctx.getResponseFrame() + lpreq, err := cm.ResponseFromFrame[cm.ListPeersResponse](lpreqf) + if err != nil { + logger.Error(err) + } + logger.Info("Got that list", "peersList", lpreq.PeersInfo) +} + func RunClient() { u := url.URL{Scheme: "ws", Host: ":8080", Path: "/wsapi"} c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) @@ -35,80 +162,14 @@ func RunClient() { defer c.Close() - logger.Info("authenticating...") - rf, _ := cm.RequestFrameFrom(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) - err = c.WriteJSON(rf) - if err != nil { - logger.Fatal(err) - } + ctx := NewClientContext(c) + go ctx.serverHandler() + go ctx.serverReader() + go ctx.serverWriter() - var authResFrame cm.ResponseFrame - err = c.ReadJSON(&authResFrame) - if err != nil { - logger.Fatal(err) - } - - authRes, err := cm.ResponseFromFrame[cm.AuthResponse](authResFrame) - if err != nil { - logger.Fatal(err) - } - - logger.Infof("authentication result: %t", authRes.IsSuccess) - time.Sleep(time.Second * 1) - - logger.Info("sending echo...") - echoByte := 123 - rf, err = cm.RequestFrameFrom(cm.EchoRequest{EchoByte: byte(echoByte)}) - if err != nil { - logger.Fatal(err) - } - - err = c.WriteJSON(rf) - if err != nil { - logger.Fatal(err) - } - - var echoResFrame cm.ResponseFrame - err = c.ReadJSON(&echoResFrame) - if err != nil { - logger.Fatal(err) - } - - echoRes, err := cm.ResponseFromFrame[cm.EchoResponse](echoResFrame) - if err != nil { - logger.Fatal(err) - } - - logger.Infof("sent echo of %d, got %d in return", echoByte, echoRes.EchoByte) - time.Sleep(time.Second) - - logger.Infof("i want list of peers...") - rf, err = cm.RequestFrameFrom(cm.ListPeersRequest{}) - if err != nil { - logger.Fatal(err) - } - - err = c.WriteJSON(rf) - if err != nil { - logger.Fatal(err) - } - - var listPeersResFrame cm.ResponseFrame - err = c.ReadJSON(&listPeersResFrame) - if err != nil { - logger.Fatal(err) - } - - listPeersRes, err := cm.ResponseFromFrame[cm.ListPeersResponse](listPeersResFrame) - if err != nil { - logger.Fatal(err) - } - - logger.Info("printing list of peers:") - - for _, p := range listPeersRes.PeersInfo { - logger.Infof("%+v", p) - } + testAuth(ctx) + testEcho(ctx) + testListPeers(ctx) time.Sleep(time.Second * 5) logger.Info("closing connection...") diff --git a/common/common.go b/common/common.go index 283787e..a07a564 100644 --- a/common/common.go +++ b/common/common.go @@ -7,9 +7,12 @@ import ( // Constants const ( - EchoRID = 1 - ListPeersRID = 2 - AuthRID = 3 + EchoReqID = 1 + EchoResID = 128 + EchoReqID + ListPeersReqID = 2 + ListPeersResID = 128 + ListPeersReqID + AuthReqID = 3 + AuthResID = 128 + AuthReqID ) // Requests & responses subtypes @@ -23,22 +26,22 @@ type PeerInfo struct { // Requests & responses: -type RequestFrame struct { +type RFrame struct { ID int `json:"id"` - Rest json.RawMessage `json:"request"` + Rest json.RawMessage `json:"r"` } -func RequestFrameFrom(req Request) (RequestFrame, error) { +func RequestFrameFrom(req Request) (RFrame, error) { jsonBytes, err := json.Marshal(req) if err != nil { - return *new(RequestFrame), err + return *new(RFrame), err } - return RequestFrame{req.RID(), jsonBytes}, nil + return RFrame{req.ID(), jsonBytes}, nil } -func RequestFromFrame[T Request](reqFrame RequestFrame) (T, error) { +func RequestFromFrame[T Request](reqFrame RFrame) (T, error) { var req T err := json.Unmarshal(reqFrame.Rest, &req) @@ -49,22 +52,17 @@ func RequestFromFrame[T Request](reqFrame RequestFrame) (T, error) { return req, nil } -type ResponseFrame struct { - ID int `json:"id"` - Rest json.RawMessage `json:"response"` -} - -func ResponseFrameFrom(res Response) (ResponseFrame, error) { +func ResponseFrameFrom(res Response) (RFrame, error) { jsonBytes, err := json.Marshal(res) if err != nil { - return *new(ResponseFrame), err + return *new(RFrame), err } - return ResponseFrame{res.RID(), jsonBytes}, nil + return RFrame{res.ID(), jsonBytes}, nil } -func ResponseFromFrame[T Response](resFrame ResponseFrame) (T, error) { +func ResponseFromFrame[T Response](resFrame RFrame) (T, error) { var res T err := json.Unmarshal(resFrame.Rest, &res) @@ -76,7 +74,7 @@ func ResponseFromFrame[T Response](resFrame ResponseFrame) (T, error) { } type Request interface { - RID() int + ID() int } type Response Request @@ -85,31 +83,31 @@ type EchoRequest struct { EchoByte byte `json:"echoByte"` } -func (EchoRequest) RID() int { - return EchoRID +func (EchoRequest) ID() int { + return EchoReqID } type EchoResponse struct { EchoByte byte `json:"echoByte"` } -func (EchoResponse) RID() int { - return EchoRID +func (EchoResponse) ID() int { + return EchoResID } type ListPeersRequest struct { } -func (ListPeersRequest) RID() int { - return ListPeersRID +func (ListPeersRequest) ID() int { + return ListPeersReqID } type ListPeersResponse struct { PeersInfo []PeerInfo `json:"peers"` } -func (ListPeersResponse) RID() int { - return ListPeersRID +func (ListPeersResponse) ID() int { + return ListPeersResID } type AuthRequest struct { @@ -117,14 +115,14 @@ type AuthRequest struct { Password string `json:"password"` } -func (AuthRequest) RID() int { - return AuthRID +func (AuthRequest) ID() int { + return AuthReqID } type AuthResponse struct { IsSuccess bool } -func (AuthResponse) RID() int { - return AuthRID +func (AuthResponse) ID() int { + return AuthResID } diff --git a/server/server.go b/server/server.go index 1067d26..14bd9fc 100644 --- a/server/server.go +++ b/server/server.go @@ -19,7 +19,7 @@ type Account struct { passHash []byte } -type ServerContext struct { +type Context struct { idCounter int idCounterLock sync.RWMutex peersList []*Peer @@ -30,7 +30,117 @@ type ServerContext struct { type HandlerContext struct { peer *Peer - *ServerContext + *Context + resFromClient chan common.RFrame + reqFromClient chan common.RFrame + rToClient chan common.RFrame +} + +func NewHandlerContext(peer *Peer, srvCtx *Context) *HandlerContext { + return &HandlerContext{ + peer, + srvCtx, + make(chan common.RFrame), + make(chan common.RFrame), + make(chan common.RFrame), + } +} + +func (hdlCtx *HandlerContext) clientHandler(hdlWg *sync.WaitGroup) error { + defer hdlWg.Done() + + for { + reqFrame := <-hdlCtx.reqFromClient + var res common.Response + var err error + + if reqFrame.ID == common.AuthReqID { + res, err = hdlCtx.handleAuth(&reqFrame) + } else if reqFrame.ID == common.ListPeersReqID { + res, err = hdlCtx.handleListPeers(&reqFrame) + } else if reqFrame.ID == common.EchoReqID { + res, err = hdlCtx.handleEcho(&reqFrame) + } + + if err != nil { + logger.Errorf("could not handle request ID=%d", reqFrame.ID) + return err + } + + resFrame, err := common.ResponseFrameFrom(res) + if err != nil { + logger.Errorf("could not create frame from response") + return err + } + + hdlCtx.rToClient <- resFrame + } +} + +func (hdlCtx *HandlerContext) clientWriter(hdlWg *sync.WaitGroup) error { + defer hdlWg.Done() + + for { + rFrame := <-hdlCtx.rToClient + resJsonBytes, err := json.Marshal(rFrame) + if err != nil { + logger.Errorf("error marshalling frame to json") + return err + } + + logger.Debugf("sending %s", string(resJsonBytes)) + err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes) + if err != nil { + logger.Errorf("error writing rframe") + return err + } + } +} + +func (hdlCtx *HandlerContext) clientReader(hdlWg *sync.WaitGroup) error { + defer hdlWg.Done() + + for { + messType, messBytes, err := hdlCtx.peer.conn.ReadMessage() + if err != nil { + return err + } + + if messType != 1 { + err := hdlCtx.peer.conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported")) + if err != nil { + logger.Debugf("[Server] error sending unsupported data close message") + return err + } + } + + logger.Debugf("got message text: %s", strings.Trim(string(messBytes), "\n")) + var rFrame common.RFrame + json.Unmarshal(messBytes, &rFrame) + logger.Debugf("unmarshalled request frame (ID=%d)", rFrame.ID) + + if rFrame.ID > 128 { + logger.Debug("it is response frame", "id", rFrame.ID) + hdlCtx.resFromClient <- rFrame + } else { + logger.Debug("it is request frame", "id", rFrame.ID) + hdlCtx.reqFromClient <- rFrame + } + } +} + +func (hdlCtx *HandlerContext) sendRequest(req common.Request) error { + rf, err := common.RequestFrameFrom(req) + if err != nil { + return err + } + + hdlCtx.rToClient <- rf + return nil +} + +func (hdlCtx *HandlerContext) getResponseFrame() common.RFrame { + return <-hdlCtx.resFromClient } type Peer struct { @@ -74,7 +184,7 @@ func peerSliceRemove(s *[]*Peer, i int) { *s = (*s)[:len(*s)-1] } -func (srvCtx *ServerContext) removePeer(peer *Peer) { +func (srvCtx *Context) removePeer(peer *Peer) { srvCtx.peersListLock.Lock() peerSliceRemove(&srvCtx.peersList, peerSliceIndexOf(srvCtx.peersList, peer.id)) srvCtx.peersListLock.Unlock() @@ -85,7 +195,7 @@ func handleDisconnection(handlerCtx *HandlerContext) { logger.Infof("%s disconnected", handlerCtx.peer.conn.RemoteAddr()) } -func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RequestFrame) (res common.Response, err error) { +func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RFrame) (res common.Response, err error) { echoReq, err := common.RequestFromFrame[common.EchoRequest](*reqFrame) if err != nil { logger.Error("could not read request from frame") @@ -96,7 +206,7 @@ func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RequestFrame) (res com return echoRes, nil } -func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RequestFrame) (res common.Response, err error) { +func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res common.Response, err error) { // Currently list peers request is empty, so we can ignore it - we won't use it _, err = common.RequestFromFrame[common.ListPeersRequest](*reqFrame) if err != nil { @@ -125,7 +235,7 @@ func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RequestFrame) (re return listPeersRes, nil } -func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RequestFrame) (res common.Response, err error) { +func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RFrame) (res common.Response, err error) { authReq, err := common.RequestFromFrame[common.AuthRequest](*reqFrame) if err != nil { logger.Error("could not read request from frame") @@ -170,7 +280,7 @@ func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RequestFrame) (res com return authRes, nil } -func (srvCtx *ServerContext) printConnectedPeers() { +func (srvCtx *Context) printConnectedPeers() { srvCtx.peersListLock.RLock() logger.Debug("displaying all connections:") @@ -181,56 +291,13 @@ func (srvCtx *ServerContext) printConnectedPeers() { nick = p.account.nickname } - log.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick) + logger.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick) } srvCtx.peersListLock.RUnlock() } -func (hdlCtx *HandlerContext) handleRequest(reqJsonBytes []byte) error { - logger.Debugf("got message text: %s", strings.Trim(string(reqJsonBytes), "\n")) - var reqFrame common.RequestFrame - json.Unmarshal(reqJsonBytes, &reqFrame) - log.Debugf("unmarshalled request frame (ID=%d)", reqFrame.ID) - var res common.Response - var err error - - if reqFrame.ID == common.AuthRID { - res, err = hdlCtx.handleAuth(&reqFrame) - } else if reqFrame.ID == common.ListPeersRID { - res, err = hdlCtx.handleListPeers(&reqFrame) - } else if reqFrame.ID == common.EchoRID { - res, err = hdlCtx.handleEcho(&reqFrame) - } - - if err != nil { - logger.Errorf("could not handle request ID=%d", reqFrame.ID) - return err - } - - resFrame, err := common.ResponseFrameFrom(res) - if err != nil { - logger.Errorf("could not create frame from response") - return err - } - - resJsonBytes, err := json.Marshal(resFrame) - if err != nil { - logger.Errorf("error marshalling frame to json") - return err - } - - logger.Debugf("sending %s", string(resJsonBytes)) - err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes) - if err != nil { - logger.Errorf("error writing response frame") - return err - } - - return nil -} - -func (srvCtx *ServerContext) addPeer(peer *Peer) { +func (srvCtx *Context) addPeer(peer *Peer) { srvCtx.idCounterLock.Lock() srvCtx.idCounter++ peer.id = srvCtx.idCounter @@ -240,7 +307,7 @@ func (srvCtx *ServerContext) addPeer(peer *Peer) { srvCtx.peersListLock.Unlock() } -func (srvCtx *ServerContext) wsapiHandler(w http.ResponseWriter, r *http.Request) { +func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{} conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -250,36 +317,34 @@ func (srvCtx *ServerContext) wsapiHandler(w http.ResponseWriter, r *http.Request peer := NewPeer(conn) srvCtx.addPeer(peer) - handlerCtx := &HandlerContext{peer, srvCtx} + handlerCtx := NewHandlerContext(peer, srvCtx) defer handleDisconnection(handlerCtx) defer conn.Close() logger.Infof("%s connected", conn.RemoteAddr()) - for { - messType, messBytes, err := conn.ReadMessage() - if err != nil { - break - } + var handlerWg sync.WaitGroup + handlerWg.Add(3) + go handlerCtx.clientWriter(&handlerWg) + go handlerCtx.clientHandler(&handlerWg) + go handlerCtx.clientReader(&handlerWg) - if messType != 1 { - err := conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported")) - if err != nil { - logger.Debugf("[Server] error sending close message due to unsupported data") - } - - return - } - - err = handlerCtx.handleRequest(messBytes) - if err != nil { - logger.Debug(err) - break - } + logger.Debug("sending echo request...") + handlerCtx.sendRequest(common.EchoRequest{EchoByte: 123}) + logger.Debug("sent") + echoResF := handlerCtx.getResponseFrame() + logger.Debug("got response") + echoRes, err := common.ResponseFromFrame[common.EchoResponse](echoResF) + if err != nil { + logger.Error(err) + return } + logger.Debug("test echo done", "byteSent", 123, "byteReceived", echoRes.EchoByte) + + handlerWg.Wait() } func RunServer() { - srvCtx := &ServerContext{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)} + srvCtx := &Context{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)} go func() { for {