mirror of
https://github.com/zhaojh329/rttys.git
synced 2026-02-27 09:53:21 +08:00
```
2025-04-24T06:59:56Z |INFO| New session: 81ee1ed4d12c3048620fbb2168c08257
2025-04-24T07:00:10Z |INFO| Delete session: 81ee1ed4d12c3048620fbb2168c08257
2025-04-24T07:00:17Z |INFO| New session: 427ce01b7f47a7ee8192cefdfa291c8f
2025-04-24T07:00:57Z |INFO| Delete session: 427ce01b7f47a7ee8192cefdfa291c8f
2025-04-24T07:01:55Z |INFO| New session: ffbfd5c596195c2bd43b8c0b6dc3d135
2025-04-24T07:01:57Z |ERRO| websocket: close sent
panic: send on closed channel
goroutine 33 [running]:
main.(*user).WriteMsg(0xc00037e680, 0x2, {0xc000188d10, 0xc, 0xc})
/home/runner/work/rttys/rttys/user.go:52 +0x8b
main.(*broker).run(0xc00035a000)
/home/runner/work/rttys/rttys/broker.go:227 +0xf34
created by main.runRttys in goroutine 1
/home/runner/work/rttys/rttys/main.go:86 +0x595
```
Signed-off-by: Jianhui Zhao <zhaojh329@gmail.com>
154 lines
2.4 KiB
Go
154 lines
2.4 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"rttys/client"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// Login result codes. NOTE(review): presumably these are the values passed
// as the code argument of userLoginAck and surfaced to the web client in the
// "err" field of the login reply; confirm against the broker.
const (
	loginErrorNone    = 0 // no error
	loginErrorOffline = 1 // NOTE(review): presumably "device is offline"; confirm
	loginErrorBusy    = 2 // NOTE(review): presumably "device has no free session"; confirm
)
|
|
|
|
type user struct {
|
|
br *broker
|
|
sid string
|
|
devid string
|
|
conn *websocket.Conn
|
|
closed uint32
|
|
send chan *usrMessage // Buffered channel of outbound messages.
|
|
}
|
|
|
|
// usrMessage is a single websocket message travelling between a user and
// the broker. Field order matters: readLoop builds values positionally.
type usrMessage struct {
	sid  string // session ID of the originating user (left empty by WriteMsg)
	typ  int    // websocket message type, as passed to conn.WriteMessage
	data []byte // raw message payload
}
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
}
|
|
|
|
func (u *user) IsDevice() bool {
|
|
return false
|
|
}
|
|
|
|
func (u *user) DeviceID() string {
|
|
return u.devid
|
|
}
|
|
|
|
func (u *user) WriteMsg(typ int, data []byte) {
|
|
u.send <- &usrMessage{
|
|
typ: typ,
|
|
data: data,
|
|
}
|
|
}
|
|
|
|
func (u *user) Closed() bool {
|
|
return atomic.LoadUint32(&u.closed) == 1
|
|
}
|
|
|
|
func (u *user) CloseConn() {
|
|
u.conn.Close()
|
|
}
|
|
|
|
func (u *user) Close() {
|
|
if u.Closed() {
|
|
return
|
|
}
|
|
|
|
atomic.StoreUint32(&u.closed, 1)
|
|
|
|
u.CloseConn()
|
|
|
|
close(u.send)
|
|
}
|
|
|
|
func userLoginAck(code int, c client.Client) {
|
|
msg := fmt.Sprintf(`{"type":"login","sid":"%s","err":%d}`, c.(*user).sid, code)
|
|
c.WriteMsg(websocket.TextMessage, []byte(msg))
|
|
}
|
|
|
|
func (u *user) readLoop() {
|
|
defer func() {
|
|
u.br.unregister <- u
|
|
}()
|
|
|
|
for {
|
|
typ, data, err := u.conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
log.Error().Msg(err.Error())
|
|
}
|
|
break
|
|
}
|
|
|
|
u.br.userMessage <- &usrMessage{u.sid, typ, data}
|
|
}
|
|
}
|
|
|
|
func (u *user) writeLoop() {
|
|
ticker := time.NewTicker(time.Second * 5)
|
|
|
|
defer func() {
|
|
ticker.Stop()
|
|
u.br.unregister <- u
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
u.WriteMsg(websocket.PingMessage, []byte{})
|
|
|
|
case msg, ok := <-u.send:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
err := u.conn.WriteMessage(msg.typ, msg.data)
|
|
if err != nil {
|
|
log.Error().Msg(err.Error())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func serveUser(br *broker, c *gin.Context) {
|
|
devid := c.Param("devid")
|
|
if devid == "" {
|
|
c.Status(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
c.Status(http.StatusBadRequest)
|
|
log.Error().Msg(err.Error())
|
|
return
|
|
}
|
|
|
|
u := &user{
|
|
br: br,
|
|
conn: conn,
|
|
devid: devid,
|
|
send: make(chan *usrMessage, 256),
|
|
}
|
|
|
|
go u.readLoop()
|
|
go u.writeLoop()
|
|
|
|
br.register <- u
|
|
}
|