171 lines
5.1 KiB
Go
171 lines
5.1 KiB
Go
package handler
|
||
|
||
import (
|
||
"encoding/json"
|
||
"errors"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/node"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/script_info"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/user"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/service/app_manage/common"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/service/app_manage/node/script"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket/transport"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log"
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/gorilla/websocket"
|
||
"sync"
|
||
)
|
||
|
||
// 控制台消息处理器
|
||
type NodeScriptHandler struct {
|
||
socket.BaseProxyHandler
|
||
}
|
||
|
||
// 自定义WebSocket中间件
|
||
func NodeScriptHandlerInterceptor(c *gin.Context) {
|
||
ch := NewNodeScriptHandler()
|
||
ch.AfterConnectionEstablished(c)
|
||
|
||
session, _ := c.Get("wsConn")
|
||
attributes, _ := c.Get("attributes")
|
||
|
||
err := ch.HandleTextMessage(session.(*websocket.Conn), attributes.(map[string]interface{}))
|
||
|
||
if err != nil {
|
||
s := err.Error()
|
||
|
||
//"websocket: close 1005 (no status)"
|
||
if s == "websocket: close 1005 (no status)" {
|
||
ch.AfterConnectionClosed(session.(*websocket.Conn), attributes.(map[string]interface{}))
|
||
} else {
|
||
ch.HandleTransportError(session.(*websocket.Conn), attributes.(map[string]interface{}), err)
|
||
}
|
||
return
|
||
}
|
||
|
||
}
|
||
|
||
/************生命周期方法************/
|
||
|
||
// NewConsoleHandler 创建控制台处理器实例
|
||
var nodeScriptHandler *NodeScriptHandler
|
||
var once1 sync.Once
|
||
|
||
func NewNodeScriptHandler() *NodeScriptHandler {
|
||
once1.Do(func() {
|
||
nodeScriptHandler = &NodeScriptHandler{
|
||
BaseProxyHandler: *socket.NewBaseProxyHandler(common.Script_Run),
|
||
}
|
||
})
|
||
return nodeScriptHandler
|
||
}
|
||
|
||
func (ch *NodeScriptHandler) AfterConnectionEstablished(c *gin.Context) {
|
||
ch.BaseProxyHandler.AfterConnectionEstablished(c, ch.GetParameters)
|
||
}
|
||
|
||
/************结束************/
|
||
|
||
// GetParameters 获取参数
|
||
func (ch *NodeScriptHandler) GetParameters(attributes map[string]interface{}) []interface{} {
|
||
dataItem := attributes["dataItem"]
|
||
nodeScriptCacheModel := dataItem.(script_info.NodeScriptCacheModel)
|
||
|
||
return []interface{}{"id", attributes["scriptId"], "workspaceId", nodeScriptCacheModel.WorkspaceId}
|
||
}
|
||
|
||
// HandleTextMessage 处理文本消息
|
||
func (ch *NodeScriptHandler) HandleTextMessage(session *websocket.Conn, attributes map[string]interface{}) error {
|
||
for {
|
||
_, message, err := session.ReadMessage()
|
||
if err != nil {
|
||
log.Errorf("session.ReadMessage() error: %s", err)
|
||
return err
|
||
}
|
||
var messageMap map[string]interface{}
|
||
err = json.Unmarshal(message, &messageMap)
|
||
if err != nil {
|
||
log.Errorf("json.Unmarshal(%s) error: %v", message, err)
|
||
}
|
||
proxySession, ok := attributes["proxySession"]
|
||
consoleCommandOpStr, _ := messageMap["op"].(string)
|
||
commandOp := socket.OfConsoleCommandOp(consoleCommandOpStr)
|
||
|
||
// 调用远程节点
|
||
var textMessage string
|
||
if ok {
|
||
webSocketClientHandler := proxySession.(*transport.WebSocketClientHandler)
|
||
|
||
if commandOp != socket.Heart {
|
||
ch.BaseHandler.LogOpt(attributes, messageMap)
|
||
}
|
||
|
||
if commandOp == socket.Start {
|
||
executeId, err := createLog(attributes)
|
||
if err != nil {
|
||
log.Errorf("创建日志失败:createLog(%s) error: %s", messageMap, err)
|
||
}
|
||
|
||
messageMap["executeId"] = executeId
|
||
messageMap["CFN_MSG"] = "CFN_MSG"
|
||
|
||
messageNew, _ := json.Marshal(messageMap)
|
||
err = webSocketClientHandler.Send(string(messageNew))
|
||
if err != nil {
|
||
log.Errorf("给逻辑节点发送消息失败:webSocketClientHandler.Send(%s) error: %s", messageNew, err)
|
||
}
|
||
}
|
||
|
||
}
|
||
if textMessage != "" {
|
||
ch.BaseHandler.SendMsg(session, textMessage)
|
||
}
|
||
}
|
||
}
|
||
|
||
func createLog(attributes map[string]interface{}) (string, error) {
|
||
nodeInfo, ok := attributes["nodeInfo"].(*node.Node)
|
||
if !ok {
|
||
return "", errors.New("nodeInfo is not of type NodeModel")
|
||
}
|
||
userModel, ok := attributes["userInfo"].(*user.UserObj)
|
||
if !ok {
|
||
return "", errors.New("userInfo is not of type UserModel")
|
||
}
|
||
dataItem, ok := attributes["dataItem"].(script_info.NodeScriptCacheModel)
|
||
if !ok {
|
||
return "", errors.New("dataItem is not of type NodeScriptCacheModel")
|
||
}
|
||
|
||
cacheModel := script_info.NewNodeScriptCacheModel()
|
||
cacheModel.Id = dataItem.Id
|
||
cacheModel.LastRunUser = userModel.UserID
|
||
_, err := cacheModel.Update(cacheModel, []string{"last_run_user"})
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
nodeScriptExecuteLogCacheModel := &script_info.NodeScriptExecuteLogCacheModel{
|
||
BaseNodeModel: model.BaseNodeModel{
|
||
NodeId: nodeInfo.Id,
|
||
NodeName: nodeInfo.Name,
|
||
BaseWorkspaceModel: model.BaseWorkspaceModel{
|
||
WorkspaceId: nodeInfo.WorkspaceId,
|
||
},
|
||
},
|
||
ScriptId: dataItem.ScriptId,
|
||
Name: dataItem.Name,
|
||
ScriptName: dataItem.Name,
|
||
TriggerExecType: 0,
|
||
}
|
||
|
||
// 每次插入都要检测,清理多余的日志,仅保留100条最新的
|
||
_, err = script.InsertNodeScriptExecuteLog(nodeScriptExecuteLogCacheModel, nodeInfo.WorkspaceId, userModel)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
return nodeScriptExecuteLogCacheModel.Id, nil
|
||
}
|