G

Golang 简易WS服务 - 服务端

Yuming 代码 2023-02-21

本次开发主要使用了Gorilla Websocket软件包

客户端结构体

维护socket连接,保存客户端信息

代码

package contorller

import (
    "encoding/json"
    "fmt"
    "git.myscrm.cn/vr/server-yk-app-builder/model"
    "git.myscrm.cn/vr/server-yk-app-builder/pkg/enum"
    "git.myscrm.cn/vr/server-yk-app-builder/pkg/logger"
    "git.myscrm.cn/vr/server-yk-app-builder/pkg/response"
    "github.com/gorilla/websocket"
    "strconv"
    "time"
)

const (
    // pingWait is the maximum time in seconds to wait for a ping from
    pingWait = 20 * time.Second
    // Maximum message size allowed from peer.
    maxMessageSize = 512
)

type wsClient struct {
    manager  *wsManager
    id       string
    desc     string
    conn     *websocket.Conn
    status   enum.ClientStatus
    send     chan []byte
    isClosed chan bool
    lastPing time.Time
}

func newWsClient(manager *wsManager, conn *websocket.Conn, id, name, ip string) *wsClient {
    return &wsClient{
        manager:  manager,
        id:       id,
        desc:     fmt.Sprintf("客户端(%s, %s)", name, ip),
        conn:     conn,
        status:   enum.ClientStatusOffline,
        send:     make(chan []byte),
        isClosed: make(chan bool),
        lastPing: time.Now(),
    }
}

// Read 读取客户端发送过来的消息
func (c *wsClient) Read() {
    defer func() {
        c.unRegister()
        logger.Info(c.desc, "read协程退出")
    }()

    c.conn.SetReadLimit(maxMessageSize)
    c.conn.SetPingHandler(func(text string) error {
        // 只需要知道客户端还活着就行,不需要回复
        c.lastPing = time.Now()
        // 更新客户端状态
        clientStatus, _ := strconv.ParseInt(text, 10, 32)
        c.status = enum.ClientStatus(int32(clientStatus))
        return nil
    })

    for {
        msgType, data, err := c.conn.ReadMessage()
        if err != nil {
            logger.Error(c.desc, "c.conn.ReadMessage", err.Error())
            break
        }

        switch msgType {
        case websocket.TextMessage:
            var msg *model.WsMessage
            err = json.Unmarshal(data, &msg)
            if err != nil {
                logger.Error(c.desc, "json.Unmarshal", err.Error())
                break
            }

            switch msg.Type {
            default:
                logger.Info(c.desc, "未知消息类型", string(data))
                c.send <- data
            }
        }
    }
}

// Write 把对应消息写回客户端
func (c *wsClient) Write() {
    defer func() {
        logger.Info(c.desc, "write协程退出")
        c.unRegister()
    }()
    for {
        select {
        case <-c.isClosed:
            return
        case msg := <-c.send:
            err := c.conn.WriteMessage(websocket.TextMessage, msg)
            if err != nil {
                logger.Error(c.desc, "c.conn.WriteMessage", err.Error())
                return
            }
        }
    }
}

// Check 检测客户端是否超时
func (c *wsClient) Check() {
    defer func() {
        logger.Info(c.desc, "check协程退出")
    }()
    ticker := time.NewTicker(pingWait / 6)
    for {
        select {
        case <-c.isClosed:
            return
        case <-ticker.C:
            // 主动关闭连接
            if time.Now().Sub(c.lastPing) > pingWait {
                response.WsReturnErr(c.conn, enum.WsDataErr, "客户端超时,主动关闭连接")
                logger.Info(c.desc, "客户端超时,主动关闭连接")
                return
            }
        }
    }
}

func (c *wsClient) unRegister() {
    if c.manager.clients[c.id] != nil {
        c.manager.unRegister <- c
    }
}

使用

应当在控制器/入口处

func (c *cSocket) Ws(ctx *gin.Context) {
    ws, err := c.upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
    if err != nil {
        return nil, "", err
    }

    ip := ctx.ClientIP()
    name := ctx.GetString("device_name")

    client := newWsClient(WsManager, ws, clientId, name, ip)
    client.manager.register <- client

    go client.Read()
    go client.Write()
    go client.Check()
}

客户端管理器

管理客户端,支持登录、注销、广播

代码

package contorller

import (
    "encoding/json"
    "git.myscrm.cn/vr/server-yk-app-builder/model"
    "git.myscrm.cn/vr/server-yk-app-builder/pkg/enum"
    "git.myscrm.cn/vr/server-yk-app-builder/pkg/logger"
)

var WsManager = NewWsManager()

type wsManager struct {
    clients    map[string]*wsClient // 记录在线用户
    broadcast  chan []byte          // 触发消息广播
    register   chan *wsClient       // 触发设备或用户登陆
    unRegister chan *wsClient       // 触发设备或用户退出
}

func NewWsManager() *wsManager {
    return &wsManager{
        clients:    make(map[string]*wsClient),
        broadcast:  make(chan []byte),
        register:   make(chan *wsClient),
        unRegister: make(chan *wsClient),
    }
}

func (m *wsManager) Start() {
    for {
        select {
        case conn := <-m.register:
            m.clients[conn.id] = conn
            conn.status = enum.ClientStatusOnline
            logger.Info(conn.desc, "已连接", "当前在线客户端", len(m.clients))
            data, _ := json.Marshal(&model.WsMessage{Type: enum.WsDataNotify, Data: "socket 初始化成功"})
            conn.send <- data
        }
    }
}

func (m *wsManager) BoardCast() {
    for {
        select {
        case message := <-m.broadcast:
            for _, conn := range m.clients {
                conn.send <- message
            }
        }
    }
}

func (m *wsManager) Quit() {
    for {
        select {
        case conn := <-m.unRegister:
            if _, ok := m.clients[conn.id]; ok {
                delete(m.clients, conn.id)
                conn.conn.Close()
                conn.status = enum.ClientStatusOffline
                close(conn.isClosed)
            }
        }
    }
}

func (m *wsManager) GetClient(id string) *wsClient {
    if client, ok := m.clients[id]; ok {
        return client
    }
    return nil
}

使用

一般为main.go或启动处

func RegisterHttpRoute(serverMux *gin.Engine) error {
    go contorller.WsManager.Start()
    go contorller.WsManager.Quit()

    // 设置恐慌恢复,错误处理
    serverMux.Use(middleware.PanicHandler(), middleware.ErrorHandler())
    serverMux.NoMethod(middleware.NotFoundHandler())
    serverMux.NoRoute(middleware.NotFoundHandler())

    // 注册 websocket 接口
    serverMux.GET("/socket", middleware.SocketAuth(), contorller.Socket.Ws)

    ...
}
PREV
Golang 协程更好的进行错误处理
NEXT
Golang 简易WS服务 - 客户端

评论(0)

评论已关闭