liuhaijun e94826ce29 add server
Change-Id: I0760f17f6a01c0121b59fcbfafc666032dbc30af
2024-09-19 09:44:15 +00:00

446 lines
12 KiB
Go

package service
import (
"bytes"
"encoding/json"
"fmt"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/config"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/data"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/agent"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/node"
devc "git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/device"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/message"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/utils"
"github.com/nats-io/nats.go"
"github.com/robfig/cron/v3"
"strconv"
"strings"
"sync"
"time"
)
// agents is the latest-agent-version list returned by
// agent.AgentInfo.ListLastVersion (presumably one entry per OS type /
// architecture — CheckAgnetSilentUpgrade matches on OsType and ArchType).
// InitDevice loads it at start-up and reloads it every minute when CFN and
// agent silent upgrade are enabled; CheckAgnetSilentUpgrade reads it.
// NOTE(review): it is written from the cron goroutine and read from the
// goroutines Healthz starts per heartbeat without any synchronization —
// consider guarding it (e.g. sync.RWMutex or atomic.Pointer).
var agents []agent.AgentInfo
func InitDevice() {
go func() {
log.Info("开始监听消息队列..." + message.TO_SCHEDULE_BROADCAST_SUBJECT)
data.Nc.QueueSubscribe(message.TO_SCHEDULE_BROADCAST_SUBJECT, "schedule", func(msg *nats.Msg) {
res, err := message.UnmarshalMsgModel(msg.Data)
if err != nil {
log.Errorf("format message fail: %s", string(msg.Data))
}
if res.Func == message.FUNC_HEALTHCHECK {
Healthz(res)
} else if res.Func == message.FUNC_ASSETINFO {
CheckDeviceInfo(res)
} else if res.Func == message.FUNC_AGENTUPGRADERESULT {
UpgradeRecord(res)
} else if res.Func == message.FUNC_POWER_ON_OFF {
SaveDeviceOperateLog(res)
}
})
}()
go func() {
log.Info("开始监听消息队列..." + message.TO_SCHEDULE_QUEUE_SUBJECT)
data.Nc.QueueSubscribe(message.TO_SCHEDULE_QUEUE_SUBJECT, "schedule", func(msg *nats.Msg) {
res, err := message.UnmarshalMsgModel(msg.Data)
if err != nil {
log.Errorf("format message fail: %s", string(msg.Data))
}
switch res.Func {
case message.FUNC_GETCOMPANY:
ret, _ := GetDeviceCompany(res).Marshal()
msg.Respond(ret)
break
}
})
}()
// for cfn
if config.Config.CfnConfig.Enable {
if config.Config.CfnConfig.AgnetSilentUpgrade {
aget := &agent.AgentInfo{}
agents, _ = aget.ListLastVersion()
}
go func() {
c := cron.New()
// 每分钟执行一次
_, err := c.AddFunc("* * * * *", func() {
now := time.Now()
var oneMinuteAgo = now.Add(-time.Minute)
timeStr := oneMinuteAgo.Format("200601021504")
count := GetDeviceOnlineCount(timeStr)
deviceOnlineCount := devc.DeviceOnlineCount{
Id: timeStr,
Count: count,
UpdateTime: now,
}
_, err := deviceOnlineCount.CreateOrUpdate()
if err != nil {
log.Errorf("failed to save device online count, err: " + err.Error())
}
// 更新最新agent列表
if config.Config.CfnConfig.AgnetSilentUpgrade {
aget := &agent.AgentInfo{}
agents, _ = aget.ListLastVersion()
}
})
if err != nil {
log.Errorf("failed to create cron job to save device online count, err: " + err.Error())
}
c.Start()
}()
}
go func() {
c := cron.New()
_, err := c.AddFunc("*/10 * * * *", func() {
now := time.Now()
var tenMinuteAgo = now.Add(-10 * time.Minute)
tenMinuteAgoStr := tenMinuteAgo.Format("200601021504")
timeStr := tenMinuteAgoStr[0 : len(tenMinuteAgoStr)-1]
runTime := GetDeviceRunTime(timeStr)
if runTime == nil {
return
}
runTime.Range(func(key, value any) bool {
d := devc.DeviceRunTime{
Date: tenMinuteAgo.Format("20060102"),
SerialNo: key.(string),
RunTime: value.(*RunTimeStruct).RunTime,
UpdateTime: now,
}
_, err := d.UpInsert()
if err != nil {
log.Errorf("failed to save device run time, err: " + err.Error())
}
return true
})
})
if err != nil {
log.Errorf("failed to create cron job to save device online count, err: " + err.Error())
}
c.Start()
}()
}
// Healthz handles a FUNC_HEALTHCHECK heartbeat from a device agent.
//
// The body is decoded into a devc.DeviceInfo. When it carries no serial
// number, msg.Rid is used instead; if neither is present the message is
// dropped. Liveness (SaveDeviceOnline, SaveDeviceRunTime) is recorded before
// the decode error is checked, so a heartbeat from an identifiable device is
// counted even when its body is malformed. For a well-formed body the OS type
// and architecture are normalized, the health record is saved, and
// machine-node sync and the silent-upgrade check are started in background
// goroutines.
func Healthz(msg message.MsgModel) {
	device := &devc.DeviceInfo{}
	decodeErr := json.Unmarshal(msg.Body, device)

	if device.SerialNo == "" && msg.Rid == "" {
		log.Errorf("SerialNo is nil")
		return
	}
	if device.SerialNo == "" {
		device.SerialNo = msg.Rid
	}

	SaveDeviceOnline(device.SerialNo)
	SaveDeviceRunTime(device.SerialNo)

	if decodeErr != nil {
		log.Errorf("Healthz json Unmarshal fail: %s", string(msg.Body))
		return
	}

	// Map free-form OS / arch strings onto the values CheckAgnetSilentUpgrade
	// matches against agent releases; unrecognized values are left as-is.
	osInfo := strings.ToLower(device.OsInfo)
	switch {
	case strings.Contains(osInfo, "centos"):
		device.OsType = agent.OS_LINUX_CENTOS
	case strings.Contains(osInfo, "ubuntu"):
		device.OsType = agent.OS_LINUX_UBUNTU
	case strings.Contains(osInfo, "kylin"):
		device.OsType = agent.OS_LINUX_KylinOS
	}

	// NOTE(review): "aarch64" contains no "arm" substring and is left
	// unchanged — confirm whether ARM64 agents report it.
	archInfo := strings.ToLower(device.ArchType)
	switch {
	case strings.Contains(archInfo, "x86_64"), strings.Contains(archInfo, "x86-64"):
		device.ArchType = "X86_64"
	case strings.Contains(archInfo, "arm"):
		device.ArchType = "ARM"
	}

	device.SaveHealth()
	go SyncMachineAndLogicNode(device)
	go CheckAgnetSilentUpgrade(device)
}
// Cooldown holds the serial numbers of devices that have been sent an
// immediate-upgrade command and have not yet reported the latest version.
// It keeps every heartbeat from re-sending the command while the agent is
// still upgrading. Access it only while holding cooldownMu.
var Cooldown = make(map[string]bool)

// cooldownMu guards Cooldown: Healthz starts CheckAgnetSilentUpgrade in a new
// goroutine for every heartbeat, so the map is accessed concurrently.
var cooldownMu sync.Mutex

// CheckAgnetSilentUpgrade checks whether the agent on device is older than
// the latest agent for the device's OS type and architecture and, if so,
// sends it an immediate-upgrade command (FUNC_UPGRADEIMMEDIATELY) over NATS
// and posts a site message via SendSiteMessage.
//
// It does nothing unless both CFN and agent silent upgrade are enabled.
// To avoid sending the command repeatedly (the agent needs some time to
// upgrade after receiving it), the device enters Cooldown when the command
// is sent and leaves it once it reports a version that is no longer older
// than the latest one. If sending fails, the device is removed from Cooldown
// again so that a later heartbeat retries.
// NOTE(review): a device whose upgrade command is accepted but whose upgrade
// never completes stays in Cooldown until the process restarts.
//
// TODO: notify administrators so they can upgrade manually, and report the
// result of background silent upgrades to them.
func CheckAgnetSilentUpgrade(device *devc.DeviceInfo) {
	if !config.Config.CfnConfig.Enable || !config.Config.CfnConfig.AgnetSilentUpgrade {
		return
	}
	serialNo := device.SerialNo
	leaveCooldown := func() {
		cooldownMu.Lock()
		delete(Cooldown, serialNo)
		cooldownMu.Unlock()
	}

	for _, latestVersion := range agents {
		if latestVersion.ArchType != device.ArchType || latestVersion.OsType != device.OsType {
			continue
		}
		if compareVersions(latestVersion.Version, device.AgentVersion) <= 0 {
			// Up to date, so any earlier upgrade has finished.
			leaveCooldown()
			continue
		}

		// Check and reserve atomically so concurrent heartbeats from the same
		// device cannot both issue an upgrade command.
		cooldownMu.Lock()
		_, pending := Cooldown[serialNo]
		if !pending {
			Cooldown[serialNo] = true
		}
		cooldownMu.Unlock()
		if pending {
			continue
		}

		log.Info("小助手自动升级开启!" + serialNo)
		plan, err := GetUpgradeCommandNow(&latestVersion)
		if err != nil {
			log.Info("未找到小助手包!")
			leaveCooldown()
			return
		}
		body, err := json.Marshal(plan)
		if err != nil {
			log.Errorf("内部错误,下发小助手升级通知失败, err: %v", err)
			leaveCooldown()
			return
		}
		payload, err := json.Marshal(message.MsgModel{
			Body:    body,
			Func:    message.FUNC_UPGRADEIMMEDIATELY,
			Rid:     serialNo,
			Version: "v1",
		})
		if err != nil {
			log.Errorf("内部错误,下发小助手升级通知失败, err: %v", err)
			leaveCooldown()
			return
		}
		sub := strings.ReplaceAll(message.TO_AGENT_UNICAST_SUBJECT, "{rid}", serialNo)
		reply, err := data.Nc.Request(sub, payload, 6*time.Second)
		if err != nil {
			log.Errorf("内部错误,下发小助手升级通知失败, err: %v", err)
			leaveCooldown()
			return
		}
		res, err := message.UnmarshalMsgRespModel(reply.Data)
		if err != nil {
			log.Errorf("响应消息格式错误,下发小助手升级通知失败!, err: %v", err)
			leaveCooldown()
			return
		}
		if res.Code != 200 {
			log.Errorf("下发小助手升级通知失败!")
			leaveCooldown()
			return
		}
		log.Info("已通知小助手升级,自动升级命令已下达!" + serialNo)
		SendSiteMessage(device)
	}
}
// SendSiteMessage posts an in-site notification (oneLevelCategory
// "asset_msg", twoLevelCategory 26) carrying the device's serial number to
// the scheme service's /cpn/api/plt/v1/msgSub/sendMsg endpoint, using the
// configured SchemeService token for authorization.
//
// The outcome is only logged, with the device serial and error on failure;
// nothing is returned to the caller.
// NOTE(review): the decoded response body is not inspected, so an
// application-level error reported inside a successful HTTP response is
// logged as success — confirm the response schema and check it.
func SendSiteMessage(device *devc.DeviceInfo) {
	form := map[string]interface{}{
		"infoMap": map[string]string{
			"serialno": device.SerialNo,
		},
		"oneLevelCategory": "asset_msg",
		"twoLevelCategory": 26,
	}
	// Marshalling cannot fail: the map holds only strings and an int.
	body, _ := json.Marshal(form)
	header := map[string]string{
		"Content-Type":  "application/json",
		"Authorization": config.Config.SchemeService.Token,
	}
	var resp map[string]any
	client := utils.HttpRequest{}
	err := client.Request("POST", config.Config.SchemeService.BaseUrl+"/cpn/api/plt/v1/msgSub/sendMsg", bytes.NewReader(body), header).ParseJson(&resp)
	if err != nil {
		log.Errorf("发送站内信失败! serialNo: %s, err: %v", device.SerialNo, err)
		return
	}
	log.Infof("发送站内信成功! serialNo: %s", device.SerialNo)
}
// compareVersions compares the first three dot-separated components
// (major.minor.patch) of two version strings numerically.
//
// It returns 1 if version1 is greater, -1 if version2 is greater and 0 if the
// first three components are equal. Missing components count as 0, so short
// or empty versions such as "1.2" or "" are handled instead of panicking.
// Components that are not integers also count as 0, and anything after the
// third component is ignored.
func compareVersions(version1, version2 string) int {
	v1Parts := strings.Split(version1, ".")
	v2Parts := strings.Split(version2, ".")

	// component returns the i-th numeric component, or 0 when absent.
	component := func(parts []string, i int) int {
		if i >= len(parts) {
			return 0
		}
		n, _ := strconv.Atoi(parts[i]) // non-numeric parts compare as 0
		return n
	}

	for i := 0; i < 3; i++ {
		v1Num, v2Num := component(v1Parts, i), component(v2Parts, i)
		if v1Num > v2Num {
			return 1
		}
		if v1Num < v2Num {
			return -1
		}
	}
	return 0
}
// SyncMachineAndLogicNode synchronizes a device into the machine-node store.
//
// It only acts when the device has reported an agent user name, agent
// password, IP address and non-zero port. The machine is identified by the
// agent URL "<IPAddress>:<Port>":
//   - if a machine with that URL exists and its stored agent password differs
//     from the device's, the password is updated on the machine
//     (UpdateMachineNodeCfnAgentPassword) and via
//     node.UpdateNodeLoginPwdByMachineId; nothing else changes;
//   - otherwise a new machine named after the serial number is created with
//     the device's agent credentials, protocol "http" and user name "admin".
//
// TODO: prompt the user to assign a workspace (the computing network needs a
// management feature for enabling application deployment, or a way to look
// up the workspace/company by serial number), and check whether user name,
// password and similar information are present.
func SyncMachineAndLogicNode(device *devc.DeviceInfo) {
	reachable := device.AgentName != "" && device.AgentPwd != "" && device.IPAddress != "" && device.Port != 0
	if !reachable {
		return
	}

	agentURL := fmt.Sprintf("%s:%d", device.IPAddress, device.Port)
	machine := node.NewMachine()

	// NOTE(review): the lookup error is ignored; unless a failed lookup also
	// yields an empty Id only for "not found", a transient failure here could
	// lead to a duplicate machine being created below — confirm.
	existing, _ := machine.GetMachineNodeByCfnAgentUrl(agentURL)
	if len(existing.Id) > 0 {
		if existing.CfnAgentPassword != device.AgentPwd {
			existing.UpdateMachineNodeCfnAgentPassword(agentURL, device.AgentPwd)
			node.UpdateNodeLoginPwdByMachineId(existing.Id, device.AgentPwd)
		}
		return
	}

	machine.Name = device.SerialNo
	machine.TransportMode = 0
	machine.CfnAgentUrl = agentURL
	machine.CfnAgentUsername = device.AgentName
	machine.CfnAgentPassword = device.AgentPwd
	machine.CfnAgentProtocol = "http"
	machine.CfnAgentTimeout = 0
	machine.CfnAgentVersion = device.AgentVersion
	machine.SetUserName("admin")
	machine.Create(machine)
}
// CheckDeviceInfo handles a FUNC_ASSETINFO message. It decodes the body into
// a devc.DeviceInfo, falls back to msg.Rid when the body has no serial
// number, normalizes the OS type and architecture, and stores the record
// with SaveBase. A message without any serial number, or whose body fails to
// decode, is logged and dropped.
func CheckDeviceInfo(msg message.MsgModel) {
	info := new(devc.DeviceInfo)
	unmarshalErr := json.Unmarshal(msg.Body, info)

	if info.SerialNo == "" {
		if msg.Rid == "" {
			log.Errorf("SerialNo is nil")
			return
		}
		info.SerialNo = msg.Rid
	}
	if unmarshalErr != nil {
		log.Errorf("CheckDeviceInfo json Unmarshal fail: %s", string(msg.Body))
		return
	}

	lowerOS := strings.ToLower(info.OsInfo)
	lowerArch := strings.ToLower(info.ArchType)

	switch {
	case strings.Contains(lowerOS, "centos"):
		info.OsType = agent.OS_LINUX_CENTOS
	case strings.Contains(lowerOS, "ubuntu"):
		info.OsType = agent.OS_LINUX_UBUNTU
	case strings.Contains(lowerOS, "kylin"):
		info.OsType = agent.OS_LINUX_KylinOS
	}

	isX86 := strings.Contains(lowerArch, "x86_64") || strings.Contains(lowerArch, "x86-64")
	if isX86 {
		info.ArchType = "X86_64"
	} else if strings.Contains(lowerArch, "arm") {
		info.ArchType = "ARM"
	}

	info.SaveBase()
}
// UpgradeRecord handles a FUNC_AGENTUPGRADERESULT message: it decodes the
// body into a devc.DeviceUpgradeRecord and persists it with Create. A body
// that fails to decode is logged and dropped.
func UpgradeRecord(msg message.MsgModel) {
	record := new(devc.DeviceUpgradeRecord)
	if decodeErr := json.Unmarshal(msg.Body, record); decodeErr != nil {
		log.Errorf("UpgradeRecord json Unmarshal fail: %s", string(msg.Body))
		return
	}
	record.Create()
}
// SaveDeviceOperateLog handles a FUNC_POWER_ON_OFF message: it decodes the
// body into a devc.DeviceOperateLog and persists it with Create. A body that
// fails to decode is logged (under this function's own name, together with
// the decode error) and dropped.
func SaveDeviceOperateLog(msg message.MsgModel) {
	deviceOperateLog := &devc.DeviceOperateLog{}
	if err := json.Unmarshal(msg.Body, deviceOperateLog); err != nil {
		log.Errorf("SaveDeviceOperateLog json Unmarshal fail: %s, err: %v", string(msg.Body), err)
		return
	}
	deviceOperateLog.Create()
}
// GetDeviceCompany looks up the asset whose serial_no equals msg.Rid and
// returns a success response carrying its "companyId" and "assetType".
// A lookup error, or an asset with an empty CompanyId, yields a Fail
// response instead.
//
// In theory the company information should be pushed proactively, but the
// scheduler cannot see when it changes, so it is answered on request here.
func GetDeviceCompany(msg message.MsgModel) (ret *message.MsgRespModel) {
	lookup := &devc.AssetDeviceInfo{}
	asset, err := lookup.Get(map[string]interface{}{"serial_no": msg.Rid})
	if err != nil {
		return message.NewMsgRespModel().Fail(err.Error())
	}
	if asset.CompanyId == "" {
		return message.NewMsgRespModel().Fail("companyId is nil")
	}
	return message.NewMsgRespModel().Success(map[string]interface{}{
		"companyId": asset.CompanyId,
		"assetType": asset.AssetType,
	})
}
// deviceOnline is used by SaveDeviceOnline and GetDeviceOnlineCount to bucket
// heartbeats by minute: each key is a local time formatted "200601021504"
// and its value is a map[string]bool set of the serial numbers seen in that
// minute.
// NOTE(review): the inner map is not synchronized; this is only safe while
// SaveDeviceOnline is not called concurrently — confirm.
var deviceOnline = sync.Map{}

// SaveDeviceOnline marks serialNo as online in the current minute's bucket,
// creating the bucket on first use.
func SaveDeviceOnline(serialNo string) {
	minute := time.Now().Format("200601021504")
	bucket, loaded := deviceOnline.Load(minute)
	if !loaded {
		// LoadOrStore keeps whichever map was stored first.
		bucket, _ = deviceOnline.LoadOrStore(minute, make(map[string]bool))
	}
	bucket.(map[string]bool)[serialNo] = true
}

// GetDeviceOnlineCount removes the bucket stored under timeStr (a
// "200601021504" minute key) and returns the number of distinct serial
// numbers it held, or 0 when there is no such bucket.
func GetDeviceOnlineCount(timeStr string) int {
	bucket, found := deviceOnline.LoadAndDelete(timeStr)
	if !found {
		return 0
	}
	return len(bucket.(map[string]bool))
}
// deviceRunTime buckets per-device run time by 10-minute window. Each key is
// a local time formatted "200601021504" with its last (minute-units) digit
// dropped, e.g. "20240919094" for 09:40-09:49 on 2024-09-19. Each value is a
// *sync.Map from serial number to *RunTimeStruct.
var deviceRunTime = sync.Map{}

// RunTimeStruct accumulates one device's run time within a window.
type RunTimeStruct struct {
	// LastUpdateTime is the Unix time in milliseconds of the device's previous
	// heartbeat in this window (0 before the first one).
	LastUpdateTime int64
	// RunTime is the accumulated run time in milliseconds.
	RunTime int64
}

// SaveDeviceRunTime records a heartbeat from serialNo in the current
// 10-minute window of deviceRunTime.
//
// The time elapsed since the device's previous heartbeat in the same window
// is added to its run time when it lies within [0, 90s]. A longer gap, which
// includes the first heartbeat of a window, is treated as downtime and
// contributes nothing.
func SaveDeviceRunTime(serialNo string) {
	nowTime := time.Now()
	key := nowTime.Format("200601021504")
	key = key[:len(key)-1] // one key per 10-minute window

	value, ok := deviceRunTime.Load(key)
	if !ok {
		value, _ = deviceRunTime.LoadOrStore(key, &sync.Map{})
	}
	devices := value.(*sync.Map)

	obj, ok := devices.Load(serialNo)
	if !ok {
		obj, _ = devices.LoadOrStore(serialNo, &RunTimeStruct{})
	}
	runTimeStruct := obj.(*RunTimeStruct)

	now := nowTime.UnixMilli()
	// A negative interval (clock stepped backwards) must not reduce run time.
	if interval := now - runTimeStruct.LastUpdateTime; interval >= 0 && interval <= 90*1000 {
		runTimeStruct.RunTime += interval
	}
	runTimeStruct.LastUpdateTime = now
}

// GetDeviceRunTime removes and returns the serial-number -> *RunTimeStruct
// map for the 10-minute window key timeStr (format described on
// deviceRunTime), or nil if no heartbeat was recorded in that window.
func GetDeviceRunTime(timeStr string) *sync.Map {
	value, ok := deviceRunTime.LoadAndDelete(timeStr)
	if !ok {
		return nil
	}
	return value.(*sync.Map)
}