package utils

// Author: Weisen Pan
// Date: 2023-10-24

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strings"

	log "github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1" // same package as corev1; kept for the *v1.Pod signatures below
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/uuid"
	clientset "k8s.io/client-go/kubernetes"
	externalclientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/pquerna/ffjson/ffjson"
	"helm.sh/helm/v3/pkg/releaseutil"
	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	batchv1beta1 "k8s.io/api/batch/v1beta1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	resourcehelper "k8s.io/kubectl/pkg/util/resource"
	api "k8s.io/kubernetes/pkg/apis/core"
	apiv1 "k8s.io/kubernetes/pkg/apis/core/v1"
	"k8s.io/kubernetes/pkg/apis/core/validation"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	schedulerutil "k8s.io/kubernetes/pkg/scheduler/util"

	localcache "github.com/alibaba/open-local/pkg/scheduler/algorithm/cache"
	localutils "github.com/alibaba/open-local/pkg/utils"
	simontype "github.com/hkust-adsl/kubernetes-scheduler-simulator/pkg/type"
	gpusharecache "github.com/hkust-adsl/kubernetes-scheduler-simulator/pkg/type/open-gpu-share/cache"
	"github.com/hkust-adsl/kubernetes-scheduler-simulator/pkg/type/open-gpu-share/utils"
	gpushareutils "github.com/hkust-adsl/kubernetes-scheduler-simulator/pkg/type/open-gpu-share/utils"
)

var nameDelimiter = "/"

func GeneratePodKey(pod *v1.Pod) string {
	return GeneratePodKeyByName(pod.Namespace, pod.Name)
}

func GeneratePodKeyByName(namespace, name string) string {
	return namespace + nameDelimiter + name
}

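// exampleGeneratePodKey is an illustrative sketch (not part of the original
// file) showing the "<namespace>/<name>" key format produced by the helpers above.
func exampleGeneratePodKey() string {
	return GeneratePodKeyByName("default", "nginx") // "default/nginx"
}
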
// ParseFilePath recursively expands a directory path into a slice of regular file paths
func ParseFilePath(path string) (filePaths []string, err error) {
	fi, err := os.Stat(path)
	if err != nil {
		return nil, fmt.Errorf("failed to parse path(%s): %v ", path, err)
	}

	switch mode := fi.Mode(); {
	case mode.IsDir():
		files, err := ioutil.ReadDir(path)
		if err != nil {
			return nil, fmt.Errorf("failed to read directory(%s): %v ", path, err)
		}
		for _, f := range files {
			p := filepath.Join(path, f.Name())
			subFiles, err := ParseFilePath(p)
			if err != nil {
				return nil, err
			}
			filePaths = append(filePaths, subFiles...)
		}
	case mode.IsRegular():
		filePaths = append(filePaths, path)
	default:
		return nil, fmt.Errorf("invalid path: %s ", path)
	}

	return filePaths, nil
}

// DecodeYamlContent splits the given YAML content into manifests and decodes each one into a runtime.Object
func DecodeYamlContent(yamlRes []byte) ([]runtime.Object, error) {
	objects := make([]runtime.Object, 0)
	yamls := releaseutil.SplitManifests(string(yamlRes))
	for _, yaml := range yamls {
		decode := scheme.Codecs.UniversalDeserializer().Decode
		obj, _, err := decode([]byte(yaml), nil, nil)
		if err != nil {
			return nil, fmt.Errorf("failed to decode yaml content: \n%s\n%v", yaml, err)
		}

		objects = append(objects, obj)
	}

	return objects, nil
}

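// exampleDecodeYamlContent is an illustrative sketch (not part of the original
// file): it decodes a two-document manifest into runtime.Objects.
func exampleDecodeYamlContent() ([]runtime.Object, error) {
	manifest := `apiVersion: v1
kind: Pod
metadata:
  name: demo
  namespace: default
spec:
  containers:
  - name: app
    image: nginx
---
apiVersion: v1
kind: Service
metadata:
  name: demo-svc
spec:
  ports:
  - port: 80
`
	return DecodeYamlContent([]byte(manifest))
}
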
func ReadYamlFile(path string) []byte {
	fileExtension := filepath.Ext(path)
	if fileExtension != ".yaml" && fileExtension != ".yml" {
		return nil
	}
	yamlFile, err := ioutil.ReadFile(path)
	if err != nil {
		log.Errorf("Error while reading file %s: %s\n", path, err.Error())
		os.Exit(1)
	}
	return yamlFile
}

func ReadJsonFile(path string) []byte {
	pathExtension := filepath.Ext(path)
	if pathExtension != ".json" {
		return nil
	}
	jsonFile, err := ioutil.ReadFile(path)
	if err != nil {
		log.Errorf("Error while reading file %s: %s\n", path, err.Error())
		os.Exit(1)
	}

	return jsonFile
}

// GetYamlContentFromDirectory returns the YAML content under the directory and ignores other content
func GetYamlContentFromDirectory(dir string) ([]string, error) {
	var ymlStr []string
	filePaths, err := ParseFilePath(dir)
	if err != nil {
		return ymlStr, err
	}
	for _, filePath := range filePaths {
		if yml := ReadYamlFile(filePath); yml != nil {
			ymlStr = append(ymlStr, string(yml))
		}
	}

	return ymlStr, nil
}

func MakeValidPodsByDeployment(deploy *appsv1.Deployment) ([]*v1.Pod, error) {
	deploy.UID = uuid.NewUUID()
	return MakeValidPodsByReplicaSet(GenerateReplicaSetFromDeployment(deploy))
}

func MakeValidPodsByReplicaSet(rs *appsv1.ReplicaSet) ([]*v1.Pod, error) {
	var pods []*v1.Pod
	if rs.UID == "" {
		rs.UID = uuid.NewUUID()
	}
	if rs.Spec.Replicas == nil {
		var replica int32 = 1
		rs.Spec.Replicas = &replica
	}
	for ordinal := 0; ordinal < int(*rs.Spec.Replicas); ordinal++ {
		pod := &v1.Pod{
			ObjectMeta: SetObjectMetaFromObject(rs, true),
			Spec:       rs.Spec.Template.Spec,
		}
		validPod, err := MakeValidPod(pod)
		if err != nil {
			return nil, err
		}
		validPod = AddWorkloadInfoToPod(validPod, simontype.ReplicaSet, rs.Name, rs.Namespace)
		pods = append(pods, validPod)
	}
	return pods, nil
}

func MakeValidPodsByReplicationController(rc *v1.ReplicationController) ([]*v1.Pod, error) {
	var pods []*v1.Pod
	rc.UID = uuid.NewUUID()
	if rc.Spec.Replicas == nil {
		var replica int32 = 1
		rc.Spec.Replicas = &replica
	}

	for ordinal := 0; ordinal < int(*rc.Spec.Replicas); ordinal++ {
		pod := &v1.Pod{
			ObjectMeta: SetObjectMetaFromObject(rc, true),
			Spec:       rc.Spec.Template.Spec,
		}
		validPod, err := MakeValidPod(pod)
		if err != nil {
			return nil, err
		}
		validPod = AddWorkloadInfoToPod(validPod, simontype.ReplicationController, rc.Name, rc.Namespace)
		pods = append(pods, validPod)
	}
	return pods, nil
}

func GenerateReplicaSetFromDeployment(deploy *appsv1.Deployment) *appsv1.ReplicaSet {
	return &appsv1.ReplicaSet{
		TypeMeta: metav1.TypeMeta{
			APIVersion: appsv1.SchemeGroupVersion.String(),
			Kind:       simontype.ReplicaSet,
		},
		ObjectMeta: SetObjectMetaFromObject(deploy, false),
		Spec: appsv1.ReplicaSetSpec{
			Selector: deploy.Spec.Selector,
			Replicas: deploy.Spec.Replicas,
			Template: deploy.Spec.Template,
		},
	}
}

func MakeValidPodsByCronJob(cronJob *batchv1beta1.CronJob) ([]*corev1.Pod, error) {
	cronJob.UID = uuid.NewUUID()
	return MakeValidPodsByJob(GenerateJobFromCronJob(cronJob))
}

func MakeValidPodsByJob(job *batchv1.Job) ([]*corev1.Pod, error) {
	var pods []*corev1.Pod

	if job.UID == "" {
		job.UID = uuid.NewUUID()
	}

	if job.Spec.Completions == nil {
		var completions int32 = 1
		job.Spec.Completions = &completions
	}

	for ordinal := 0; ordinal < int(*job.Spec.Completions); ordinal++ {
		pod := &corev1.Pod{
			ObjectMeta: SetObjectMetaFromObject(job, true),
			Spec:       job.Spec.Template.Spec,
		}
		validPod, err := MakeValidPod(pod)
		if err != nil {
			return nil, err
		}
		validPod = AddWorkloadInfoToPod(validPod, simontype.Job, job.Name, job.Namespace)
		pods = append(pods, validPod)
	}

	return pods, nil
}

func GenerateJobFromCronJob(cronJob *batchv1beta1.CronJob) *batchv1.Job {
	annotations := make(map[string]string)
	annotations["cronjob.kubernetes.io/instantiate"] = "manual"

	for k, v := range cronJob.Spec.JobTemplate.Annotations {
		annotations[k] = v
	}
	// NOTE: the annotations map built above is not attached to the generated Job below;
	// the Job inherits the CronJob's own annotations via SetObjectMetaFromObject.

	return &batchv1.Job{
		TypeMeta: metav1.TypeMeta{
			APIVersion: batchv1.SchemeGroupVersion.String(),
			Kind:       simontype.Job,
		},
		ObjectMeta: SetObjectMetaFromObject(cronJob, false),
		Spec:       cronJob.Spec.JobTemplate.Spec,
	}
}

func MakeValidPodsByStatefulSet(ss *appsv1.StatefulSet) ([]*corev1.Pod, error) {
	var pods []*corev1.Pod
	ss.UID = uuid.NewUUID()

	if ss.Spec.Replicas == nil {
		var replica int32 = 1
		ss.Spec.Replicas = &replica
	}

	for ordinal := 0; ordinal < int(*ss.Spec.Replicas); ordinal++ {
		pod := &corev1.Pod{
			ObjectMeta: SetObjectMetaFromObject(ss, true),
			Spec:       ss.Spec.Template.Spec,
		}
		validPod, err := MakeValidPod(pod)
		if err != nil {
			return nil, err
		}
		validPod.Name = fmt.Sprintf("%s-%d", ss.Name, ordinal)
		validPod = AddWorkloadInfoToPod(validPod, simontype.StatefulSet, ss.Name, ss.Namespace)
		pods = append(pods, validPod)
	}

	if err := SetStorageAnnotationOnPods(pods, ss.Spec.VolumeClaimTemplates, ss.Name); err != nil {
		return nil, err
	}

	return pods, nil
}

func SetStorageAnnotationOnPods(pods []*corev1.Pod, volumeClaimTemplates []corev1.PersistentVolumeClaim, stsName string) error {
	var volumes VolumeRequest
	volumes.Volumes = make([]Volume, 0)

	for _, pvc := range volumeClaimTemplates {
		if pvc.Spec.StorageClassName != nil {
			storageClassName := *pvc.Spec.StorageClassName
			var volumeKind string

			switch storageClassName {
			case OpenLocalSCNameLVM, YodaSCNameLVM:
				volumeKind = "LVM"
			case OpenLocalSCNameDeviceSSD, OpenLocalSCNameMountPointSSD, YodaSCNameMountPointSSD, YodaSCNameDeviceSSD:
				volumeKind = "SSD"
			case OpenLocalSCNameDeviceHDD, OpenLocalSCNameMountPointHDD, YodaSCNameMountPointHDD, YodaSCNameDeviceHDD:
				volumeKind = "HDD"
			default:
				log.Errorf("unsupported storage class: %s", storageClassName)
				continue
			}

			volume := Volume{
				Size:             localutils.GetPVCRequested(&pvc),
				Kind:             volumeKind,
				StorageClassName: storageClassName,
			}
			volumes.Volumes = append(volumes.Volumes, volume)
		} else {
			log.Errorf("empty storageClassName in volumeTemplate of statefulset %s is not supported", stsName)
		}
	}

	for _, pod := range pods {
		b, err := json.Marshal(volumes)
		if err != nil {
			return err
		}
		metav1.SetMetaDataAnnotation(&pod.ObjectMeta, simontype.AnnoPodLocalStorage, string(b))
	}

	return nil
}

func SetObjectMetaFromObject(owner metav1.Object, genPod bool) metav1.ObjectMeta {
	var controllerKind schema.GroupVersionKind

	switch owner.(type) {
	case *appsv1.Deployment:
		controllerKind = appsv1.SchemeGroupVersion.WithKind(simontype.Deployment)
	case *appsv1.ReplicaSet:
		controllerKind = appsv1.SchemeGroupVersion.WithKind(simontype.ReplicaSet)
	case *appsv1.StatefulSet:
		controllerKind = appsv1.SchemeGroupVersion.WithKind(simontype.StatefulSet)
	case *appsv1.DaemonSet:
		controllerKind = appsv1.SchemeGroupVersion.WithKind(simontype.DaemonSet)
	case *corev1.ReplicationController:
		controllerKind = corev1.SchemeGroupVersion.WithKind(simontype.ReplicationController)
	case *batchv1beta1.CronJob:
		controllerKind = batchv1beta1.SchemeGroupVersion.WithKind(simontype.CronJob)
	case *batchv1.Job:
		controllerKind = batchv1.SchemeGroupVersion.WithKind(simontype.Job)
	}

	return metav1.ObjectMeta{
		Name:         owner.GetName() + simontype.SeparateSymbol + GetSHA256HashCode([]byte(rand.String(10)), GetObjectHashCodeDigit(genPod)),
		Namespace:    owner.GetNamespace(),
		UID:          uuid.NewUUID(),
		Annotations:  owner.GetAnnotations(),
		GenerateName: owner.GetName(),
		Labels:       owner.GetLabels(),
		OwnerReferences: []metav1.OwnerReference{
			*metav1.NewControllerRef(owner, controllerKind),
		},
	}
}

func GetObjectHashCodeDigit(isPod bool) int {
	if isPod {
		return simontype.PodHashCodeDigit
	}
	return simontype.WorkLoadHashCodeDigit
}

func MakeValidPodsByDaemonSet(ds *appsv1.DaemonSet, nodes []*corev1.Node) ([]*corev1.Pod, error) {
	var pods []*corev1.Pod
	ds.UID = uuid.NewUUID()

	for _, node := range nodes {
		pod, err := NewDaemonPod(ds, node.Name)
		if err != nil {
			return nil, err
		}
		shouldRun := NodeShouldRunPod(node, pod)
		if shouldRun {
			pods = append(pods, pod)
		}
	}

	return pods, nil
}

func NewDaemonPod(ds *appsv1.DaemonSet, nodeName string) (*corev1.Pod, error) {
	// Create a new Pod based on the DaemonSet and node information
	pod := &corev1.Pod{
		ObjectMeta: SetObjectMetaFromObject(ds, true),
		Spec:       ds.Spec.Template.Spec,
	}

	// Set node affinity for the Pod
	pod.Spec.Affinity = SetDaemonSetPodNodeNameByNodeAffinity(pod.Spec.Affinity, nodeName)

	// Make the Pod valid
	validPod, err := MakeValidPod(pod)
	if err != nil {
		return nil, err
	}

	// Add workload information to the Pod
	validPod = AddWorkloadInfoToPod(validPod, simontype.DaemonSet, ds.Name, ds.Namespace)

	return validPod, nil
}

// NodeShouldRunPod determines whether a node should run a pod according to scheduling rules
func NodeShouldRunPod(node *corev1.Node, pod *corev1.Pod) bool {
	taints := node.Spec.Taints
	fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.Predicates(pod, node, taints)
	return fitsNodeName && fitsNodeAffinity && fitsTaints
}

func MakeValidPodByPod(pod *corev1.Pod) (*corev1.Pod, error) {
	// Generate a new UID for the Pod and make it valid
	pod.UID = uuid.NewUUID()
	newPod, err := MakeValidPod(pod)
	if err != nil {
		return nil, err
	}
	return newPod, nil
}

// MakeValidPod makes a Pod valid, so it can be handled
func MakeValidPod(oldPod *corev1.Pod) (*corev1.Pod, error) {
	newPod := oldPod.DeepCopy()

	// Set default values for various fields in the Pod
	if newPod.Labels == nil {
		newPod.Labels = make(map[string]string)
	}

	if newPod.ObjectMeta.Namespace == "" {
		newPod.ObjectMeta.Namespace = corev1.NamespaceDefault
	}

	if newPod.ObjectMeta.Annotations == nil {
		newPod.ObjectMeta.Annotations = map[string]string{}
	}

	// Set default values for Pod Spec
	if newPod.Spec.DNSPolicy == "" {
		newPod.Spec.DNSPolicy = corev1.DNSClusterFirst
	}

	if newPod.Spec.RestartPolicy == "" {
		newPod.Spec.RestartPolicy = corev1.RestartPolicyAlways
	}

	if newPod.Spec.SchedulerName == "" {
		newPod.Spec.SchedulerName = simontype.DefaultSchedulerName
	}

	newPod.Spec.ImagePullSecrets = nil

	// Reset various fields in InitContainers
	if newPod.Spec.InitContainers != nil {
		for i := range newPod.Spec.InitContainers {
			if newPod.Spec.InitContainers[i].TerminationMessagePolicy == "" {
				newPod.Spec.InitContainers[i].TerminationMessagePolicy = corev1.TerminationMessageFallbackToLogsOnError
			}
			// ... (continue resetting other fields)
		}
	}

	// Reset various fields in Containers
	if newPod.Spec.Containers != nil {
		for i := range newPod.Spec.Containers {
			if newPod.Spec.Containers[i].TerminationMessagePolicy == "" {
				newPod.Spec.Containers[i].TerminationMessagePolicy = corev1.TerminationMessageFallbackToLogsOnError
			}
			// ... (continue resetting other fields)
		}
	}

	// Handle volumes: replace PVC-backed volumes with a HostPath stub at /tmp
	if newPod.Spec.Volumes != nil {
		for i := range newPod.Spec.Volumes {
			if newPod.Spec.Volumes[i].PersistentVolumeClaim != nil {
				newPod.Spec.Volumes[i].HostPath = new(corev1.HostPathVolumeSource)
				newPod.Spec.Volumes[i].HostPath.Path = "/tmp"
				newPod.Spec.Volumes[i].PersistentVolumeClaim = nil
			}
		}
	}

	// Validate the modified Pod
	if err := ValidatePod(newPod); err != nil {
		return nil, err
	}

	return newPod, nil
}

// AddWorkloadInfoToPod adds annotations to the Pod for simulation purposes
func AddWorkloadInfoToPod(pod *corev1.Pod, kind string, name string, namespace string) *corev1.Pod {
	pod.ObjectMeta.Annotations[simontype.AnnoWorkloadKind] = kind
	pod.ObjectMeta.Annotations[simontype.AnnoWorkloadName] = name
	pod.ObjectMeta.Annotations[simontype.AnnoWorkloadNamespace] = namespace
	return pod
}

func MakeValidNodeByNode(node *corev1.Node, nodeName string) (*corev1.Node, error) {
	// Set node metadata and labels
	node.ObjectMeta.Name = nodeName
	node.ObjectMeta.UID = uuid.NewUUID()

	if node.ObjectMeta.Labels == nil {
		node.ObjectMeta.Labels = map[string]string{}
	}

	node.ObjectMeta.Labels[corev1.LabelHostname] = nodeName

	if node.ObjectMeta.Annotations == nil {
		node.ObjectMeta.Annotations = map[string]string{}
	}

	// Validate the modified Node
	if err := ValidateNode(node); err != nil {
		return nil, err
	}

	return node, nil
}

// ValidatePod checks if a Pod is valid
func ValidatePod(pod *corev1.Pod) error {
	internalPod := &api.Pod{}
	if err := apiv1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
		return fmt.Errorf("unable to convert to internal version: %#v ", err)
	}

	if errs := validation.ValidatePodCreate(internalPod, validation.PodValidationOptions{}); len(errs) > 0 {
		var errStrs []string
		for _, err := range errs {
			errStrs = append(errStrs, fmt.Sprintf("%v", err))
		}
		return fmt.Errorf("invalid pod: %#v ", strings.Join(errStrs, "\n"))
	}

	return nil
}

func GetSHA256HashCode(message []byte, num int) string {
	// Calculate the SHA-256 hash and return its first num hex characters
	hash := sha256.New()
	hash.Write(message)
	hashCode := hex.EncodeToString(hash.Sum(nil))
	return hashCode[:num]
}

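// exampleGetSHA256HashCode is an illustrative sketch (not part of the original
// file): it keeps only the first 8 hex characters of the digest, which is how
// the helpers above derive short name suffixes.
func exampleGetSHA256HashCode() string {
	return GetSHA256HashCode([]byte("abc"), 8) // "ba7816bf"
}
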
type NodeStorage struct {
	VGs     []localcache.SharedResource    `json:"vgs"`
	Devices []localcache.ExclusiveResource `json:"devices"`
}

type Volume struct {
	Size int64  `json:"size,string"`
	Kind string `json:"kind"`
	// StorageClassName is the name of the storage class
	StorageClassName string `json:"scName"`
}

type VolumeRequest struct {
	Volumes []Volume `json:"volumes"`
}

func GetNodeStorage(node *corev1.Node) (*NodeStorage, error) {
	// Retrieve and parse node storage information
	nodeStorageStr, exist := node.Annotations[simontype.AnnoNodeLocalStorage]
	if !exist {
		return nil, nil
	}

	nodeStorage := new(NodeStorage)
	if err := ffjson.Unmarshal([]byte(nodeStorageStr), nodeStorage); err != nil {
		return nil, fmt.Errorf("failed to unmarshal info of node %s: %s ", node.Name, err.Error())
	}

	return nodeStorage, nil
}

func GetNodeCache(node *corev1.Node) (*localcache.NodeCache, error) {
	nodeStorage, err := GetNodeStorage(node)
	if err != nil {
		return nil, err
	} else if nodeStorage == nil {
		return nil, nil
	}

	nc := localcache.NewNodeCache(node.Name)
	var vgRes map[localcache.ResourceName]localcache.SharedResource = make(map[localcache.ResourceName]localcache.SharedResource)
	for _, vg := range nodeStorage.VGs {
		vgRes[localcache.ResourceName(vg.Name)] = vg
	}
	nc.VGs = vgRes

	var deviceRes map[localcache.ResourceName]localcache.ExclusiveResource = make(map[localcache.ResourceName]localcache.ExclusiveResource)
	for _, device := range nodeStorage.Devices {
		deviceRes[localcache.ResourceName(device.Device)] = device
	}
	nc.Devices = deviceRes

	return nc, nil
}

func GetPodStorage(pod *corev1.Pod) *VolumeRequest {
	podStorageStr, exist := pod.Annotations[simontype.AnnoPodLocalStorage]
	if !exist {
		return nil
	}

	podStorage := new(VolumeRequest)
	if err := ffjson.Unmarshal([]byte(podStorageStr), &podStorage); err != nil {
		log.Errorf("unmarshal volume info of pod %s/%s failed: %s", pod.Namespace, pod.Name, err.Error())
		return nil
	}

	return podStorage
}

func GetPodLocalPVCs(pod *corev1.Pod) ([]*corev1.PersistentVolumeClaim, []*corev1.PersistentVolumeClaim) {
	podStorage := GetPodStorage(pod)
	if podStorage == nil {
		return nil, nil
	}

	lvmPVCs := make([]*corev1.PersistentVolumeClaim, 0)
	devicePVCs := make([]*corev1.PersistentVolumeClaim, 0)
	for i, volume := range podStorage.Volumes {
		scName := ""
		if volume.Kind == "LVM" || volume.Kind == "HDD" || volume.Kind == "SSD" {
			scName = volume.StorageClassName
		} else {
			log.Errorf("unsupported volume kind: %s", volume.Kind)
			continue
		}
		volumeQuantity := resource.NewQuantity(volume.Size, resource.BinarySI)
		pvc := &corev1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("pvc-%s-%d", pod.Name, i),
				Namespace: pod.Namespace,
			},
			Spec: corev1.PersistentVolumeClaimSpec{
				AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
				StorageClassName: &scName,
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceName(corev1.ResourceStorage): *volumeQuantity,
					},
				},
			},
			Status: corev1.PersistentVolumeClaimStatus{
				Phase: corev1.ClaimPending,
			},
		}
		if scName == OpenLocalSCNameLVM || scName == YodaSCNameLVM {
			lvmPVCs = append(lvmPVCs, pvc)
		} else {
			devicePVCs = append(devicePVCs, pvc)
		}
	}

	return lvmPVCs, devicePVCs
}

// ValidateNode checks if a node is valid
func ValidateNode(node *corev1.Node) error {
	internalNode := &api.Node{}
	if err := apiv1.Convert_v1_Node_To_core_Node(node, internalNode, nil); err != nil {
		return fmt.Errorf("unable to convert to internal version: %#v ", err)
	}
	if errs := validation.ValidateNode(internalNode); len(errs) > 0 {
		var errStrs []string
		for _, err := range errs {
			errStrs = append(errStrs, fmt.Sprintf("%v", err))
		}
		return fmt.Errorf("invalid node: %#v ", strings.Join(errStrs, "\n"))
	}

	return nil
}

func GetPodsTotalRequestsAndLimitsByNodeName(pods []*corev1.Pod, nodeName string) (map[corev1.ResourceName]resource.Quantity, map[corev1.ResourceName]resource.Quantity) {
	reqs, limits := make(map[corev1.ResourceName]resource.Quantity), make(map[corev1.ResourceName]resource.Quantity)
	for _, pod := range pods {
		if pod.Spec.NodeName != nodeName {
			continue
		}
		podReqs, podLimits := resourcehelper.PodRequestsAndLimits(pod)
		for podReqName, podReqValue := range podReqs {
			if value, ok := reqs[podReqName]; !ok {
				reqs[podReqName] = podReqValue.DeepCopy()
			} else {
				value.Add(podReqValue)
				reqs[podReqName] = value
			}
		}
		for podLimitName, podLimitValue := range podLimits {
			if value, ok := limits[podLimitName]; !ok {
				limits[podLimitName] = podLimitValue.DeepCopy()
			} else {
				value.Add(podLimitValue)
				limits[podLimitName] = value
			}
		}
	}
	return reqs, limits
}

func AdjustWorkloads(workloads map[string][]string) {
	if workloads == nil {
		return
	}

	for name, nodes := range workloads {
		workloads[name] = AdjustNodesOrder(nodes)
	}
}

func AdjustNodesOrder(nodes []string) []string {
	queue := NewNodeQueue(nodes)
	sort.Sort(queue)
	return queue.nodes
}

type NodeQueue struct {
	nodes []string
}

func NewNodeQueue(nodes []string) *NodeQueue {
	return &NodeQueue{
		nodes: nodes,
	}
}

func (queue *NodeQueue) Len() int { return len(queue.nodes) }

func (queue *NodeQueue) Swap(i, j int) {
	queue.nodes[i], queue.nodes[j] = queue.nodes[j], queue.nodes[i]
}

func (queue *NodeQueue) Less(i, j int) bool {
	if strings.Contains(queue.nodes[i], simontype.NewNodeNamePrefix) && strings.Contains(queue.nodes[j], simontype.NewNodeNamePrefix) {
		return queue.nodes[i] < queue.nodes[j]
	}

	if !strings.Contains(queue.nodes[i], simontype.NewNodeNamePrefix) && !strings.Contains(queue.nodes[j], simontype.NewNodeNamePrefix) {
		return queue.nodes[i] < queue.nodes[j]
	}

	if !strings.Contains(queue.nodes[i], simontype.NewNodeNamePrefix) {
		return true
	}

	if !strings.Contains(queue.nodes[j], simontype.NewNodeNamePrefix) {
		return false
	}

	return true
}

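// exampleAdjustNodesOrder is an illustrative sketch (not part of the original
// file): names containing simontype.NewNodeNamePrefix (simulated nodes) sort
// after the existing cluster nodes, which themselves stay in lexical order.
func exampleAdjustNodesOrder() []string {
	return AdjustNodesOrder([]string{
		simontype.NewNodeNamePrefix + "-1",
		"real-node-b",
		"real-node-a",
	}) // ["real-node-a", "real-node-b", "<prefix>-1"]
}
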
// MultiplyMilliQuant scales quantity by factor
func MultiplyMilliQuant(quant resource.Quantity, factor float64) resource.Quantity {
	milliValue := quant.MilliValue()
	newMilliValue := int64(float64(milliValue) * factor)
	newQuant := resource.NewMilliQuantity(newMilliValue, quant.Format)
	return *newQuant
}

// MultiplyQuant scales quantity by factor
func MultiplyQuant(quant resource.Quantity, factor float64) resource.Quantity {
	value := quant.Value()
	newValue := int64(float64(value) * factor)
	newQuant := resource.NewQuantity(newValue, quant.Format)
	return *newQuant
}

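// exampleMultiplyMilliQuant is an illustrative sketch (not part of the original
// file): scaling a 500m CPU quantity by 1.5 yields 750m.
func exampleMultiplyMilliQuant() resource.Quantity {
	cpu := resource.MustParse("500m")
	return MultiplyMilliQuant(cpu, 1.5) // 750m
}
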
func GetNodeAllocatable(node *corev1.Node) (resource.Quantity, resource.Quantity) {
	nodeAllocatable := node.Status.Allocatable.DeepCopy()
	return *nodeAllocatable.Cpu(), *nodeAllocatable.Memory()
}

func GetNodeAllocatableCpuGpu(node *corev1.Node) []float64 {
	milliCpu := node.Status.Allocatable.Cpu().MilliValue()
	milliGpu := gpushareutils.GetGpuCountOfNode(node) * gpushareutils.MILLI
	return []float64{float64(milliCpu), float64(milliGpu)}
}

func MeetResourceRequests(node *corev1.Node, pod *corev1.Pod, daemonSets []*appsv1.DaemonSet) (bool, error) {
	// CPU and Memory
	totalResource := map[corev1.ResourceName]*resource.Quantity{
		corev1.ResourceCPU:    resource.NewQuantity(0, resource.DecimalSI),
		corev1.ResourceMemory: resource.NewQuantity(0, resource.DecimalSI),
	}

	for _, item := range daemonSets {
		newItem := item
		daemonPod, err := NewDaemonPod(newItem, simontype.NewNodeNamePrefix)
		if err != nil {
			return false, err
		}
		if NodeShouldRunPod(node, daemonPod) {
			for _, container := range daemonPod.Spec.Containers {
				totalResource[corev1.ResourceCPU].Add(*container.Resources.Requests.Cpu())
				totalResource[corev1.ResourceMemory].Add(*container.Resources.Requests.Memory())
			}
		}
	}
	for _, container := range pod.Spec.Containers {
		totalResource[corev1.ResourceCPU].Add(*container.Resources.Requests.Cpu())
		totalResource[corev1.ResourceMemory].Add(*container.Resources.Requests.Memory())
	}

	if totalResource[corev1.ResourceCPU].Cmp(*node.Status.Allocatable.Cpu()) == 1 ||
		totalResource[corev1.ResourceMemory].Cmp(*node.Status.Allocatable.Memory()) == 1 {
		return false, nil
	}

	// Local Storage
	nodeStorage, err := GetNodeStorage(node)
	if err != nil {
		return false, err
	} else if nodeStorage == nil {
		return true, nil
	}
	var nodeVGMax int64 = 0
	for _, vg := range nodeStorage.VGs {
		if vg.Capacity > nodeVGMax {
			nodeVGMax = vg.Capacity
		}
	}
	lvmPVCs, _ := GetPodLocalPVCs(pod)
	var pvcSum int64 = 0
	for _, pvc := range lvmPVCs {
		pvcSum += localutils.GetPVCRequested(pvc)
	}

	return pvcSum <= nodeVGMax, nil
}

func CreateKubeClient(kubeconfig string) (*clientset.Clientset, error) {
	if len(kubeconfig) == 0 {
		return nil, nil
	}

	var err error
	var cfg *restclient.Config
	master, err := GetMasterFromKubeConfig(kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("failed to parse kubeconfig file: %v ", err)
	}

	cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("unable to build config: %v ", err)
	}

	kubeClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	return kubeClient, nil
}

func GetMasterFromKubeConfig(filename string) (string, error) {
	config, err := clientcmd.LoadFromFile(filename)
	if err != nil {
		return "", fmt.Errorf("cannot load kubeconfig file: %v", err)
	}

	varContext, ok := config.Contexts[config.CurrentContext]
	if !ok {
		return "", fmt.Errorf("failed to get master address from kubeconfig")
	}

	if val, ok := config.Clusters[varContext.Cluster]; ok {
		return val.Server, nil
	}
	return "", fmt.Errorf("failed to get master address from kubeconfig")
}

func SetDaemonSetPodNodeNameByNodeAffinity(affinity *corev1.Affinity, nodename string) *corev1.Affinity {
	nodeSelReq := corev1.NodeSelectorRequirement{
		Key:      api.ObjectNameField,
		Operator: corev1.NodeSelectorOpIn,
		Values:   []string{nodename},
	}

	nodeSelector := &corev1.NodeSelector{
		NodeSelectorTerms: []corev1.NodeSelectorTerm{
			{
				MatchFields: []corev1.NodeSelectorRequirement{nodeSelReq},
			},
		},
	}

	if affinity == nil {
		return &corev1.Affinity{
			NodeAffinity: &corev1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: nodeSelector,
			},
		}
	}

	if affinity.NodeAffinity == nil {
		affinity.NodeAffinity = &corev1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: nodeSelector,
		}
		return affinity
	}

	if affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = nodeSelector
		return affinity
	}

	// Replace the match fields of every existing node selector term with the new requirement.
	nodeSelectorTerms := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
	for i, item := range nodeSelectorTerms {
		item.MatchFields = []corev1.NodeSelectorRequirement{nodeSelReq}
		nodeSelectorTerms[i] = item
	}
	affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms

	return affinity
}

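// exampleDaemonSetNodeAffinity is an illustrative sketch (not part of the
// original file): starting from a nil affinity, the helper pins a DaemonSet
// pod to "node-1" via a required metadata.name match field.
func exampleDaemonSetNodeAffinity() *corev1.Affinity {
	return SetDaemonSetPodNodeNameByNodeAffinity(nil, "node-1")
}
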
func GetGpuNodeInfoFromAnnotation(node *corev1.Node) (*gpusharecache.GpuNodeInfoStr, error) {
	gpuNodeInfoString, exist := node.Annotations[simontype.AnnoNodeGpuShare]
	if !exist {
		return nil, nil
	}

	gpuNodeInfoStr := new(gpusharecache.GpuNodeInfoStr)
	if err := ffjson.Unmarshal([]byte(gpuNodeInfoString), gpuNodeInfoStr); err != nil {
		return nil, fmt.Errorf("failed to unmarshal GPU info of node %s: %s ", node.Name, err.Error())
	}

	return gpuNodeInfoStr, nil
}

func GetAllocatablePodList(clientset externalclientset.Interface) []corev1.Pod {
	podList, _ := clientset.CoreV1().Pods(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
	return podList.Items
}

func IsNodeAccessibleToPod(nodeRes simontype.NodeResource, podRes simontype.PodResource) bool {
	pt := podRes.GpuType
	nt := nodeRes.GpuType
	return IsNodeAccessibleToPodByType(nt, pt)
}

func IsNodeAccessibleToPodByType(nodeGpuType string, podGpuType string) bool {
	if len(podGpuType) == 0 {
		return true
	}
	if len(nodeGpuType) == 0 {
		return false // i.e., CPU node
	}

	//pm, ok := gpushareutils.MapGpuTypeMemoryMiB[podGpuType]
	//if !ok {
	//	log.Errorf("Pod GPU Type: %s not in Map", podGpuType)
	//	return false
	//}
	//
	//nm, ok := gpushareutils.MapGpuTypeMemoryMiB[nodeGpuType]
	//if !ok {
	//	log.Errorf("Node GPU Type: %s not in Map", nodeGpuType)
	//	return false
	//}
	//
	//if pm > nm {
	//	return false
	//}
	podGpuTypeList := strings.Split(podGpuType, "|")
	cnt := 0
	for _, gpuType := range podGpuTypeList {
		if len(gpuType) == 0 {
			continue
		}
		cnt++
		if gpuType == nodeGpuType {
			return true
		}
	}
	if cnt > 0 { // pod requests at least one specific GPU type but node doesn't match
		return false
	} else { // pod actually doesn't request any specific GPU type
		return true
	}
}

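// exampleGpuTypeAccessibility is an illustrative sketch (not part of the
// original file): a pod may list alternative GPU models separated by "|", and
// the node is accessible if its model matches any of them.
func exampleGpuTypeAccessibility() bool {
	return IsNodeAccessibleToPodByType("V100", "A100|V100") // true
}
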
func GetPodResource(pod *corev1.Pod) simontype.PodResource {
	gpuNumber := gpushareutils.GetGpuCountFromPodAnnotation(pod)
	gpuMilli := gpushareutils.GetGpuMilliFromPodAnnotation(pod)
	gpuType := gpushareutils.GetGpuModelFromPodAnnotation(pod)

	var non0CPU, non0Mem int64
	for _, c := range pod.Spec.Containers {
		non0CPUReq, non0MemReq := schedulerutil.GetNonzeroRequests(&c.Resources.Requests)
		non0CPU += non0CPUReq
		non0Mem += non0MemReq
	}

	tgtPodRes := simontype.PodResource{
		MilliCpu:  non0CPU,
		MilliGpu:  gpuMilli,
		GpuNumber: gpuNumber,
		GpuType:   gpuType,
	}
	return tgtPodRes
}

func GetNodeResourceMap(nodeStatus []simontype.NodeStatus) map[string]simontype.NodeResource {
	nodeResMap := make(map[string]simontype.NodeResource)
	for _, ns := range nodeStatus {
		node := ns.Node
		if nodeRes := GetNodeResourceViaPodList(ns.Pods, node); nodeRes != nil {
			nodeResMap[node.Name] = *nodeRes
		} else {
			log.Errorf("[GetNodeResourceMap] failed to get nodeRes(%s)\n", node.Name)
		}
	}
	return nodeResMap
}

func ExportNodeStatusToCsv(nodeStatus []simontype.NodeStatus, filePath string) error {
	// NOTE: as written, this only creates (or truncates) the target file; no rows are written.
	f, err := os.Create(filePath)
	if err != nil {
		return err
	}
	defer f.Close()
	return nil
}

func GetNodeResourceViaNodeInfo(nodeInfo *framework.NodeInfo) (nodeRes *simontype.NodeResource) {
	node := nodeInfo.Node()
	milliCpuLeft := node.Status.Allocatable.Cpu().MilliValue() - nodeInfo.Requested.MilliCPU
	nodeGpuAffinity := map[string]int{}
	for i := 0; i < len(nodeInfo.Pods); i++ {
		p := nodeInfo.Pods[i].Pod
		affinity := gpushareutils.GetGpuAffinityFromPodAnnotation(p)
		if affinity != gpushareutils.NoGpuTag {
			nodeGpuAffinity[affinity] += 1
		}
	}

	return &simontype.NodeResource{
		NodeName:         node.Name,
		MilliCpuLeft:     milliCpuLeft,
		MilliCpuCapacity: node.Status.Allocatable.Cpu().MilliValue(),
		MilliGpuLeftList: getGpuMilliLeftListOnNode(node),
		GpuNumber:        gpushareutils.GetGpuCountOfNode(node),
		GpuType:          gpushareutils.GetGpuModelOfNode(node),
		GpuAffinity:      nodeGpuAffinity,
	}
}

func GetNodeResourceViaHandleAndName(handle framework.Handle, nodeName string) (nodeRes *simontype.NodeResource) {
	nodeInfo, err := handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return nil
	}
	return GetNodeResourceViaNodeInfo(nodeInfo)
}

func GetNodeResourceViaPodList(podList []*corev1.Pod, node *corev1.Node) (nodeRes *simontype.NodeResource) {
	allocatable := node.Status.Allocatable
	reqs, _ := GetPodsTotalRequestsAndLimitsByNodeName(podList, node.Name)
	nodeCpuReq := reqs[corev1.ResourceCPU]
	nodeGpuAffinity := map[string]int{}
	for _, p := range podList {
		affinity := gpushareutils.GetGpuAffinityFromPodAnnotation(p)
		if affinity != gpushareutils.NoGpuTag {
			nodeGpuAffinity[affinity] += 1
		}
	}

	return &simontype.NodeResource{
		NodeName:         node.Name,
		MilliCpuLeft:     allocatable.Cpu().MilliValue() - nodeCpuReq.MilliValue(),
		MilliCpuCapacity: allocatable.Cpu().MilliValue(),
		MilliGpuLeftList: getGpuMilliLeftListOnNode(node),
		GpuNumber:        gpushareutils.GetGpuCountOfNode(node),
		GpuType:          gpushareutils.GetGpuModelOfNode(node),
		GpuAffinity:      nodeGpuAffinity,
	}
}

func GetNodeResourceAndPodResourceViaHandle(p *corev1.Pod, nodeName string, handle framework.Handle) (nodeResPtr *simontype.NodeResource, podResPtr *simontype.PodResource) {
	nodeResPtr = GetNodeResourceViaHandleAndName(handle, nodeName)
	if nodeResPtr == nil {
		return nil, nil
	}

	if p == nil {
		return nil, nil
	}
	podRes := GetPodResource(p)
	podResPtr = &podRes
	if !IsNodeAccessibleToPod(*nodeResPtr, *podResPtr) {
		return nil, nil
	}

	return nodeResPtr, podResPtr
}

func getGpuMilliLeftListOnNode(node *corev1.Node) []int64 {
	// ignore non-gpu node
	if !utils.IsGpuSharingNode(node) {
		return nil
	}

	gpuNum := gpushareutils.GetGpuCountOfNode(node)
	gpuMilliLeftList := make([]int64, gpuNum)
	for i := 0; i < gpuNum; i++ {
		gpuMilliLeftList[i] = gpushareutils.MILLI
	}
	if gpuNodeInfoStr, err := GetGpuNodeInfoFromAnnotation(node); err == nil && gpuNodeInfoStr != nil {
		for _, dev := range gpuNodeInfoStr.DevsBrief {
			gpuMilliLeftList[dev.Idx] -= dev.GpuUsedMilli
		}
	} else {
		// TODO: Add node annotation in the initialization phase
		//log.Errorf("[getGpuMilliLeftListOnNode] failed to parse node(%s) gpu info from annotation(%v)\n",
		//	node.Name, node.ObjectMeta.Annotations)
	}
	return gpuMilliLeftList
}

func GetAllPodsPtrFromNodeStatus(nodeStatus []simontype.NodeStatus) []*corev1.Pod {
	var allPods []*corev1.Pod
	for _, status := range nodeStatus {
		for _, pod := range status.Pods {
			if pod != nil {
				allPods = append(allPods, pod)
			} else {
				log.Errorf("Nil Pod pointer exists in nodeStatus: %v\n", nodeStatus)
			}
		}
	}
	return allPods
}

func GetPodsPtrFromPods(pods []corev1.Pod) (podsPtr []*corev1.Pod) {
	for _, pod := range pods {
		podsPtr = append(podsPtr, pod.DeepCopy()) // &pod is wrong; using pod.DeepCopy() instead.
	}
	return
}

func RemovePodFromPodSliceByPod(pods []*corev1.Pod, pod *corev1.Pod) []*corev1.Pod {
	ret := make([]*corev1.Pod, 0)
	for _, p := range pods {
		if p != pod {
			ret = append(ret, p)
		}
	}
	return ret
}

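// exampleRemovePodFromPodSlice is an illustrative sketch (not part of the
// original file): removal matches by pointer identity, not by pod name, so a
// DeepCopy of the same pod would not be removed.
func exampleRemovePodFromPodSlice() int {
	p1 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "a"}}
	p2 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "b"}}
	return len(RemovePodFromPodSliceByPod([]*corev1.Pod{p1, p2}, p1)) // 1
}
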
func GetResourceSimilarity(nodeRes simontype.NodeResource, podRes simontype.PodResource) float64 {
	freeVec := nodeRes.ToResourceVec()
	requestVec := podRes.ToResourceVec()

	similarity := CalculateVectorCosineSimilarity(freeVec, requestVec)
	delta := 1e-3
	if (similarity < 0-delta) || (similarity > 1+delta) {
		log.Errorf("similarity(%.4f) is not in the range [0,1], should not happen. freeVec: %v, requestVec: %v\n",
			similarity, freeVec, requestVec)
		return -1
	}
	return similarity
}

// CalculateVectorCosineSimilarity returns value in range [0, 1] or -1 (error)
func CalculateVectorCosineSimilarity(vec1, vec2 []float64) float64 {
	if len(vec1) == 0 || len(vec2) == 0 || len(vec1) != len(vec2) {
		log.Errorf("empty vector(s) or vectors of unequal size, vec1 %v, vec2 %v\n", vec1, vec2)
		return -1
	}
	var magnitude1, magnitude2, innerProduct float64
	for index, num1 := range vec1 {
		num2 := vec2[index]
		magnitude1 += num1 * num1
		magnitude2 += num2 * num2
		innerProduct += num1 * num2
	}
	magnitude1 = math.Sqrt(magnitude1)
	magnitude2 = math.Sqrt(magnitude2)

	if magnitude1 == 0 || magnitude2 == 0 {
		log.Errorf("vector(s) of zero magnitude. vec1: %v, vec2: %v\n", vec1, vec2)
		return -1
	}

	similarity := innerProduct / (magnitude1 * magnitude2)
	return similarity
}

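// exampleCosineSimilarity is an illustrative sketch (not part of the original
// file): (3,4) and (6,8) point in the same direction, so the cosine
// similarity is 1.0; orthogonal vectors such as (1,0) and (0,1) give 0.
func exampleCosineSimilarity() float64 {
	return CalculateVectorCosineSimilarity([]float64{3, 4}, []float64{6, 8}) // 1.0
}
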
func NormalizeVector(vec []float64, normVec []float64) []float64 {
	out := make([]float64, len(vec))
	copy(out, vec)

	if len(normVec) == 0 || len(vec) == 0 || len(normVec) != len(vec) {
		log.Errorf("empty vector(s) or vectors of unequal size, vec %v, normVec %v\n", vec, normVec)
		return out
	}
	for i := 0; i < len(out); i++ {
		if normVec[i] > 0 {
			out[i] = out[i] / normVec[i]
		} else {
			out[i] = 0
		}
	}
	return out
}

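// exampleNormalizeVector is an illustrative sketch (not part of the original
// file): dividing a (milliCPU, milliGPU) request element-wise by a node
// capacity turns it into utilization fractions.
func exampleNormalizeVector() []float64 {
	return NormalizeVector([]float64{4000, 2000}, []float64{8000, 4000}) // [0.5, 0.5]
}
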
func CalculateVectorDotProduct(vec1, vec2 []float64) (innerProduct float64) {
	if len(vec1) == 0 || len(vec2) == 0 || len(vec1) != len(vec2) {
		log.Errorf("empty vector(s) or vectors of unequal size, vec1 %v, vec2 %v\n", vec1, vec2)
		return -1
	}
	for index, num1 := range vec1 {
		num2 := vec2[index]
		innerProduct += num1 * num2
	}
	return innerProduct
}

// CalculateL2NormDiff returns the squared L2 distance between vec1 and vec2 (no square root is taken), or -1 on error.
func CalculateL2NormDiff(vec1, vec2 []float64) (l2norm float64) {
	if len(vec1) == 0 || len(vec2) == 0 || len(vec1) != len(vec2) {
		log.Errorf("empty vector(s) or vectors of unequal size, vec1 %v, vec2 %v\n", vec1, vec2)
		return -1
	}
	for index, num1 := range vec1 {
		num2 := vec2[index]
		l2norm += (num1 - num2) * (num1 - num2)
	}
	return l2norm
}

// CalculateL2NormRatio returns the sum of squared element-wise ratios vec1[i]/vec2[i], or -1 on error.
func CalculateL2NormRatio(vec1, vec2 []float64) (l2norm float64) {
	if len(vec1) == 0 || len(vec2) == 0 || len(vec1) != len(vec2) {
		log.Errorf("empty vector(s) or vectors of unequal size, vec1 %v, vec2 %v\n", vec1, vec2)
		return -1
	}
	for index, num1 := range vec1 {
		num2 := vec2[index]
		l2norm += (num1 / num2) * (num1 / num2)
	}
	return l2norm
}

func GenerateSchedulingMatchGroups(nodeRes simontype.NodeResource, podRes simontype.PodResource,
	gpuDimExtMethod simontype.GpuDimExtMethod, normMethod simontype.NormMethod) (matchGroups []simontype.SchedulingMatchGroup) {

	virtualNodeResourceList := nodeRes.ToVirtualNodeResourceList(gpuDimExtMethod, podRes)
	virtualPodResourceList := podRes.ToVirtualPodResourceList(gpuDimExtMethod, nodeRes)

	for _, virtualNodeResource := range virtualNodeResourceList {
		for _, virtualPodResource := range virtualPodResourceList {
			nodeResourceVec := make([]float64, len(virtualNodeResource.ResourceVec))
			copy(nodeResourceVec, virtualNodeResource.ResourceVec)
			podResourceVec := make([]float64, len(virtualPodResource.ResourceVec))
			copy(podResourceVec, virtualPodResource.ResourceVec)

			matchGroup := simontype.SchedulingMatchGroup{
				NodeResourceVec: nodeResourceVec,
				PodResourceVec:  podResourceVec,
			}

			if gpuDimExtMethod == simontype.SeparateGpuDimAndShareOtherDim || gpuDimExtMethod == simontype.SeparateGpuDimAndDivideOtherDim {
				matchGroup.GpuId = virtualNodeResource.GpuId
			} else if gpuDimExtMethod == simontype.ExtGpuDim {
				matchGroup.GpuId = virtualPodResource.GpuId
			}

			if normMethod == simontype.NormByNode {
				nodeCapacity := getNodeCapacity(nodeRes, gpuDimExtMethod)
				matchGroup.NodeResourceVec = NormalizeVector(matchGroup.NodeResourceVec, nodeCapacity)
				matchGroup.PodResourceVec = NormalizeVector(matchGroup.PodResourceVec, nodeCapacity)
			} else if normMethod == simontype.NormByPod {
				podRequest := getPodRequest(podRes, nodeRes, gpuDimExtMethod)
				matchGroup.NodeResourceVec = NormalizeVector(matchGroup.NodeResourceVec, podRequest)
				matchGroup.PodResourceVec = NormalizeVector(matchGroup.PodResourceVec, podRequest)
			} else if normMethod == simontype.NormByMax {
				maxNodeCapacity := getMaxNodeCapacity(nodeRes, gpuDimExtMethod)
				matchGroup.NodeResourceVec = NormalizeVector(matchGroup.NodeResourceVec, maxNodeCapacity)
				matchGroup.PodResourceVec = NormalizeVector(matchGroup.PodResourceVec, maxNodeCapacity)
			}

			matchGroups = append(matchGroups, matchGroup)
		}
	}
	return matchGroups
}

func getNodeCapacity(nodeRes simontype.NodeResource, gpuDimExtMethod simontype.GpuDimExtMethod) []float64 {
	if gpuDimExtMethod == simontype.ExtGpuDim {
		nodeCapacity := []float64{float64(nodeRes.MilliCpuCapacity)}
		nodeGpuResourceList := nodeRes.ToFormalizedGpuResourceList()
		for i := 0; i < len(nodeGpuResourceList); i++ {
			nodeCapacity = append(nodeCapacity, float64(nodeRes.GpuNumber*utils.MILLI))
		}
		return nodeCapacity
	}
	return []float64{float64(nodeRes.MilliCpuCapacity), float64(nodeRes.GpuNumber * gpushareutils.MILLI)}
}

// getPodRequest builds the pod request vector. nodeRes is passed in (mirroring
// getNodeCapacity) so that the GPU dimension count can be derived from the
// node's formalized GPU resource list in the ExtGpuDim case.
func getPodRequest(podRes simontype.PodResource, nodeRes simontype.NodeResource, gpuDimExtMethod simontype.GpuDimExtMethod) []float64 {
	if gpuDimExtMethod == simontype.ExtGpuDim {
		podRequest := []float64{float64(podRes.MilliCpu)}
		nodeGpuResourceList := nodeRes.ToFormalizedGpuResourceList()
		for i := 0; i < len(nodeGpuResourceList); i++ {
			podRequest = append(podRequest, float64(podRes.MilliGpu*int64(podRes.GpuNumber)))
		}
		return podRequest
	}
	return []float64{float64(podRes.MilliCpu), float64(podRes.TotalMilliGpu())}
}

// getMaxNodeCapacity builds the normalization vector of maximum node capacities.
// As with getPodRequest, nodeRes is passed in to size the GPU dimensions in the
// ExtGpuDim case.
func getMaxNodeCapacity(nodeRes simontype.NodeResource, gpuDimExtMethod simontype.GpuDimExtMethod) []float64 {
	if gpuDimExtMethod == simontype.ExtGpuDim {
		maxNodeCapacity := []float64{float64(utils.MaxSpecCpu)}
		nodeGpuResourceList := nodeRes.ToFormalizedGpuResourceList()
		for i := 0; i < len(nodeGpuResourceList); i++ {
			maxNodeCapacity = append(maxNodeCapacity, float64(utils.MaxSpecGpu))
		}
		return maxNodeCapacity
	}
	return []float64{float64(utils.MaxSpecCpu), float64(utils.MaxSpecGpu)}
}

func ReportFailedPods(fp []simontype.UnscheduledPod) {
	if len(fp) == 0 {
		return
	}
	log.Infof("Failed Pods in detail:\n")
	for _, up := range fp {
		podResource := GetPodResource(up.Pod)
		log.Infof(" %s: %s\n", GeneratePodKey(up.Pod), podResource.Repr())
	}
	log.Infoln()
}

func PodListRatioSum(tpl simontype.TargetPodList) (cumRatio float64) {
	for _, pod := range tpl {
		cumRatio += pod.Percentage // 0.0-1.0
	}
	return cumRatio
}