From 4a38b6e28a333db552d6cda1f14e4664e6b28a55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Krzy=C5=BCanowski?= Date: Mon, 29 Apr 2024 02:02:51 +0200 Subject: [PATCH] Working on initating connections --- client/client.go | 192 ++++++++++++++------ common/common.go | 10 +- go.mod | 2 +- main.go | 4 +- server/server.go | 457 ++++++++++++++++++++++++++++++++++------------- 5 files changed, 480 insertions(+), 185 deletions(-) diff --git a/client/client.go b/client/client.go index 00230d1..fa1f934 100644 --- a/client/client.go +++ b/client/client.go @@ -1,15 +1,19 @@ package client import ( + "bufio" + "context" + "errors" "golang.org/x/sync/errgroup" - "math/rand" "net/url" "os" + "strconv" + "strings" "time" "github.com/charmbracelet/log" "github.com/gorilla/websocket" - cm "krzyzanowski.dev/p2pchat/common" + cm "krzyzanowski.dev/archat/common" ) type Context struct { @@ -30,43 +34,56 @@ func NewClientContext(conn *websocket.Conn) *Context { } } -func (cliCtx *Context) serverHandler() error { +func (cliCtx *Context) serverHandler(syncCtx context.Context) error { + defer logger.Debug("server handler last line...") + for { - reqFrame := <-cliCtx.reqFromServer + select { + case <-syncCtx.Done(): + return nil + case reqFrame := <-cliCtx.reqFromServer: + logger.Debug("got request from server", "id", reqFrame.ID) - logger.Debug("got request from server", "id", reqFrame.ID) + if reqFrame.ID == cm.EchoReqID { + echoReq, err := cm.RequestFromFrame[cm.EchoRequest](reqFrame) + if err != nil { + return err + } - if reqFrame.ID == cm.EchoReqID { - echoReq, err := cm.RequestFromFrame[cm.EchoRequest](reqFrame) - if err != nil { - return err + resFrame, err := cm.ResponseFrameFrom(cm.EchoResponse(echoReq)) + if err != nil { + return err + } + + cliCtx.rToServer <- resFrame + } else { + logger.Warn("can't handle it!") } - - resFrame, err := cm.ResponseFrameFrom(cm.EchoResponse(echoReq)) - if err != nil { - return err - } - - cliCtx.rToServer <- resFrame - } else { - logger.Fatal("can't handle it!") } } } -func (cliCtx *Context) serverWriter() error { +func (cliCtx *Context) serverWriter(syncCtx context.Context) error { + defer logger.Debug("server writer last line...") + for { logger.Debug("waiting for a frame to write") - frameToWrite := <-cliCtx.rToServer - err := cliCtx.conn.WriteJSON(frameToWrite) - if err != nil { - return err + select { + case <-syncCtx.Done(): + return nil + case frameToWrite := <-cliCtx.rToServer: + err := cliCtx.conn.WriteJSON(frameToWrite) + if err != nil { + return err + } + logger.Debug("frame written", "id", frameToWrite.ID) } - logger.Debug("frame written", "id", frameToWrite.ID) } } -func (cliCtx *Context) serverReader() error { +func (cliCtx *Context) serverReader(syncCtx context.Context) error { + defer logger.Debug("server reader last line...") + for { logger.Debug("waiting for a frame to read") var rFrame cm.RFrame @@ -115,9 +132,9 @@ func init() { } } -func testAuth(ctx *Context) { +func sendAuth(ctx *Context, nick, pass string) { logger.Info("Trying to authenticate as krzmaciek...") - err := ctx.sendRequest(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) + err := ctx.sendRequest(cm.AuthRequest{Nickname: nick, Password: pass}) if err != nil { logger.Error(err) @@ -136,10 +153,9 @@ func testAuth(ctx *Context) { logger.Infof("Authenticated?: %t", ar.IsSuccess) } -func testEcho(ctx *Context) { - echoByte := rand.Intn(32) +func sendEcho(ctx *Context, echoByte byte) { logger.Info("Testing echo...", "echoByte", echoByte) - err := ctx.sendRequest(cm.EchoRequest{EchoByte: byte(echoByte)}) + err := ctx.sendRequest(cm.EchoRequest{EchoByte: echoByte}) if err != nil { logger.Error(err) @@ -158,7 +174,7 @@ func testEcho(ctx *Context) { logger.Info("Got response", "echoByte", ereq.EchoByte) } -func testListPeers(ctx *Context) { +func sendListPeers(ctx *Context) { logger.Info("Trying to get list of peers...") err := ctx.sendRequest(cm.ListPeersRequest{}) @@ -179,6 +195,18 @@ func testListPeers(ctx *Context) { logger.Info("Got that list", "peersList", lpreq.PeersInfo) } +func sendStartChatA(ctx *Context, nick string) { + logger.Info("Doing chat start A...") + err := ctx.sendRequest(cm.StartChatARequest{Nickname: nick}) + + if err != nil { + logger.Error(err) + return + } + + logger.Debug("Request sent, no wait for response") +} + func RunClient() { u := url.URL{Scheme: "ws", Host: ":8080", Path: "/wsapi"} c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) @@ -188,32 +216,94 @@ func RunClient() { return } - defer func(c *websocket.Conn) { - err := c.Close() - if err != nil { - logger.Error(err) + cliCtx := NewClientContext(c) + errGroup, syncCtx := errgroup.WithContext(context.Background()) + + errGroup.Go(func() error { + return cliCtx.serverHandler(syncCtx) + }) + + errGroup.Go(func() error { + return cliCtx.serverReader(syncCtx) + }) + + errGroup.Go(func() error { + return cliCtx.serverWriter(syncCtx) + }) + + errGroup.Go(func() error { + <-syncCtx.Done() + logger.Info("closing client...") + time.Sleep(time.Second * 3) + close(cliCtx.rToServer) + close(cliCtx.resFromServer) + close(cliCtx.reqFromServer) + _ = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + return c.Close() + }) + + closer := make(chan int) + errGroup.Go(func() error { + select { + case <-closer: + return errors.New("close") + case <-syncCtx.Done(): + return nil } - }(c) + }) - ctx := NewClientContext(c) - errGroup := new(errgroup.Group) - errGroup.Go(ctx.serverHandler) - errGroup.Go(ctx.serverReader) - errGroup.Go(ctx.serverWriter) + go func() { + scanner := bufio.NewScanner(os.Stdin) + + for scanner.Scan() { + cmd := strings.TrimRight(scanner.Text(), " \n\t") + cmdElements := strings.Split(cmd, " ") + cmdName := cmdElements[0] + cmdArgs := cmdElements[1:] + + if cmdName == "exit" { + logger.Info("closing...") + closer <- 1 + } else if cmdName == "echo" { + if len(cmdArgs) != 1 { + logger.Errorf("echo command requires 1 argument, but %d was provided", len(cmdArgs)) + continue + } + + num, err := strconv.Atoi(cmdArgs[0]) + + if err != nil { + logger.Errorf("%s is not a number", cmdArgs[0]) + continue + } + + sendEcho(cliCtx, byte(num)) + } else if cmdName == "list" { + sendListPeers(cliCtx) + } else if cmdName == "auth" { + if len(cmdArgs) != 2 { + logger.Errorf("auth command requires 2 argument, but %d was provided", len(cmdArgs)) + continue + } + + nick := cmdArgs[0] + pass := cmdArgs[1] + + sendAuth(cliCtx, nick, pass) + } else if cmdName == "startchata" { + if len(cmdArgs) != 1 { + logger.Errorf("startchata command requires 1 argument, but %d was provided", len(cmdArgs)) + continue + } + + sendStartChatA(cliCtx, cmdArgs[0]) + } + } + }() - testAuth(ctx) - testEcho(ctx) - testListPeers(ctx) err = errGroup.Wait() if err != nil { logger.Error(err) } - - logger.Info("closing connection...") - err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - - if err != nil { - logger.Error(err) - } } diff --git a/common/common.go b/common/common.go index c1a7669..1fee048 100644 --- a/common/common.go +++ b/common/common.go @@ -43,6 +43,10 @@ func (rf RFrame) IsResponse() bool { return rf.ID > 128 } +func (rf RFrame) IsError() bool { + return rf.ID > 256 +} + func RequestFrameFrom(req Request) (RFrame, error) { jsonBytes, err := json.Marshal(req) @@ -139,8 +143,10 @@ func (AuthResponse) ID() int { return AuthResID } -// "Stateful" requests like these need to have some information identifying what operation they are linked to -// There may be some errors if two requests are sent by one host to the same other host... may they? +// "Stateful" requests like these need to have some information identifying +// what operation they are linked to +// There may be some errors if two requests are sent by one host to the same +// other host... may they? type StartChatARequest struct { Nickname string `json:"nickname"` diff --git a/go.mod b/go.mod index 2d883cd..3f2cd75 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module krzyzanowski.dev/p2pchat +module krzyzanowski.dev/archat go 1.21.7 diff --git a/main.go b/main.go index a627668..3ef9301 100644 --- a/main.go +++ b/main.go @@ -4,8 +4,8 @@ import ( "log" "os" - "krzyzanowski.dev/p2pchat/client" - "krzyzanowski.dev/p2pchat/server" + "krzyzanowski.dev/archat/client" + "krzyzanowski.dev/archat/server" ) func main() { diff --git a/server/server.go b/server/server.go index 7ef7b98..5b0aeea 100644 --- a/server/server.go +++ b/server/server.go @@ -1,32 +1,101 @@ package server import ( + "context" "encoding/json" - "golang.org/x/sync/errgroup" + "errors" "net/http" "os" + "slices" "strings" "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/charmbracelet/log" "github.com/gorilla/websocket" "golang.org/x/crypto/bcrypt" - "krzyzanowski.dev/p2pchat/common" + "krzyzanowski.dev/archat/common" ) +type Peer struct { + id int + conn *websocket.Conn + hasAccount bool + account *Account +} + +func NewPeer(conn *websocket.Conn) *Peer { + return &Peer{-1, conn, false, nil} +} + type Account struct { nickname string passHash []byte } +const ( + InitiationStageA = 1 + InitiationStageB = 2 + InitiationStageC = 3 + InitiationStageD = 4 +) + +type Initiation struct { + abANick string + abBNick string + stage int +} + +func NewInitiation(abA string, abB string) *Initiation { + return &Initiation{abA, abB, InitiationStageA} +} + type Context struct { - idCounter int - idCounterLock sync.RWMutex - peersList []*Peer - peersListLock sync.RWMutex - accounts map[string]*Account - accountsLock sync.RWMutex + idCounter int + idCounterLock sync.RWMutex + peersList []*Peer + peersListLock sync.RWMutex + accounts map[string]*Account + accountsLock sync.RWMutex + initiations []*Initiation + initiationsLock sync.RWMutex + handlerContexts []*HandlerContext + handlerContextsLock sync.RWMutex +} + +func NewContext() *Context { + return &Context{ + peersList: make([]*Peer, 0), + accounts: make(map[string]*Account), + initiations: make([]*Initiation, 0), + } +} + +// Remember to lock before calling +func (ctx *Context) getPeerByNick(nick string) (*Peer, error) { + for _, peer := range ctx.peersList { + if peer.hasAccount && peer.account.nickname == nick { + return peer, nil + } + } + return nil, errors.New("peer not found") +} + +// Remember to lock before calling +func (ctx *Context) getCtxByNick(nick string) (*HandlerContext, error) { + idx := slices.IndexFunc[[]*HandlerContext, *HandlerContext]( + ctx.handlerContexts, + func(handlerContext *HandlerContext) bool { + return handlerContext.peer.hasAccount && handlerContext.peer.account.nickname == nick + }) + + if idx != -1 { + return ctx.handlerContexts[idx], nil + } + + return nil, errors.New("not found") } type HandlerContext struct { @@ -47,88 +116,109 @@ func NewHandlerContext(peer *Peer, srvCtx *Context) *HandlerContext { } } -func (hdlCtx *HandlerContext) clientHandler() error { +func (hdlCtx *HandlerContext) clientHandler(syncCtx context.Context) error { +handleNext: for { - reqFrame := <-hdlCtx.reqFromClient - var res common.Response - var err error + select { + case <-syncCtx.Done(): + return nil + case reqFrame := <-hdlCtx.reqFromClient: + var res common.Response + var err error - if reqFrame.ID == common.AuthReqID { - res, err = hdlCtx.handleAuth(&reqFrame) - } else if reqFrame.ID == common.ListPeersReqID { - res, err = hdlCtx.handleListPeers(&reqFrame) - } else if reqFrame.ID == common.EchoReqID { - res, err = hdlCtx.handleEcho(&reqFrame) - } + if reqFrame.ID == common.AuthReqID { + res, err = hdlCtx.handleAuth(&reqFrame) + } else if reqFrame.ID == common.ListPeersReqID { + res, err = hdlCtx.handleListPeers(&reqFrame) + } else if reqFrame.ID == common.EchoReqID { + res, err = hdlCtx.handleEcho(&reqFrame) + } else if reqFrame.ID == common.StartChatAReqID { + res, err = hdlCtx.handleChatStartA(&reqFrame) + } - if err != nil { - logger.Errorf("could not handle request ID=%d", reqFrame.ID) - return err - } - - resFrame, err := common.ResponseFrameFrom(res) - - if err != nil { - logger.Errorf("could not create frame from response") - return err - } - - hdlCtx.rToClient <- resFrame - } -} - -func (hdlCtx *HandlerContext) clientWriter() error { - for { - rFrame := <-hdlCtx.rToClient - resJsonBytes, err := json.Marshal(rFrame) - - if err != nil { - logger.Errorf("error marshalling frame to json") - return err - } - - logger.Debugf("sending %s", string(resJsonBytes)) - err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes) - - if err != nil { - logger.Errorf("error writing rframe") - return err - } - } -} - -func (hdlCtx *HandlerContext) clientReader() error { - for { - messType, messBytes, err := hdlCtx.peer.conn.ReadMessage() - - if err != nil { - return err - } - - if messType != 1 { - err := hdlCtx.peer.conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported")) if err != nil { - logger.Debugf("[Server] error sending unsupported data close message") + logger.Errorf("could not handle request ID=%d", reqFrame.ID) + return err + } + + if res == nil { + logger.Debugf("request without response ID=%d", reqFrame.ID) + continue handleNext + } + + resFrame, err := common.ResponseFrameFrom(res) + + if err != nil { + logger.Errorf("could not create frame from response") + return err + } + + hdlCtx.rToClient <- resFrame + } + } +} + +func (hdlCtx *HandlerContext) clientWriter(syncCtx context.Context) error { + for { + select { + case <-syncCtx.Done(): + return nil + case rFrame := <-hdlCtx.rToClient: + resJsonBytes, err := json.Marshal(rFrame) + + if err != nil { + logger.Errorf("error marshalling frame to json") + return err + } + + logger.Debugf("sending %s", string(resJsonBytes)) + err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes) + + if err != nil { + logger.Errorf("error writing rframe") return err } } + } +} - logger.Debugf("got message text: %s", strings.Trim(string(messBytes), "\n")) - var rFrame common.RFrame - err = json.Unmarshal(messBytes, &rFrame) +func (hdlCtx *HandlerContext) clientReader(syncCtx context.Context) error { + for { + select { + case <-syncCtx.Done(): + return nil + default: + messType, messBytes, err := hdlCtx.peer.conn.ReadMessage() - if err != nil { - return err - } + if err != nil { + return err + } - logger.Debugf("unmarshalled request frame (ID=%d)", rFrame.ID) + if messType != 1 { + err := hdlCtx.peer.conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported")) + if err != nil { + logger.Debugf("[Server] error sending unsupported data close message") + return err + } + } - if rFrame.IsResponse() { - logger.Debug("it is response frame", "id", rFrame.ID) - hdlCtx.resFromClient <- rFrame - } else { - logger.Debug("it is request frame", "id", rFrame.ID) - hdlCtx.reqFromClient <- rFrame + logger.Debugf("got message text: %s", strings.Trim(string(messBytes), "\n")) + var rFrame common.RFrame + err = json.Unmarshal(messBytes, &rFrame) + + if err != nil { + return err + } + + logger.Debugf("unmarshalled request frame (ID=%d)", rFrame.ID) + + if rFrame.IsResponse() { + logger.Debug("it is response frame", "id", rFrame.ID) + hdlCtx.resFromClient <- rFrame + } else { + logger.Debug("it is request frame", "id", rFrame.ID) + hdlCtx.reqFromClient <- rFrame + } } } } @@ -148,13 +238,6 @@ func (hdlCtx *HandlerContext) getResponseFrame() common.RFrame { return <-hdlCtx.resFromClient } -type Peer struct { - id int - conn *websocket.Conn - hasAccount bool - account *Account -} - var logger = log.NewWithOptions(os.Stdout, log.Options{ ReportTimestamp: true, TimeFormat: time.TimeOnly, @@ -169,32 +252,36 @@ func init() { } } -func NewPeer(conn *websocket.Conn) *Peer { - return &Peer{-1, conn, false, nil} -} +type Matcher[T any] func(*T) bool -func peerSliceIndexOf(s []*Peer, id int) int { - i := 0 - var p *Peer +func (ctx *Context) removePeer(peer *Peer) { + ctx.handlerContextsLock.Lock() + ctx.peersListLock.Lock() + ctx.initiationsLock.Lock() - for i, p = range s { - if p.id == id { - break - } - } + ctx.handlerContexts = slices.DeleteFunc[[]*HandlerContext, *HandlerContext]( + ctx.handlerContexts, + func(h *HandlerContext) bool { + return h.peer.id == peer.id + }) - return i -} + ctx.peersList = slices.DeleteFunc[[]*Peer, *Peer]( + ctx.peersList, + func(p *Peer) bool { + return p.id == peer.id + }) -func peerSliceRemove(s *[]*Peer, i int) { - (*s)[i] = (*s)[len(*s)-1] - *s = (*s)[:len(*s)-1] -} + ctx.initiations = slices.DeleteFunc[[]*Initiation, *Initiation]( + ctx.initiations, + func(i *Initiation) bool { + return peer.hasAccount && (peer.account.nickname == i.abANick || peer.account.nickname == i.abBNick) + }) -func (srvCtx *Context) removePeer(peer *Peer) { - srvCtx.peersListLock.Lock() - peerSliceRemove(&srvCtx.peersList, peerSliceIndexOf(srvCtx.peersList, peer.id)) - srvCtx.peersListLock.Unlock() + // TODO: Inform the other side about peer leaving + + ctx.handlerContextsLock.Unlock() + ctx.peersListLock.Unlock() + ctx.initiationsLock.Unlock() } func handleDisconnection(handlerCtx *HandlerContext) { @@ -230,13 +317,21 @@ func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res comm listPeersRes := common.ListPeersResponse{PeersInfo: make([]common.PeerInfo, 0)} for _, peer := range peersFreeze { + var nickname string + + if peer.hasAccount { + nickname = peer.account.nickname + } else { + nickname = "" + } + listPeersRes.PeersInfo = append( listPeersRes.PeersInfo, common.PeerInfo{ ID: peer.id, Addr: peer.conn.RemoteAddr().String(), HasNickname: peer.hasAccount, - Nickname: peer.account.nickname, + Nickname: nickname, }, ) } @@ -290,11 +385,86 @@ func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RFrame) (res common.Re return authRes, nil } -func (srvCtx *Context) printConnectedPeers() { - srvCtx.peersListLock.RLock() +func (hdlCtx *HandlerContext) handleChatStartA(reqFrame *common.RFrame) (res common.Response, err error) { + startChatAReq, err := common.RequestFromFrame[common.StartChatARequest](*reqFrame) + + if err != nil { + return nil, err + } + + receiverPeerCtx, err := hdlCtx.getCtxByNick(startChatAReq.Nickname) + + if err != nil { + logger.Debug("receiver peer not found") + return nil, nil + } + + // initation started + hdlCtx.initiationsLock.Lock() + hdlCtx.initiations = append(hdlCtx.initiations, NewInitiation(hdlCtx.peer.account.nickname, startChatAReq.Nickname)) + hdlCtx.initiationsLock.Unlock() + + chatStartB := common.StartChatBRequest{ + Nickname: hdlCtx.peer.account.nickname, + } + + chatStartBReqF, err := common.RequestFrameFrom(chatStartB) + + if err != nil { + logger.Debug("chat start B req frame creation failed") + return nil, err + } + + receiverPeerCtx.rToClient <- chatStartBReqF + + hdlCtx.initiationsLock.Lock() + // + hdlCtx.initiationsLock.Unlock() + + return nil, nil +} + +func (hdlCtx *HandlerContext) handleChatStartC(reqFrame *common.RFrame) (res common.Response, err error) { + startChatAReq, err := common.RequestFromFrame[common.StartChatARequest](*reqFrame) + + if err != nil { + return nil, err + } + + receiverPeerCtx, err := hdlCtx.getCtxByNick(startChatAReq.Nickname) + + if err != nil { + logger.Debug("receiver peer not found") + return nil, nil + } + + // initation started + hdlCtx.initiationsLock.Lock() + hdlCtx.initiations = append(hdlCtx.initiations, NewInitiation(hdlCtx.peer.account.nickname, startChatAReq.Nickname)) + hdlCtx.initiationsLock.Unlock() + + chatStartB := common.StartChatBRequest{ + Nickname: hdlCtx.peer.account.nickname, + } + + chatStartBReqF, err := common.RequestFrameFrom(chatStartB) + + if err != nil { + logger.Debug("chat start B req frame creation failed") + return nil, err + } + + receiverPeerCtx.rToClient <- chatStartBReqF + + return nil, nil +} + +func (ctx *Context) printDebugInfo() { + ctx.peersListLock.RLock() + logger.Debug("================================ server state") logger.Debug("displaying all connections:") - for _, p := range srvCtx.peersList { + for _, p := range ctx.peersList { nick := "-" if p.hasAccount { @@ -304,17 +474,23 @@ func (srvCtx *Context) printConnectedPeers() { logger.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick) } - srvCtx.peersListLock.RUnlock() + logger.Debug("displaying all initiations:") + + for _, i := range ctx.initiations { + logger.Debugf("from %s to %s, stage: %d", i.abBNick, i.abBNick, i.stage) + } + + ctx.peersListLock.RUnlock() } -func (srvCtx *Context) addPeer(peer *Peer) { - srvCtx.idCounterLock.Lock() - srvCtx.idCounter++ - peer.id = srvCtx.idCounter - srvCtx.idCounterLock.Unlock() - srvCtx.peersListLock.Lock() - srvCtx.peersList = append(srvCtx.peersList, peer) - srvCtx.peersListLock.Unlock() +func (ctx *Context) addPeer(peer *Peer) { + ctx.idCounterLock.Lock() + ctx.idCounter++ + peer.id = ctx.idCounter + ctx.idCounterLock.Unlock() + ctx.peersListLock.Lock() + ctx.peersList = append(ctx.peersList, peer) + ctx.peersListLock.Unlock() } func testEcho(hdlCtx *HandlerContext) { @@ -333,7 +509,7 @@ func testEcho(hdlCtx *HandlerContext) { logger.Debug("test echo done", "byteSent", 123, "byteReceived", echoRes.EchoByte) } -func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { +func (ctx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{} conn, err := upgrader.Upgrade(w, r, nil) @@ -343,8 +519,11 @@ func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { } peer := NewPeer(conn) - srvCtx.addPeer(peer) - handlerCtx := NewHandlerContext(peer, srvCtx) + ctx.addPeer(peer) + handlerCtx := NewHandlerContext(peer, ctx) + ctx.handlerContextsLock.Lock() + ctx.handlerContexts = append(ctx.handlerContexts, handlerCtx) + ctx.handlerContextsLock.Unlock() defer handleDisconnection(handlerCtx) defer func(conn *websocket.Conn) { @@ -355,10 +534,30 @@ func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { }(conn) logger.Infof("%s connected", conn.RemoteAddr()) - errGroup := new(errgroup.Group) - errGroup.Go(handlerCtx.clientHandler) - errGroup.Go(handlerCtx.clientWriter) - errGroup.Go(handlerCtx.clientReader) + + errGroup, syncCtx := errgroup.WithContext(context.Background()) + + errGroup.Go(func() error { + return handlerCtx.clientHandler(syncCtx) + }) + + errGroup.Go(func() error { + return handlerCtx.clientWriter(syncCtx) + }) + + errGroup.Go(func() error { + return handlerCtx.clientReader(syncCtx) + }) + + errGroup.Go(func() error { + <-syncCtx.Done() + time.Sleep(time.Second * 3) + close(handlerCtx.rToClient) + close(handlerCtx.resFromClient) + close(handlerCtx.reqFromClient) + return conn.Close() + }) + testEcho(handlerCtx) err = errGroup.Wait() @@ -369,11 +568,11 @@ func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) { } func RunServer() { - srvCtx := &Context{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)} + srvCtx := NewContext() go func() { for { - srvCtx.printConnectedPeers() + srvCtx.printDebugInfo() time.Sleep(time.Second * 5) } }()