Enabled two-way communication between client and server

Server may now send requests to client and client will respond to them.
Mechanics similar to how server handled requests was added to client.
Also server can now receive responses from client :)
This commit is contained in:
Maciej Krzyżanowski 2024-03-29 15:05:42 +01:00
parent b02b35e7e0
commit f16b53278b
4 changed files with 303 additions and 178 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.idea/

View File

@ -1,6 +1,7 @@
package client package client
import ( import (
"math/rand"
"net/url" "net/url"
"os" "os"
"time" "time"
@ -10,6 +11,95 @@ import (
cm "krzyzanowski.dev/p2pchat/common" cm "krzyzanowski.dev/p2pchat/common"
) )
type Context struct {
conn *websocket.Conn
// Assumption: size of 1 is enough, because first response read will be response for the last request
// no need to buffer
resFromServer chan cm.RFrame
reqFromServer chan cm.RFrame
rToServer chan cm.RFrame
}
func NewClientContext(conn *websocket.Conn) *Context {
return &Context{
conn: conn,
resFromServer: make(chan cm.RFrame),
reqFromServer: make(chan cm.RFrame),
rToServer: make(chan cm.RFrame),
}
}
func (cliCtx *Context) serverHandler() error {
for {
reqFrame := <-cliCtx.reqFromServer
logger.Debug("got request from server", "id", reqFrame.ID)
if reqFrame.ID == cm.EchoReqID {
echoReq, err := cm.RequestFromFrame[cm.EchoRequest](reqFrame)
if err != nil {
return err
}
resFrame, err := cm.ResponseFrameFrom(cm.EchoResponse(echoReq))
if err != nil {
return err
}
cliCtx.rToServer <- resFrame
} else {
logger.Fatal("can't handle it!")
}
}
}
func (cliCtx *Context) serverWriter() error {
for {
logger.Debug("waiting for a frame to write")
frameToWrite := <-cliCtx.rToServer
err := cliCtx.conn.WriteJSON(frameToWrite)
if err != nil {
return err
}
logger.Debug("frame written", "id", frameToWrite.ID)
}
}
func (cliCtx *Context) serverReader() error {
for {
logger.Debug("waiting for a frame to read")
var rFrame cm.RFrame
err := cliCtx.conn.ReadJSON(&rFrame)
if err != nil {
return err
}
logger.Debug("frame read", "id", rFrame.ID)
if rFrame.ID > 128 {
cliCtx.resFromServer <- rFrame
} else {
cliCtx.reqFromServer <- rFrame
}
logger.Debug("frame pushed", "id", rFrame.ID)
}
}
func (cliCtx *Context) sendRequest(req cm.Request) error {
rf, err := cm.RequestFrameFrom(req)
if err != nil {
return err
}
cliCtx.rToServer <- rf
return nil
}
func (cliCtx *Context) getResponseFrame() cm.RFrame {
return <-cliCtx.resFromServer
}
var logger = log.NewWithOptions(os.Stdout, log.Options{ var logger = log.NewWithOptions(os.Stdout, log.Options{
ReportTimestamp: true, ReportTimestamp: true,
TimeFormat: time.TimeOnly, TimeFormat: time.TimeOnly,
@ -24,6 +114,43 @@ func init() {
} }
} }
func testAuth(ctx *Context) {
logger.Info("Trying to authenticate as krzmaciek...")
ctx.sendRequest(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"})
logger.Debug("Request sent, waiting for response...")
arf := ctx.getResponseFrame()
ar, err := cm.ResponseFromFrame[cm.AuthResponse](arf)
if err != nil {
logger.Error(err)
}
logger.Infof("Authenticated?: %t", ar.IsSuccess)
}
func testEcho(ctx *Context) {
echoByte := rand.Intn(32)
logger.Info("Testing echo...", "echoByte", echoByte)
ctx.sendRequest(cm.EchoRequest{EchoByte: byte(echoByte)})
logger.Debug("Request sent, waiting for response...")
ereqf := ctx.getResponseFrame()
ereq, err := cm.ResponseFromFrame[cm.EchoResponse](ereqf)
if err != nil {
logger.Error(err)
}
logger.Info("Got response", "echoByte", ereq.EchoByte)
}
func testListPeers(ctx *Context) {
logger.Info("Trying to get list of peers...")
ctx.sendRequest(cm.ListPeersRequest{})
logger.Debug("Request sent, waiting for response...")
lpreqf := ctx.getResponseFrame()
lpreq, err := cm.ResponseFromFrame[cm.ListPeersResponse](lpreqf)
if err != nil {
logger.Error(err)
}
logger.Info("Got that list", "peersList", lpreq.PeersInfo)
}
func RunClient() { func RunClient() {
u := url.URL{Scheme: "ws", Host: ":8080", Path: "/wsapi"} u := url.URL{Scheme: "ws", Host: ":8080", Path: "/wsapi"}
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
@ -35,80 +162,14 @@ func RunClient() {
defer c.Close() defer c.Close()
logger.Info("authenticating...") ctx := NewClientContext(c)
rf, _ := cm.RequestFrameFrom(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"}) go ctx.serverHandler()
err = c.WriteJSON(rf) go ctx.serverReader()
if err != nil { go ctx.serverWriter()
logger.Fatal(err)
}
var authResFrame cm.ResponseFrame testAuth(ctx)
err = c.ReadJSON(&authResFrame) testEcho(ctx)
if err != nil { testListPeers(ctx)
logger.Fatal(err)
}
authRes, err := cm.ResponseFromFrame[cm.AuthResponse](authResFrame)
if err != nil {
logger.Fatal(err)
}
logger.Infof("authentication result: %t", authRes.IsSuccess)
time.Sleep(time.Second * 1)
logger.Info("sending echo...")
echoByte := 123
rf, err = cm.RequestFrameFrom(cm.EchoRequest{EchoByte: byte(echoByte)})
if err != nil {
logger.Fatal(err)
}
err = c.WriteJSON(rf)
if err != nil {
logger.Fatal(err)
}
var echoResFrame cm.ResponseFrame
err = c.ReadJSON(&echoResFrame)
if err != nil {
logger.Fatal(err)
}
echoRes, err := cm.ResponseFromFrame[cm.EchoResponse](echoResFrame)
if err != nil {
logger.Fatal(err)
}
logger.Infof("sent echo of %d, got %d in return", echoByte, echoRes.EchoByte)
time.Sleep(time.Second)
logger.Infof("i want list of peers...")
rf, err = cm.RequestFrameFrom(cm.ListPeersRequest{})
if err != nil {
logger.Fatal(err)
}
err = c.WriteJSON(rf)
if err != nil {
logger.Fatal(err)
}
var listPeersResFrame cm.ResponseFrame
err = c.ReadJSON(&listPeersResFrame)
if err != nil {
logger.Fatal(err)
}
listPeersRes, err := cm.ResponseFromFrame[cm.ListPeersResponse](listPeersResFrame)
if err != nil {
logger.Fatal(err)
}
logger.Info("printing list of peers:")
for _, p := range listPeersRes.PeersInfo {
logger.Infof("%+v", p)
}
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
logger.Info("closing connection...") logger.Info("closing connection...")

View File

@ -7,9 +7,12 @@ import (
// Constants // Constants
const ( const (
EchoRID = 1 EchoReqID = 1
ListPeersRID = 2 EchoResID = 128 + EchoReqID
AuthRID = 3 ListPeersReqID = 2
ListPeersResID = 128 + ListPeersReqID
AuthReqID = 3
AuthResID = 128 + AuthReqID
) )
// Requests & responses subtypes // Requests & responses subtypes
@ -23,22 +26,22 @@ type PeerInfo struct {
// Requests & responses: // Requests & responses:
type RequestFrame struct { type RFrame struct {
ID int `json:"id"` ID int `json:"id"`
Rest json.RawMessage `json:"request"` Rest json.RawMessage `json:"r"`
} }
func RequestFrameFrom(req Request) (RequestFrame, error) { func RequestFrameFrom(req Request) (RFrame, error) {
jsonBytes, err := json.Marshal(req) jsonBytes, err := json.Marshal(req)
if err != nil { if err != nil {
return *new(RequestFrame), err return *new(RFrame), err
} }
return RequestFrame{req.RID(), jsonBytes}, nil return RFrame{req.ID(), jsonBytes}, nil
} }
func RequestFromFrame[T Request](reqFrame RequestFrame) (T, error) { func RequestFromFrame[T Request](reqFrame RFrame) (T, error) {
var req T var req T
err := json.Unmarshal(reqFrame.Rest, &req) err := json.Unmarshal(reqFrame.Rest, &req)
@ -49,22 +52,17 @@ func RequestFromFrame[T Request](reqFrame RequestFrame) (T, error) {
return req, nil return req, nil
} }
type ResponseFrame struct { func ResponseFrameFrom(res Response) (RFrame, error) {
ID int `json:"id"`
Rest json.RawMessage `json:"response"`
}
func ResponseFrameFrom(res Response) (ResponseFrame, error) {
jsonBytes, err := json.Marshal(res) jsonBytes, err := json.Marshal(res)
if err != nil { if err != nil {
return *new(ResponseFrame), err return *new(RFrame), err
} }
return ResponseFrame{res.RID(), jsonBytes}, nil return RFrame{res.ID(), jsonBytes}, nil
} }
func ResponseFromFrame[T Response](resFrame ResponseFrame) (T, error) { func ResponseFromFrame[T Response](resFrame RFrame) (T, error) {
var res T var res T
err := json.Unmarshal(resFrame.Rest, &res) err := json.Unmarshal(resFrame.Rest, &res)
@ -76,7 +74,7 @@ func ResponseFromFrame[T Response](resFrame ResponseFrame) (T, error) {
} }
type Request interface { type Request interface {
RID() int ID() int
} }
type Response Request type Response Request
@ -85,31 +83,31 @@ type EchoRequest struct {
EchoByte byte `json:"echoByte"` EchoByte byte `json:"echoByte"`
} }
func (EchoRequest) RID() int { func (EchoRequest) ID() int {
return EchoRID return EchoReqID
} }
type EchoResponse struct { type EchoResponse struct {
EchoByte byte `json:"echoByte"` EchoByte byte `json:"echoByte"`
} }
func (EchoResponse) RID() int { func (EchoResponse) ID() int {
return EchoRID return EchoResID
} }
type ListPeersRequest struct { type ListPeersRequest struct {
} }
func (ListPeersRequest) RID() int { func (ListPeersRequest) ID() int {
return ListPeersRID return ListPeersReqID
} }
type ListPeersResponse struct { type ListPeersResponse struct {
PeersInfo []PeerInfo `json:"peers"` PeersInfo []PeerInfo `json:"peers"`
} }
func (ListPeersResponse) RID() int { func (ListPeersResponse) ID() int {
return ListPeersRID return ListPeersResID
} }
type AuthRequest struct { type AuthRequest struct {
@ -117,14 +115,14 @@ type AuthRequest struct {
Password string `json:"password"` Password string `json:"password"`
} }
func (AuthRequest) RID() int { func (AuthRequest) ID() int {
return AuthRID return AuthReqID
} }
type AuthResponse struct { type AuthResponse struct {
IsSuccess bool IsSuccess bool
} }
func (AuthResponse) RID() int { func (AuthResponse) ID() int {
return AuthRID return AuthResID
} }

View File

@ -19,7 +19,7 @@ type Account struct {
passHash []byte passHash []byte
} }
type ServerContext struct { type Context struct {
idCounter int idCounter int
idCounterLock sync.RWMutex idCounterLock sync.RWMutex
peersList []*Peer peersList []*Peer
@ -30,7 +30,117 @@ type ServerContext struct {
type HandlerContext struct { type HandlerContext struct {
peer *Peer peer *Peer
*ServerContext *Context
resFromClient chan common.RFrame
reqFromClient chan common.RFrame
rToClient chan common.RFrame
}
func NewHandlerContext(peer *Peer, srvCtx *Context) *HandlerContext {
return &HandlerContext{
peer,
srvCtx,
make(chan common.RFrame),
make(chan common.RFrame),
make(chan common.RFrame),
}
}
func (hdlCtx *HandlerContext) clientHandler(hdlWg *sync.WaitGroup) error {
defer hdlWg.Done()
for {
reqFrame := <-hdlCtx.reqFromClient
var res common.Response
var err error
if reqFrame.ID == common.AuthReqID {
res, err = hdlCtx.handleAuth(&reqFrame)
} else if reqFrame.ID == common.ListPeersReqID {
res, err = hdlCtx.handleListPeers(&reqFrame)
} else if reqFrame.ID == common.EchoReqID {
res, err = hdlCtx.handleEcho(&reqFrame)
}
if err != nil {
logger.Errorf("could not handle request ID=%d", reqFrame.ID)
return err
}
resFrame, err := common.ResponseFrameFrom(res)
if err != nil {
logger.Errorf("could not create frame from response")
return err
}
hdlCtx.rToClient <- resFrame
}
}
func (hdlCtx *HandlerContext) clientWriter(hdlWg *sync.WaitGroup) error {
defer hdlWg.Done()
for {
rFrame := <-hdlCtx.rToClient
resJsonBytes, err := json.Marshal(rFrame)
if err != nil {
logger.Errorf("error marshalling frame to json")
return err
}
logger.Debugf("sending %s", string(resJsonBytes))
err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes)
if err != nil {
logger.Errorf("error writing rframe")
return err
}
}
}
func (hdlCtx *HandlerContext) clientReader(hdlWg *sync.WaitGroup) error {
defer hdlWg.Done()
for {
messType, messBytes, err := hdlCtx.peer.conn.ReadMessage()
if err != nil {
return err
}
if messType != 1 {
err := hdlCtx.peer.conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported"))
if err != nil {
logger.Debugf("[Server] error sending unsupported data close message")
return err
}
}
logger.Debugf("got message text: %s", strings.Trim(string(messBytes), "\n"))
var rFrame common.RFrame
json.Unmarshal(messBytes, &rFrame)
logger.Debugf("unmarshalled request frame (ID=%d)", rFrame.ID)
if rFrame.ID > 128 {
logger.Debug("it is response frame", "id", rFrame.ID)
hdlCtx.resFromClient <- rFrame
} else {
logger.Debug("it is request frame", "id", rFrame.ID)
hdlCtx.reqFromClient <- rFrame
}
}
}
func (hdlCtx *HandlerContext) sendRequest(req common.Request) error {
rf, err := common.RequestFrameFrom(req)
if err != nil {
return err
}
hdlCtx.rToClient <- rf
return nil
}
func (hdlCtx *HandlerContext) getResponseFrame() common.RFrame {
return <-hdlCtx.resFromClient
} }
type Peer struct { type Peer struct {
@ -74,7 +184,7 @@ func peerSliceRemove(s *[]*Peer, i int) {
*s = (*s)[:len(*s)-1] *s = (*s)[:len(*s)-1]
} }
func (srvCtx *ServerContext) removePeer(peer *Peer) { func (srvCtx *Context) removePeer(peer *Peer) {
srvCtx.peersListLock.Lock() srvCtx.peersListLock.Lock()
peerSliceRemove(&srvCtx.peersList, peerSliceIndexOf(srvCtx.peersList, peer.id)) peerSliceRemove(&srvCtx.peersList, peerSliceIndexOf(srvCtx.peersList, peer.id))
srvCtx.peersListLock.Unlock() srvCtx.peersListLock.Unlock()
@ -85,7 +195,7 @@ func handleDisconnection(handlerCtx *HandlerContext) {
logger.Infof("%s disconnected", handlerCtx.peer.conn.RemoteAddr()) logger.Infof("%s disconnected", handlerCtx.peer.conn.RemoteAddr())
} }
func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RequestFrame) (res common.Response, err error) { func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RFrame) (res common.Response, err error) {
echoReq, err := common.RequestFromFrame[common.EchoRequest](*reqFrame) echoReq, err := common.RequestFromFrame[common.EchoRequest](*reqFrame)
if err != nil { if err != nil {
logger.Error("could not read request from frame") logger.Error("could not read request from frame")
@ -96,7 +206,7 @@ func (hdlCtx *HandlerContext) handleEcho(reqFrame *common.RequestFrame) (res com
return echoRes, nil return echoRes, nil
} }
func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RequestFrame) (res common.Response, err error) { func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res common.Response, err error) {
// Currently list peers request is empty, so we can ignore it - we won't use it // Currently list peers request is empty, so we can ignore it - we won't use it
_, err = common.RequestFromFrame[common.ListPeersRequest](*reqFrame) _, err = common.RequestFromFrame[common.ListPeersRequest](*reqFrame)
if err != nil { if err != nil {
@ -125,7 +235,7 @@ func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RequestFrame) (re
return listPeersRes, nil return listPeersRes, nil
} }
func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RequestFrame) (res common.Response, err error) { func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RFrame) (res common.Response, err error) {
authReq, err := common.RequestFromFrame[common.AuthRequest](*reqFrame) authReq, err := common.RequestFromFrame[common.AuthRequest](*reqFrame)
if err != nil { if err != nil {
logger.Error("could not read request from frame") logger.Error("could not read request from frame")
@ -170,7 +280,7 @@ func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RequestFrame) (res com
return authRes, nil return authRes, nil
} }
func (srvCtx *ServerContext) printConnectedPeers() { func (srvCtx *Context) printConnectedPeers() {
srvCtx.peersListLock.RLock() srvCtx.peersListLock.RLock()
logger.Debug("displaying all connections:") logger.Debug("displaying all connections:")
@ -181,56 +291,13 @@ func (srvCtx *ServerContext) printConnectedPeers() {
nick = p.account.nickname nick = p.account.nickname
} }
log.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick) logger.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick)
} }
srvCtx.peersListLock.RUnlock() srvCtx.peersListLock.RUnlock()
} }
func (hdlCtx *HandlerContext) handleRequest(reqJsonBytes []byte) error { func (srvCtx *Context) addPeer(peer *Peer) {
logger.Debugf("got message text: %s", strings.Trim(string(reqJsonBytes), "\n"))
var reqFrame common.RequestFrame
json.Unmarshal(reqJsonBytes, &reqFrame)
log.Debugf("unmarshalled request frame (ID=%d)", reqFrame.ID)
var res common.Response
var err error
if reqFrame.ID == common.AuthRID {
res, err = hdlCtx.handleAuth(&reqFrame)
} else if reqFrame.ID == common.ListPeersRID {
res, err = hdlCtx.handleListPeers(&reqFrame)
} else if reqFrame.ID == common.EchoRID {
res, err = hdlCtx.handleEcho(&reqFrame)
}
if err != nil {
logger.Errorf("could not handle request ID=%d", reqFrame.ID)
return err
}
resFrame, err := common.ResponseFrameFrom(res)
if err != nil {
logger.Errorf("could not create frame from response")
return err
}
resJsonBytes, err := json.Marshal(resFrame)
if err != nil {
logger.Errorf("error marshalling frame to json")
return err
}
logger.Debugf("sending %s", string(resJsonBytes))
err = hdlCtx.peer.conn.WriteMessage(websocket.TextMessage, resJsonBytes)
if err != nil {
logger.Errorf("error writing response frame")
return err
}
return nil
}
func (srvCtx *ServerContext) addPeer(peer *Peer) {
srvCtx.idCounterLock.Lock() srvCtx.idCounterLock.Lock()
srvCtx.idCounter++ srvCtx.idCounter++
peer.id = srvCtx.idCounter peer.id = srvCtx.idCounter
@ -240,7 +307,7 @@ func (srvCtx *ServerContext) addPeer(peer *Peer) {
srvCtx.peersListLock.Unlock() srvCtx.peersListLock.Unlock()
} }
func (srvCtx *ServerContext) wsapiHandler(w http.ResponseWriter, r *http.Request) { func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{} upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
@ -250,36 +317,34 @@ func (srvCtx *ServerContext) wsapiHandler(w http.ResponseWriter, r *http.Request
peer := NewPeer(conn) peer := NewPeer(conn)
srvCtx.addPeer(peer) srvCtx.addPeer(peer)
handlerCtx := &HandlerContext{peer, srvCtx} handlerCtx := NewHandlerContext(peer, srvCtx)
defer handleDisconnection(handlerCtx) defer handleDisconnection(handlerCtx)
defer conn.Close() defer conn.Close()
logger.Infof("%s connected", conn.RemoteAddr()) logger.Infof("%s connected", conn.RemoteAddr())
for { var handlerWg sync.WaitGroup
messType, messBytes, err := conn.ReadMessage() handlerWg.Add(3)
if err != nil { go handlerCtx.clientWriter(&handlerWg)
break go handlerCtx.clientHandler(&handlerWg)
} go handlerCtx.clientReader(&handlerWg)
if messType != 1 { logger.Debug("sending echo request...")
err := conn.WriteMessage(websocket.CloseUnsupportedData, []byte("Only JSON text is supported")) handlerCtx.sendRequest(common.EchoRequest{EchoByte: 123})
logger.Debug("sent")
echoResF := handlerCtx.getResponseFrame()
logger.Debug("got response")
echoRes, err := common.ResponseFromFrame[common.EchoResponse](echoResF)
if err != nil { if err != nil {
logger.Debugf("[Server] error sending close message due to unsupported data") logger.Error(err)
}
return return
} }
logger.Debug("test echo done", "byteSent", 123, "byteReceived", echoRes.EchoByte)
err = handlerCtx.handleRequest(messBytes) handlerWg.Wait()
if err != nil {
logger.Debug(err)
break
}
}
} }
func RunServer() { func RunServer() {
srvCtx := &ServerContext{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)} srvCtx := &Context{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)}
go func() { go func() {
for { for {