北山输绿涨横陂,直堑回塘滟滟时。——王安石《北山》
本次开发主要使用了Gorilla Websocket软件包
客户端结构体
维护socket连接,保存客户端信息
代码
package contorller
import (
"encoding/json"
"fmt"
"git.myscrm.cn/vr/server-yk-app-builder/model"
"git.myscrm.cn/vr/server-yk-app-builder/pkg/enum"
"git.myscrm.cn/vr/server-yk-app-builder/pkg/logger"
"git.myscrm.cn/vr/server-yk-app-builder/pkg/response"
"github.com/gorilla/websocket"
"strconv"
"time"
)
const (
// pingWait is the maximum time in seconds to wait for a ping from
pingWait = 20 * time.Second
// Maximum message size allowed from peer.
maxMessageSize = 512
)
type wsClient struct {
manager *wsManager
id string
desc string
conn *websocket.Conn
status enum.ClientStatus
send chan []byte
isClosed chan bool
lastPing time.Time
}
func newWsClient(manager *wsManager, conn *websocket.Conn, id, name, ip string) *wsClient {
return &wsClient{
manager: manager,
id: id,
desc: fmt.Sprintf("客户端(%s, %s)", name, ip),
conn: conn,
status: enum.ClientStatusOffline,
send: make(chan []byte),
isClosed: make(chan bool),
lastPing: time.Now(),
}
}
// Read 读取客户端发送过来的消息
func (c *wsClient) Read() {
defer func() {
c.unRegister()
logger.Info(c.desc, "read协程退出")
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetPingHandler(func(text string) error {
// 只需要知道客户端还活着就行,不需要回复
c.lastPing = time.Now()
// 更新客户端状态
clientStatus, _ := strconv.ParseInt(text, 10, 32)
c.status = enum.ClientStatus(int32(clientStatus))
return nil
})
for {
msgType, data, err := c.conn.ReadMessage()
if err != nil {
logger.Error(c.desc, "c.conn.ReadMessage", err.Error())
break
}
switch msgType {
case websocket.TextMessage:
var msg *model.WsMessage
err = json.Unmarshal(data, &msg)
if err != nil {
logger.Error(c.desc, "json.Unmarshal", err.Error())
break
}
switch msg.Type {
default:
logger.Info(c.desc, "未知消息类型", string(data))
c.send <- data
}
}
}
}
// Write 把对应消息写回客户端
func (c *wsClient) Write() {
defer func() {
logger.Info(c.desc, "write协程退出")
c.unRegister()
}()
for {
select {
case <-c.isClosed:
return
case msg := <-c.send:
err := c.conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Error(c.desc, "c.conn.WriteMessage", err.Error())
return
}
}
}
}
// Check 检测客户端是否超时
func (c *wsClient) Check() {
defer func() {
logger.Info(c.desc, "check协程退出")
}()
ticker := time.NewTicker(pingWait / 6)
for {
select {
case <-c.isClosed:
return
case <-ticker.C:
// 主动关闭连接
if time.Now().Sub(c.lastPing) > pingWait {
response.WsReturnErr(c.conn, enum.WsDataErr, "客户端超时,主动关闭连接")
logger.Info(c.desc, "客户端超时,主动关闭连接")
return
}
}
}
}
func (c *wsClient) unRegister() {
if c.manager.clients[c.id] != nil {
c.manager.unRegister <- c
}
}
使用
应当在控制器/入口处
func (c *cSocket) Ws(ctx *gin.Context) {
ws, err := c.upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
return nil, "", err
}
ip := ctx.ClientIP()
name := ctx.GetString("device_name")
client := newWsClient(WsManager, ws, clientId, name, ip)
client.manager.register <- client
go client.Read()
go client.Write()
go client.Check()
}
客户端管理器
管理客户端,支持登录、注销、广播
代码
package contorller
import (
"encoding/json"
"git.myscrm.cn/vr/server-yk-app-builder/model"
"git.myscrm.cn/vr/server-yk-app-builder/pkg/enum"
"git.myscrm.cn/vr/server-yk-app-builder/pkg/logger"
)
var WsManager = NewWsManager()
type wsManager struct {
clients map[string]*wsClient // 记录在线用户
broadcast chan []byte // 触发消息广播
register chan *wsClient // 触发设备或用户登陆
unRegister chan *wsClient // 触发设备或用户退出
}
func NewWsManager() *wsManager {
return &wsManager{
clients: make(map[string]*wsClient),
broadcast: make(chan []byte),
register: make(chan *wsClient),
unRegister: make(chan *wsClient),
}
}
func (m *wsManager) Start() {
for {
select {
case conn := <-m.register:
m.clients[conn.id] = conn
conn.status = enum.ClientStatusOnline
logger.Info(conn.desc, "已连接", "当前在线客户端", len(m.clients))
data, _ := json.Marshal(&model.WsMessage{Type: enum.WsDataNotify, Data: "socket 初始化成功"})
conn.send <- data
}
}
}
func (m *wsManager) BoardCast() {
for {
select {
case message := <-m.broadcast:
for _, conn := range m.clients {
conn.send <- message
}
}
}
}
func (m *wsManager) Quit() {
for {
select {
case conn := <-m.unRegister:
if _, ok := m.clients[conn.id]; ok {
delete(m.clients, conn.id)
conn.conn.Close()
conn.status = enum.ClientStatusOffline
close(conn.isClosed)
}
}
}
}
func (m *wsManager) GetClient(id string) *wsClient {
if client, ok := m.clients[id]; ok {
return client
}
return nil
}
使用
一般为main.go或启动处
func RegisterHttpRoute(serverMux *gin.Engine) error {
go contorller.WsManager.Start()
go contorller.WsManager.Quit()
// 设置恐慌恢复,错误处理
serverMux.Use(middleware.PanicHandler(), middleware.ErrorHandler())
serverMux.NoMethod(middleware.NotFoundHandler())
serverMux.NoRoute(middleware.NotFoundHandler())
// 注册 websocket 接口
serverMux.GET("/socket", middleware.SocketAuth(), contorller.Socket.Ws)
...
}