Working on initating connections

This commit is contained in:
Maciej Krzyżanowski 2024-04-29 02:02:51 +02:00
parent b9fe4a7094
commit 4a38b6e28a
5 changed files with 480 additions and 185 deletions

View File

@ -1,15 +1,19 @@
package client
import (
"bufio"
"context"
"errors"
"golang.org/x/sync/errgroup"
"math/rand"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/charmbracelet/log"
"github.com/gorilla/websocket"
cm "krzyzanowski.dev/p2pchat/common"
cm "krzyzanowski.dev/archat/common"
)
type Context struct {
@ -30,10 +34,14 @@ func NewClientContext(conn *websocket.Conn) *Context {
}
}
func (cliCtx *Context) serverHandler() error {
for {
reqFrame := <-cliCtx.reqFromServer
func (cliCtx *Context) serverHandler(syncCtx context.Context) error {
defer logger.Debug("server handler last line...")
for {
select {
case <-syncCtx.Done():
return nil
case reqFrame := <-cliCtx.reqFromServer:
logger.Debug("got request from server", "id", reqFrame.ID)
if reqFrame.ID == cm.EchoReqID {
@ -49,24 +57,33 @@ func (cliCtx *Context) serverHandler() error {
cliCtx.rToServer <- resFrame
} else {
logger.Fatal("can't handle it!")
logger.Warn("can't handle it!")
}
}
}
}
func (cliCtx *Context) serverWriter() error {
func (cliCtx *Context) serverWriter(syncCtx context.Context) error {
defer logger.Debug("server writer last line...")
for {
logger.Debug("waiting for a frame to write")
frameToWrite := <-cliCtx.rToServer
select {
case <-syncCtx.Done():
return nil
case frameToWrite := <-cliCtx.rToServer:
err := cliCtx.conn.WriteJSON(frameToWrite)
if err != nil {
return err
}
logger.Debug("frame written", "id", frameToWrite.ID)
}
}
}
func (cliCtx *Context) serverReader() error {
func (cliCtx *Context) serverReader(syncCtx context.Context) error {
defer logger.Debug("server reader last line...")
for {
logger.Debug("waiting for a frame to read")
var rFrame cm.RFrame
@ -115,9 +132,9 @@ func init() {
}
}
func testAuth(ctx *Context) {
func sendAuth(ctx *Context, nick, pass string) {
logger.Info("Trying to authenticate as krzmaciek...")
err := ctx.sendRequest(cm.AuthRequest{Nickname: "krzmaciek", Password: "9maciek1"})
err := ctx.sendRequest(cm.AuthRequest{Nickname: nick, Password: pass})
if err != nil {
logger.Error(err)
@ -136,10 +153,9 @@ func testAuth(ctx *Context) {
logger.Infof("Authenticated?: %t", ar.IsSuccess)
}
func testEcho(ctx *Context) {
echoByte := rand.Intn(32)
func sendEcho(ctx *Context, echoByte byte) {
logger.Info("Testing echo...", "echoByte", echoByte)
err := ctx.sendRequest(cm.EchoRequest{EchoByte: byte(echoByte)})
err := ctx.sendRequest(cm.EchoRequest{EchoByte: echoByte})
if err != nil {
logger.Error(err)
@ -158,7 +174,7 @@ func testEcho(ctx *Context) {
logger.Info("Got response", "echoByte", ereq.EchoByte)
}
func testListPeers(ctx *Context) {
func sendListPeers(ctx *Context) {
logger.Info("Trying to get list of peers...")
err := ctx.sendRequest(cm.ListPeersRequest{})
@ -179,6 +195,18 @@ func testListPeers(ctx *Context) {
logger.Info("Got that list", "peersList", lpreq.PeersInfo)
}
func sendStartChatA(ctx *Context, nick string) {
logger.Info("Doing chat start A...")
err := ctx.sendRequest(cm.StartChatARequest{Nickname: nick})
if err != nil {
logger.Error(err)
return
}
logger.Debug("Request sent, no wait for response")
}
func RunClient() {
u := url.URL{Scheme: "ws", Host: ":8080", Path: "/wsapi"}
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
@ -188,32 +216,94 @@ func RunClient() {
return
}
defer func(c *websocket.Conn) {
err := c.Close()
if err != nil {
logger.Error(err)
cliCtx := NewClientContext(c)
errGroup, syncCtx := errgroup.WithContext(context.Background())
errGroup.Go(func() error {
return cliCtx.serverHandler(syncCtx)
})
errGroup.Go(func() error {
return cliCtx.serverReader(syncCtx)
})
errGroup.Go(func() error {
return cliCtx.serverWriter(syncCtx)
})
errGroup.Go(func() error {
<-syncCtx.Done()
logger.Info("closing client...")
time.Sleep(time.Second * 3)
close(cliCtx.rToServer)
close(cliCtx.resFromServer)
close(cliCtx.reqFromServer)
_ = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
return c.Close()
})
closer := make(chan int)
errGroup.Go(func() error {
select {
case <-closer:
return errors.New("close")
case <-syncCtx.Done():
return nil
}
}(c)
})
ctx := NewClientContext(c)
errGroup := new(errgroup.Group)
errGroup.Go(ctx.serverHandler)
errGroup.Go(ctx.serverReader)
errGroup.Go(ctx.serverWriter)
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
cmd := strings.TrimRight(scanner.Text(), " \n\t")
cmdElements := strings.Split(cmd, " ")
cmdName := cmdElements[0]
cmdArgs := cmdElements[1:]
if cmdName == "exit" {
logger.Info("closing...")
closer <- 1
} else if cmdName == "echo" {
if len(cmdArgs) != 1 {
logger.Errorf("echo command requires 1 argument, but %d was provided", len(cmdArgs))
continue
}
num, err := strconv.Atoi(cmdArgs[0])
if err != nil {
logger.Errorf("%s is not a number", cmdArgs[0])
continue
}
sendEcho(cliCtx, byte(num))
} else if cmdName == "list" {
sendListPeers(cliCtx)
} else if cmdName == "auth" {
if len(cmdArgs) != 2 {
logger.Errorf("auth command requires 2 argument, but %d was provided", len(cmdArgs))
continue
}
nick := cmdArgs[0]
pass := cmdArgs[1]
sendAuth(cliCtx, nick, pass)
} else if cmdName == "startchata" {
if len(cmdArgs) != 1 {
logger.Errorf("startchata command requires 1 argument, but %d was provided", len(cmdArgs))
continue
}
sendStartChatA(cliCtx, cmdArgs[0])
}
}
}()
testAuth(ctx)
testEcho(ctx)
testListPeers(ctx)
err = errGroup.Wait()
if err != nil {
logger.Error(err)
}
logger.Info("closing connection...")
err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
logger.Error(err)
}
}

View File

@ -43,6 +43,10 @@ func (rf RFrame) IsResponse() bool {
return rf.ID > 128
}
func (rf RFrame) IsError() bool {
return rf.ID > 256
}
func RequestFrameFrom(req Request) (RFrame, error) {
jsonBytes, err := json.Marshal(req)
@ -139,8 +143,10 @@ func (AuthResponse) ID() int {
return AuthResID
}
// "Stateful" requests like these need to have some information identifying what operation they are linked to
// There may be some errors if two requests are sent by one host to the same other host... may they?
// "Stateful" requests like these need to have some information identifying
// what operation they are linked to
// There may be some errors if two requests are sent by one host to the same
// other host... may they?
type StartChatARequest struct {
Nickname string `json:"nickname"`

2
go.mod
View File

@ -1,4 +1,4 @@
module krzyzanowski.dev/p2pchat
module krzyzanowski.dev/archat
go 1.21.7

View File

@ -4,8 +4,8 @@ import (
"log"
"os"
"krzyzanowski.dev/p2pchat/client"
"krzyzanowski.dev/p2pchat/server"
"krzyzanowski.dev/archat/client"
"krzyzanowski.dev/archat/server"
)
func main() {

View File

@ -1,25 +1,57 @@
package server
import (
"context"
"encoding/json"
"golang.org/x/sync/errgroup"
"errors"
"net/http"
"os"
"slices"
"strings"
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/charmbracelet/log"
"github.com/gorilla/websocket"
"golang.org/x/crypto/bcrypt"
"krzyzanowski.dev/p2pchat/common"
"krzyzanowski.dev/archat/common"
)
type Peer struct {
id int
conn *websocket.Conn
hasAccount bool
account *Account
}
func NewPeer(conn *websocket.Conn) *Peer {
return &Peer{-1, conn, false, nil}
}
type Account struct {
nickname string
passHash []byte
}
const (
InitiationStageA = 1
InitiationStageB = 2
InitiationStageC = 3
InitiationStageD = 4
)
type Initiation struct {
abANick string
abBNick string
stage int
}
func NewInitiation(abA string, abB string) *Initiation {
return &Initiation{abA, abB, InitiationStageA}
}
type Context struct {
idCounter int
idCounterLock sync.RWMutex
@ -27,6 +59,43 @@ type Context struct {
peersListLock sync.RWMutex
accounts map[string]*Account
accountsLock sync.RWMutex
initiations []*Initiation
initiationsLock sync.RWMutex
handlerContexts []*HandlerContext
handlerContextsLock sync.RWMutex
}
func NewContext() *Context {
return &Context{
peersList: make([]*Peer, 0),
accounts: make(map[string]*Account),
initiations: make([]*Initiation, 0),
}
}
// Remember to lock before calling
func (ctx *Context) getPeerByNick(nick string) (*Peer, error) {
for _, peer := range ctx.peersList {
if peer.hasAccount && peer.account.nickname == nick {
return peer, nil
}
}
return nil, errors.New("peer not found")
}
// Remember to lock before calling
func (ctx *Context) getCtxByNick(nick string) (*HandlerContext, error) {
idx := slices.IndexFunc[[]*HandlerContext, *HandlerContext](
ctx.handlerContexts,
func(handlerContext *HandlerContext) bool {
return handlerContext.peer.hasAccount && handlerContext.peer.account.nickname == nick
})
if idx != -1 {
return ctx.handlerContexts[idx], nil
}
return nil, errors.New("not found")
}
type HandlerContext struct {
@ -47,9 +116,13 @@ func NewHandlerContext(peer *Peer, srvCtx *Context) *HandlerContext {
}
}
func (hdlCtx *HandlerContext) clientHandler() error {
func (hdlCtx *HandlerContext) clientHandler(syncCtx context.Context) error {
handleNext:
for {
reqFrame := <-hdlCtx.reqFromClient
select {
case <-syncCtx.Done():
return nil
case reqFrame := <-hdlCtx.reqFromClient:
var res common.Response
var err error
@ -59,6 +132,8 @@ func (hdlCtx *HandlerContext) clientHandler() error {
res, err = hdlCtx.handleListPeers(&reqFrame)
} else if reqFrame.ID == common.EchoReqID {
res, err = hdlCtx.handleEcho(&reqFrame)
} else if reqFrame.ID == common.StartChatAReqID {
res, err = hdlCtx.handleChatStartA(&reqFrame)
}
if err != nil {
@ -66,6 +141,11 @@ func (hdlCtx *HandlerContext) clientHandler() error {
return err
}
if res == nil {
logger.Debugf("request without response ID=%d", reqFrame.ID)
continue handleNext
}
resFrame, err := common.ResponseFrameFrom(res)
if err != nil {
@ -75,11 +155,15 @@ func (hdlCtx *HandlerContext) clientHandler() error {
hdlCtx.rToClient <- resFrame
}
}
}
func (hdlCtx *HandlerContext) clientWriter() error {
func (hdlCtx *HandlerContext) clientWriter(syncCtx context.Context) error {
for {
rFrame := <-hdlCtx.rToClient
select {
case <-syncCtx.Done():
return nil
case rFrame := <-hdlCtx.rToClient:
resJsonBytes, err := json.Marshal(rFrame)
if err != nil {
@ -95,10 +179,15 @@ func (hdlCtx *HandlerContext) clientWriter() error {
return err
}
}
}
}
func (hdlCtx *HandlerContext) clientReader() error {
func (hdlCtx *HandlerContext) clientReader(syncCtx context.Context) error {
for {
select {
case <-syncCtx.Done():
return nil
default:
messType, messBytes, err := hdlCtx.peer.conn.ReadMessage()
if err != nil {
@ -131,6 +220,7 @@ func (hdlCtx *HandlerContext) clientReader() error {
hdlCtx.reqFromClient <- rFrame
}
}
}
}
func (hdlCtx *HandlerContext) sendRequest(req common.Request) error {
@ -148,13 +238,6 @@ func (hdlCtx *HandlerContext) getResponseFrame() common.RFrame {
return <-hdlCtx.resFromClient
}
type Peer struct {
id int
conn *websocket.Conn
hasAccount bool
account *Account
}
var logger = log.NewWithOptions(os.Stdout, log.Options{
ReportTimestamp: true,
TimeFormat: time.TimeOnly,
@ -169,32 +252,36 @@ func init() {
}
}
func NewPeer(conn *websocket.Conn) *Peer {
return &Peer{-1, conn, false, nil}
}
type Matcher[T any] func(*T) bool
func peerSliceIndexOf(s []*Peer, id int) int {
i := 0
var p *Peer
func (ctx *Context) removePeer(peer *Peer) {
ctx.handlerContextsLock.Lock()
ctx.peersListLock.Lock()
ctx.initiationsLock.Lock()
for i, p = range s {
if p.id == id {
break
}
}
ctx.handlerContexts = slices.DeleteFunc[[]*HandlerContext, *HandlerContext](
ctx.handlerContexts,
func(h *HandlerContext) bool {
return h.peer.id == peer.id
})
return i
}
ctx.peersList = slices.DeleteFunc[[]*Peer, *Peer](
ctx.peersList,
func(p *Peer) bool {
return p.id == peer.id
})
func peerSliceRemove(s *[]*Peer, i int) {
(*s)[i] = (*s)[len(*s)-1]
*s = (*s)[:len(*s)-1]
}
ctx.initiations = slices.DeleteFunc[[]*Initiation, *Initiation](
ctx.initiations,
func(i *Initiation) bool {
return peer.hasAccount && (peer.account.nickname == i.abANick || peer.account.nickname == i.abBNick)
})
func (srvCtx *Context) removePeer(peer *Peer) {
srvCtx.peersListLock.Lock()
peerSliceRemove(&srvCtx.peersList, peerSliceIndexOf(srvCtx.peersList, peer.id))
srvCtx.peersListLock.Unlock()
// TODO: Inform the other side about peer leaving
ctx.handlerContextsLock.Unlock()
ctx.peersListLock.Unlock()
ctx.initiationsLock.Unlock()
}
func handleDisconnection(handlerCtx *HandlerContext) {
@ -230,13 +317,21 @@ func (hdlCtx *HandlerContext) handleListPeers(reqFrame *common.RFrame) (res comm
listPeersRes := common.ListPeersResponse{PeersInfo: make([]common.PeerInfo, 0)}
for _, peer := range peersFreeze {
var nickname string
if peer.hasAccount {
nickname = peer.account.nickname
} else {
nickname = ""
}
listPeersRes.PeersInfo = append(
listPeersRes.PeersInfo,
common.PeerInfo{
ID: peer.id,
Addr: peer.conn.RemoteAddr().String(),
HasNickname: peer.hasAccount,
Nickname: peer.account.nickname,
Nickname: nickname,
},
)
}
@ -290,11 +385,86 @@ func (hdlCtx *HandlerContext) handleAuth(reqFrame *common.RFrame) (res common.Re
return authRes, nil
}
func (srvCtx *Context) printConnectedPeers() {
srvCtx.peersListLock.RLock()
func (hdlCtx *HandlerContext) handleChatStartA(reqFrame *common.RFrame) (res common.Response, err error) {
startChatAReq, err := common.RequestFromFrame[common.StartChatARequest](*reqFrame)
if err != nil {
return nil, err
}
receiverPeerCtx, err := hdlCtx.getCtxByNick(startChatAReq.Nickname)
if err != nil {
logger.Debug("receiver peer not found")
return nil, nil
}
// initation started
hdlCtx.initiationsLock.Lock()
hdlCtx.initiations = append(hdlCtx.initiations, NewInitiation(hdlCtx.peer.account.nickname, startChatAReq.Nickname))
hdlCtx.initiationsLock.Unlock()
chatStartB := common.StartChatBRequest{
Nickname: hdlCtx.peer.account.nickname,
}
chatStartBReqF, err := common.RequestFrameFrom(chatStartB)
if err != nil {
logger.Debug("chat start B req frame creation failed")
return nil, err
}
receiverPeerCtx.rToClient <- chatStartBReqF
hdlCtx.initiationsLock.Lock()
//
hdlCtx.initiationsLock.Unlock()
return nil, nil
}
func (hdlCtx *HandlerContext) handleChatStartC(reqFrame *common.RFrame) (res common.Response, err error) {
startChatAReq, err := common.RequestFromFrame[common.StartChatARequest](*reqFrame)
if err != nil {
return nil, err
}
receiverPeerCtx, err := hdlCtx.getCtxByNick(startChatAReq.Nickname)
if err != nil {
logger.Debug("receiver peer not found")
return nil, nil
}
// initation started
hdlCtx.initiationsLock.Lock()
hdlCtx.initiations = append(hdlCtx.initiations, NewInitiation(hdlCtx.peer.account.nickname, startChatAReq.Nickname))
hdlCtx.initiationsLock.Unlock()
chatStartB := common.StartChatBRequest{
Nickname: hdlCtx.peer.account.nickname,
}
chatStartBReqF, err := common.RequestFrameFrom(chatStartB)
if err != nil {
logger.Debug("chat start B req frame creation failed")
return nil, err
}
receiverPeerCtx.rToClient <- chatStartBReqF
return nil, nil
}
func (ctx *Context) printDebugInfo() {
ctx.peersListLock.RLock()
logger.Debug("================================ server state")
logger.Debug("displaying all connections:")
for _, p := range srvCtx.peersList {
for _, p := range ctx.peersList {
nick := "-"
if p.hasAccount {
@ -304,17 +474,23 @@ func (srvCtx *Context) printConnectedPeers() {
logger.Debugf("ID#%d, Addr:%s, Auth:%t, Nick:%s", p.id, p.conn.RemoteAddr(), p.hasAccount, nick)
}
srvCtx.peersListLock.RUnlock()
logger.Debug("displaying all initiations:")
for _, i := range ctx.initiations {
logger.Debugf("from %s to %s, stage: %d", i.abBNick, i.abBNick, i.stage)
}
ctx.peersListLock.RUnlock()
}
func (srvCtx *Context) addPeer(peer *Peer) {
srvCtx.idCounterLock.Lock()
srvCtx.idCounter++
peer.id = srvCtx.idCounter
srvCtx.idCounterLock.Unlock()
srvCtx.peersListLock.Lock()
srvCtx.peersList = append(srvCtx.peersList, peer)
srvCtx.peersListLock.Unlock()
func (ctx *Context) addPeer(peer *Peer) {
ctx.idCounterLock.Lock()
ctx.idCounter++
peer.id = ctx.idCounter
ctx.idCounterLock.Unlock()
ctx.peersListLock.Lock()
ctx.peersList = append(ctx.peersList, peer)
ctx.peersListLock.Unlock()
}
func testEcho(hdlCtx *HandlerContext) {
@ -333,7 +509,7 @@ func testEcho(hdlCtx *HandlerContext) {
logger.Debug("test echo done", "byteSent", 123, "byteReceived", echoRes.EchoByte)
}
func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) {
func (ctx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil)
@ -343,8 +519,11 @@ func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) {
}
peer := NewPeer(conn)
srvCtx.addPeer(peer)
handlerCtx := NewHandlerContext(peer, srvCtx)
ctx.addPeer(peer)
handlerCtx := NewHandlerContext(peer, ctx)
ctx.handlerContextsLock.Lock()
ctx.handlerContexts = append(ctx.handlerContexts, handlerCtx)
ctx.handlerContextsLock.Unlock()
defer handleDisconnection(handlerCtx)
defer func(conn *websocket.Conn) {
@ -355,10 +534,30 @@ func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) {
}(conn)
logger.Infof("%s connected", conn.RemoteAddr())
errGroup := new(errgroup.Group)
errGroup.Go(handlerCtx.clientHandler)
errGroup.Go(handlerCtx.clientWriter)
errGroup.Go(handlerCtx.clientReader)
errGroup, syncCtx := errgroup.WithContext(context.Background())
errGroup.Go(func() error {
return handlerCtx.clientHandler(syncCtx)
})
errGroup.Go(func() error {
return handlerCtx.clientWriter(syncCtx)
})
errGroup.Go(func() error {
return handlerCtx.clientReader(syncCtx)
})
errGroup.Go(func() error {
<-syncCtx.Done()
time.Sleep(time.Second * 3)
close(handlerCtx.rToClient)
close(handlerCtx.resFromClient)
close(handlerCtx.reqFromClient)
return conn.Close()
})
testEcho(handlerCtx)
err = errGroup.Wait()
@ -369,11 +568,11 @@ func (srvCtx *Context) wsapiHandler(w http.ResponseWriter, r *http.Request) {
}
func RunServer() {
srvCtx := &Context{peersList: make([]*Peer, 0), accounts: make(map[string]*Account)}
srvCtx := NewContext()
go func() {
for {
srvCtx.printConnectedPeers()
srvCtx.printDebugInfo()
time.Sleep(time.Second * 5)
}
}()