package handler import ( "encoding/json" "git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/service/app_manage/common" "git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket" "git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket/transport" "git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "sync" ) // 控制台消息处理器 type ConsoleHandler struct { socket.BaseProxyHandler } // 自定义WebSocket中间件 func ConsoleHandlerInterceptor(c *gin.Context) { ch := NewConsoleHandler() ch.AfterConnectionEstablished(c) session, _ := c.Get("wsConn") attributes, _ := c.Get("attributes") err := ch.HandleTextMessage(session.(*websocket.Conn), attributes.(map[string]interface{})) if err != nil { s := err.Error() //"websocket: close 1005 (no status)" if s == "websocket: close 1005 (no status)" { ch.AfterConnectionClosed(session.(*websocket.Conn), attributes.(map[string]interface{})) } else { ch.HandleTransportError(session.(*websocket.Conn), attributes.(map[string]interface{}), err) } return } } /************生命周期方法************/ // NewConsoleHandler 创建控制台处理器实例 var consoleHandler *ConsoleHandler var once sync.Once func NewConsoleHandler() *ConsoleHandler { once.Do(func() { consoleHandler = &ConsoleHandler{ BaseProxyHandler: *socket.NewBaseProxyHandler(common.TopSocket), } }) return consoleHandler } func (ch *ConsoleHandler) AfterConnectionEstablished(c *gin.Context) { ch.BaseProxyHandler.AfterConnectionEstablished(c, ch.GetParameters) } func (ch *ConsoleHandler) HandleTransportError(session *websocket.Conn, attributes map[string]interface{}, err error) { ch.BaseProxyHandler.HandleTransportError(session, attributes, err) } // 连接关闭处理 func (ch *ConsoleHandler) AfterConnectionClosed(session *websocket.Conn, attributes map[string]interface{}) { ch.BaseProxyHandler.AfterConnectionClosed(session, attributes) } /************结束************/ // GetParameters 获取参数 func (ch *ConsoleHandler) GetParameters(attributes map[string]interface{}) []interface{} { return []interface{}{"projectId", attributes["projectId"]} } // HandleTextMessage 处理文本消息 func (ch *ConsoleHandler) HandleTextMessage(session *websocket.Conn, attributes map[string]interface{}) error { for { _, message, err := session.ReadMessage() if err != nil { log.Errorf("session.ReadMessage() error: %s", err) //ch.SendMsg(session, "处理消息失败!") return err } var messageMap map[string]interface{} err = json.Unmarshal(message, &messageMap) if err != nil { log.Errorf("json.Unmarshal(%s) error: %v", message, err) //ch.SendMsg(session, "处理消息失败!") return err } proxySession, ok := attributes["proxySession"] consoleCommandOpStr, _ := messageMap["op"].(string) commandOp := socket.OfConsoleCommandOp(consoleCommandOpStr) // 调用远程节点 var textMessage string if ok { //textMessage = ws.handleTextMessage(attributes, proxySession, jsonObj, consoleCommandOp) webSocketClientHandler := proxySession.(*transport.WebSocketClientHandler) commandOps := []socket.ConsoleCommandOp{socket.Heart, socket.Showlog} // 判断commandOps是否包含commandOp contain := false for _, op := range commandOps { if op == commandOp { contain = true } } if !contain { ch.BaseHandler.LogOpt(attributes, messageMap) } err = webSocketClientHandler.Send(string(message)) if err != nil { log.Errorf("给逻辑节点发送消息失败:webSocketClientHandler.Send(%s) error: %s", message, err) //ch.SendMsg(session, "给逻辑节点发送消息失败!") //return err } } if textMessage != "" { ch.BaseHandler.SendMsg(session, textMessage) } } } // getClass 获取类信息 func (ch *ConsoleHandler) getClass() string { return "ConsoleHandler" }