diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java index c3e3d5c..3eaf4dd 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java @@ -23,18 +23,23 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; /** * A class that connects to the EXN middleware and starts listening to * messages from the ActiveMQ server. * - * This class will drive the main behavior of the optimiser-controller: the + *

This class will drive the main behavior of the optimiser-controller: the * `Consumer` objects created in {@link ExnConnector#ExnConnector} receive * incoming messages and react to them, sending out messages in turn. + * + *

The class also provides methods wrapping the exn-sal middleware + * endpoints, converting from raw JSON responses to sal-common datatypes where + * possible. */ @Slf4j public class ExnConnector { @@ -67,31 +72,24 @@ public class ExnConnector { // ---------------------------------------- // Communication with SAL - // We define these publishers here instead of in the `SalConnector` - // class since they need to be registered here and I'm afraid I will - // forget to do it when adding new endpoints over in another class. - /** The createJob endpoint. */ - public static final SyncedPublisher createJob - = new SyncedPublisher("createJob", - "eu.nebulouscloud.exn.sal.job.post", true, true); + public final SyncedPublisher createJob; /** The findNodeCandidates endpoint. Should not be used during normal * operation--ask the broker instead. */ - public static final SyncedPublisher findNodeCandidates - = new SyncedPublisher("findNodeCandidates", - "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true); - /** The findNodeCandidates endpoint (Broker's version). */ - public static final SyncedPublisher findBrokerNodeCandidates - = new SyncedPublisher("findBrokerNodeCandidates", - "eu.nebulouscloud.cfsb.get_node_candidates", true, true); - /** The addNodes endpoint. */ - public static final SyncedPublisher addNodes - = new SyncedPublisher("addNodes", - "eu.nebulouscloud.exn.sal.nodes.add", true, true); - /** The submitJob endpoint. */ - public static final SyncedPublisher submitJob - = new SyncedPublisher("submitJob", - "eu.nebulouscloud.exn.sal.job.update", true, true); + public final SyncedPublisher findSalNodeCandidates; + /** The findNodeCandidates endpoint (Broker's version). This one adds + * attributes "score", "rank" to the answer it gets from SAL. */ + public final SyncedPublisher findBrokerNodeCandidates; + /** The defineCluster endpoint. */ + public final SyncedPublisher defineCluster; + /** The deployCluster endpoint. 
*/ + public final SyncedPublisher deployCluster; + /** The deployApplication endpoint. */ + public final SyncedPublisher deployApplication; + /** The scaleOut endpoint. */ + public final SyncedPublisher scaleOut; + /** The scaleIn endpoint. */ + public final SyncedPublisher scaleIn; /** * Create a connection to ActiveMQ via the exn middleware, and set up the @@ -107,12 +105,19 @@ public class ExnConnector { */ public ExnConnector(String host, int port, String name, String password, ConnectorHandler callback) { amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true); + createJob = new SyncedPublisher("createJob", "eu.nebulouscloud.exn.sal.job.post", true, true); + findSalNodeCandidates = new SyncedPublisher("findSalNodeCandidates", "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true); + findBrokerNodeCandidates = new SyncedPublisher("findBrokerNodeCandidates", "eu.nebulouscloud.cfsb.get_node_candidates", true, true); + defineCluster = new SyncedPublisher("defineCluster", "eu.nebulouscloud.exn.sal.cluster.define", true, true); + deployCluster = new SyncedPublisher("deployCluster", "eu.nebulouscloud.exn.sal.cluster.deploy", true, true); + deployApplication = new SyncedPublisher("deployApplication", "eu.nebulouscloud.exn.sal.cluster.deployApplication", true, true); + scaleOut = new SyncedPublisher("scaleOut", "eu.nebulouscloud.exn.sal.cluster.scaleout", true, true); + scaleIn = new SyncedPublisher("scaleIn", "eu.nebulouscloud.exn.sal.cluster.scalein", true, true); conn = new Connector("optimiser_controller", callback, - // List.of(new Publisher("config", "config", true)), List.of(amplMessagePublisher, - createJob, findNodeCandidates, findBrokerNodeCandidates, addNodes, submitJob), + findSalNodeCandidates, findBrokerNodeCandidates, defineCluster, deployCluster, deployApplication, scaleOut, scaleIn), List.of( new Consumer("ui_app_messages", app_creation_channel, new AppCreationMessageHandler(), true, true), @@ -140,7 +145,7 @@ 
public class ExnConnector { /** * Disconnect from ActiveMQ and stop all Consumer processes. Also count * down the countdown latch passed in the {@link - * ExnConnector#start(CountDownLatch)} method if applicable. + * #start(CountDownLatch)} method if applicable. */ public synchronized void stop() { conn.stop(); @@ -150,6 +155,9 @@ public class ExnConnector { log.debug("ExnConnector stopped."); } + // ---------------------------------------- + // Message Handlers + /** * A message handler that processes app creation messages coming in via * `eu.nebulouscloud.ui.dsl.generic`. Such messages contain, among @@ -216,4 +224,334 @@ public class ExnConnector { } } + // ---------------------------------------- + // Communication with SAL + + /** + * Extract and check the SAL response from an exn-middleware response. + * The SAL response will be valid JSON encoded as a string in the "body" + * field of the response. If the response is of the following form, log + * an error and return a missing node instead: + * + *
+     * <pre>{@code
+     * {
+     *   "key": <error key>,
+     *   "message": "some error message"
+     * }
+     * }</pre>
+ * + * @param response The response from exn-middleware. + * @param appID The application ID, used for logging only. + * @return The SAL response as a parsed JsonNode, or a node where {@code + * isMissingNode()} will return true if SAL reported an error. + */ + private static JsonNode extractPayloadFromExnResponse(Map response, String appID) { + String body = (String)response.get("body"); + JsonNode payload = mapper.missingNode(); + try { + payload = mapper.readTree(body); + } catch (JsonProcessingException e) { + log.error("Could not read message body as JSON: " + body, keyValue("appId", appID), e); + return mapper.missingNode(); + } + // These messages are listed in the {@code AbstractProcessor} class of + // the exn-middleware project. + if (Set.of("generic-exception-error", + "gateway-client-exception-error", + "gateway-server-exception-error") + .contains(payload.at("/key").asText()) + && !payload.at("/message").isMissingNode()) { + log.error("exn-middleware-sal request failed with error type '{}' and message '{}'", + payload.at("/key").asText(), + payload.at("/message").asText(), + keyValue("appId", appID)); + return mapper.missingNode(); + } + return payload; + } + + /** + * Get list of node candidates from the resource broker that fulfill the + * given requirements, and sort them by rank and score so that better node + * candidates come first in the result. + * + *

A candidate is better than another one if it has a lower rank or, if + * the rank is equal, a higher score. + * + * @param requirements The list of requirements. + * @param appID The application ID. + * @return A sorted List containing node candidates, better candidates + * first. + */ + public List findNodeCandidates(List requirements, String appID) { + Map msg; + try { + msg = Map.of( + "metaData", Map.of("user", "admin"), + "body", mapper.writeValueAsString(requirements)); + } catch (JsonProcessingException e) { + log.error("Could not convert requirements list to JSON string (this should never happen)", + keyValue("appId", appID), e); + return null; + } + Map response = findBrokerNodeCandidates.sendSync(msg, appID, null, false); + // Note: we do not call extractPayloadFromExnResponse here, since this + // response does not come from the exn-middleware. + ObjectNode jsonBody = mapper.convertValue(response, ObjectNode.class); + // Note: what we would really like to do here is something like: + // return Arrays.asList(mapper.readValue(response, NodeCandidate[].class)); + // But since the broker adds two attributes, the array elements cannot + // be deserialized into org.ow2.proactive.sal.model.NodeCandidate + // objects. + List result = Arrays.asList(mapper.convertValue(jsonBody.withArray("/body"), JsonNode[].class)); + result.sort((JsonNode c1, JsonNode c2) -> { + long rank1 = c1.at("/rank").longValue(); + long rank2 = c2.at("/rank").longValue(); + double score1 = c1.at("/score").doubleValue(); + double score2 = c2.at("/score").doubleValue(); + // We return < 0 if c1 < c2. Since we want to sort better + // candidates first, c1 < c2 if rank is lower or rank is equal + // and score is higher. (Lower rank = better, higher score = + // better.) 
+ if (rank1 != rank2) return Math.toIntExact(rank1 - rank2); + else return Math.toIntExact(Math.round(score2 - score1)); + }); + return result.stream() + .map(candidate -> + mapper.convertValue( + ((ObjectNode)candidate).deepCopy().remove(List.of("score", "rank")), + NodeCandidate.class)) + .collect(Collectors.toList()); + } + + /** + * Get list of node candidates from the resource broker that fulfil the + * given requirements. + * + *

Note that we cannot convert the result to a list containing {@code + * org.ow2.proactive.sal.model.NodeCandidate} instances, since the broker + * adds the additional fields {@code score} and {@code ranking}. Instead + * we return a JSON {@code ArrayNode} containing {@code ObjectNode}s in + * the format specified at + * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/nodecandidates-endpoints.md#71--filter-node-candidates-endpoint + * but with these two additional attributes. + * + * @param requirements The list of requirements. + * @param appID The application ID. + * @return A list containing node candidates, or null in case of error. + */ + public List findNodeCandidatesFromSal(List requirements, String appID) { + Map msg; + try { + msg = Map.of( + "metaData", Map.of("user", "admin"), + "body", mapper.writeValueAsString(requirements)); + } catch (JsonProcessingException e) { + log.error("Could not convert requirements list to JSON string (this should never happen)", + keyValue("appId", appID), e); + return null; + } + Map response = findSalNodeCandidates.sendSync(msg, appID, null, false); + JsonNode payload = extractPayloadFromExnResponse(response, appID); + if (payload.isMissingNode()) return null; + try { + return Arrays.asList(mapper.treeToValue(payload, NodeCandidate[].class)); + } catch (JsonProcessingException e) { + log.error("Could not decode node candidates payload", keyValue("appId", appID), e); + return null; + } + } + + /** + * Define a cluster with the given name and node list. + * + *

The nodes are passed in a JSON array containing objects of the + * following shape: + * + *
+     * <pre>{@code
+     * {
+     *   "nodeName": "some-component",
+     *   "nodeCandidateId": "some-candidate-id",
+     *   "cloudId": "some-cloud-id"
+     * }
+     * }</pre>
+ * + *

Each value for {@code nodeName} has to be unique, and should be + * either the name of the master node or the name of a node that will + * subsequently be referenced in the affinity trait of the modified + * kubevela file (see {@link NebulousAppDeployer#addNodeAffinities()}). + * + *

The values for {@code nodeCandidateId} and {@code cloudId} come from + * the return value of a call to {@link #findNodeCandidates()}. + * + *

Note that this method could be rewritten to accept the nodes as a + * {@code List} instead, if that is + * more convenient. + * + * @param appID The application's id, used to name the cluster. + * @param masterNodeName The name of the master node. + * @param nodes A JSON array containing the node definitions. + * @return true if the cluster was successfully defined, false otherwise. + */ + public boolean defineCluster(String appID, String masterNodeName, ArrayNode nodes) { + // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed + ObjectNode body = mapper.createObjectNode() + .put("name", appID) + .put("master-node", masterNodeName); + body.putArray("nodes").addAll(nodes); + Map msg; + try { + msg = Map.of("metaData", Map.of("user", "admin"), + "body", mapper.writeValueAsString(body)); + } catch (JsonProcessingException e) { + log.error("Could not convert JSON to string (this should never happen)", + keyValue("appId", appID), e); + return false; + } + Map response = defineCluster.sendSync(msg, appID, null, false); + JsonNode payload = extractPayloadFromExnResponse(response, appID); + return payload.asBoolean(); + // TODO: check if we still need to unwrap this; see + // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 + // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 + // https://bugs.launchpad.net/nebulous/+bug/2055053 + // return payload.at("/success").asBoolean(); + } + + /** + * Get the definition of a cluster created by {@link #defineCluster}. + * + * @param appID The application ID, as used to define the cluster. + * @return The cluster definition, or null in case of error. 
+ */ + public JsonNode getCluster(String appID) { + Map msg; + msg = Map.of("metaData", Map.of("user", "admin", "clusterName", appID)); + Map response = deployCluster.sendSync(msg, appID, null, false); + JsonNode payload = extractPayloadFromExnResponse(response, appID); + return payload.isMissingNode() ? null : payload; + } + + /** + * Deploy a cluster created by {@link #defineCluster}. + * + * @param appID The application's id, used to name the cluster. + * @return true if the cluster was successfully deployed, false otherwise. + */ + public boolean deployCluster(String appID) { + // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed + ObjectNode body = mapper.createObjectNode() + .put("applicationId", appID); + Map msg; + try { + msg = Map.of("metaData", Map.of("user", "admin"), + "body", mapper.writeValueAsString(body)); + } catch (JsonProcessingException e) { + log.error("Could not convert JSON to string (this should never happen)", + keyValue("appId", appID), e); + return false; + } + Map response = deployCluster.sendSync(msg, appID, null, false); + JsonNode payload = extractPayloadFromExnResponse(response, appID); + return payload.asBoolean(); + // TODO: check if we still need to unwrap this; see + // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 + // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 + // https://bugs.launchpad.net/nebulous/+bug/2055053 + // return payload.at("/success").asBoolean(); + } + + /** + * Submit a KubeVela file to a deployed cluster. + * + * @param appID The application's id. + * @param kubevela The KubeVela file, with node affinity traits + * corresponding to the cluster definintion. + * @return true if the application was successfully deployed, false otherwise. 
+ */ + public boolean deployApplication(String appID, String kubevela) { + // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed + ObjectNode body = mapper.createObjectNode() + .put("applicationId", appID) + .put("KubevelaYaml", kubevela); + Map msg; + try { + msg = Map.of("metaData", Map.of("user", "admin"), + "body", mapper.writeValueAsString(body)); + } catch (JsonProcessingException e) { + log.error("Could not convert JSON to string (this should never happen)", + keyValue("appId", appID), e); + return false; + } + Map response = deployApplication.sendSync(msg, appID, null, false); + JsonNode payload = extractPayloadFromExnResponse(response, appID); + return payload.asBoolean(); + // TODO: check if we still need to unwrap this; see + // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 + // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 + // https://bugs.launchpad.net/nebulous/+bug/2055053 + // return payload.at("/success").asBoolean(); + } + + /** + * Add new nodes to a deployed cluster. + * + *

The new nodes are specified in the same way as in {@link + * #defineCluster()}. + * + * @param appID The application's id. + * @param additionalWorkers The additional nodes to add. + */ + // TODO: deserialize response into sal-common `Cluster` + public void scaleOut(String appID, ArrayNode additionalNodes) { + // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed + ObjectNode body = mapper.createObjectNode() + .put("applicationId", appID); + body.putArray("workers").addAll(additionalNodes); + Map msg; + try { + msg = Map.of("metaData", Map.of("user", "admin"), + "body", mapper.writeValueAsString(body)); + } catch (JsonProcessingException e) { + log.error("Could not convert JSON to string (this should never happen)", + keyValue("appId", appID), e); + return; + } + Map response = scaleOut.sendSync(msg, appID, null, false); + // Called for side-effect only; we want to log errors + JsonNode payload = extractPayloadFromExnResponse(response, appID); + } + + /** + * Remove nodes from a deployed cluster. + * + * @param appID The application's id. + * @param superfluousNodes The names of nodes to be removed. + * @return true if the call was successful, false otherwise. 
+ */ + public boolean scaleIn(String appID, List superfluousNodes) { + // NOTE: not yet defined in + // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed + ArrayNode body = mapper.createArrayNode(); + superfluousNodes.forEach(nodeName -> body.add(nodeName)); + Map msg; + try { + msg = Map.of("metaData", Map.of("user", "admin"), + "body", mapper.writeValueAsString(body)); + } catch (JsonProcessingException e) { + log.error("Could not convert JSON to string (this should never happen)", + keyValue("appId", appID), e); + return false; + } + Map response = scaleIn.sendSync(msg, appID, null, false); + JsonNode payload = extractPayloadFromExnResponse(response, appID); + return payload.asBoolean(); + // TODO: check if we still need to unwrap this; see + // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053 + // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111 + // https://bugs.launchpad.net/nebulous/+bug/2055053 + // return payload.at("/success").asBoolean(); + } + + } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java index ceedce7..8354c08 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java @@ -10,7 +10,6 @@ import java.util.concurrent.CountDownLatch; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.nebulouscloud.exn.core.Publisher; import lombok.extern.slf4j.Slf4j; import picocli.CommandLine.Command; import picocli.CommandLine.Parameters; diff --git 
a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java index 30e6b17..7b6cd09 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import eu.nebulouscloud.exn.core.Publisher; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import static net.logstash.logback.argument.StructuredArguments.keyValue; @@ -22,6 +23,7 @@ import java.nio.file.Path; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.Spliterator; @@ -29,6 +31,8 @@ import java.util.Spliterators; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.ow2.proactive.sal.model.Requirement; + /** * Internal representation of a NebulOus app. */ @@ -97,6 +101,14 @@ public class NebulousApp { @Getter private JsonNode originalAppMessage; private ObjectNode original_kubevela; + /** + * The current "generation" of deployment. Initial deployment sets this + * to 1, each subsequent redeployment increases by 1. This value is used + * to name node instances generated during that deployment. + */ + @Getter @Setter + private int deployGeneration = 0; + /** * Map of component name to machine name(s) deployed for that component. * Component names are defined in the KubeVela file. We assume that @@ -111,9 +123,20 @@ public class NebulousApp { /** When an app gets deployed or redeployed, this is where we send the AMPL file */ private Publisher ampl_message_channel; - /** Have we ever been deployed? I.e., when we rewrite KubeVela, are there - * already nodes running for us? 
*/ - private boolean deployed = false; + // /** Have we ever been deployed? I.e., when we rewrite KubeVela, are there + // * already nodes running for us? */ + // private boolean deployed = false; + + /** The KubeVela as it was most recently sent to the app's controller. */ + @Getter @Setter + private JsonNode deployedKubevela; + /** For each KubeVela component, the number of deployed nodes. All nodes + * will be identical wrt machine type etc. */ + @Getter @Setter + private Map deployedNodeCounts; + /** For each KubeVela component, the requirements for its node(s). */ + @Getter @Setter + private Map> deployedNodeRequirements; /** * The EXN connector for this class. At the moment all apps share the @@ -250,25 +273,6 @@ public class NebulousApp { return readKubevelaString(Files.readString(Path.of(path), StandardCharsets.UTF_8)); } - /** - * Set "deployed" status. Will typically be set to true once, and then - * never to false again. - * - * @param deployed the new status. - */ - public void setDeployed(boolean deployed) { - this.deployed = deployed; - } - /** - * Check if the app has been deployed, i.e., if there are already VMs - * allocated from SAL for us. - * - * @return false if we never asked for nodes, true otherwise. - */ - public boolean isDeployed() { - return deployed; - } - /** * Check that all parameters have a name, type and path, and that the * target path can be found in the original KubeVela file. @@ -433,7 +437,7 @@ public class NebulousApp { } ObjectNode variables = solution.withObjectProperty("VariableValues"); ObjectNode kubevela = rewriteKubevelaWithSolution(variables); - if (isDeployed()) { + if (deployGeneration > 0) { // We assume that killing a node will confuse the application's // Kubernetes cluster, therefore: // 1. 
Recalculate node sets diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java index 5b9ce68..bde80bb 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java @@ -7,8 +7,8 @@ import java.util.Map; import java.util.Set; import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer; import org.ow2.proactive.sal.model.AttributeRequirement; -import org.ow2.proactive.sal.model.CommandsInstallation; import org.ow2.proactive.sal.model.NodeCandidate; +import org.ow2.proactive.sal.model.NodeCandidate.NodeCandidateTypeEnum; import org.ow2.proactive.sal.model.NodeType; import org.ow2.proactive.sal.model.NodeTypeRequirement; import org.ow2.proactive.sal.model.OperatingSystemFamily; @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import static net.logstash.logback.argument.StructuredArguments.keyValue; @@ -33,24 +32,9 @@ import static net.logstash.logback.argument.StructuredArguments.keyValue; @Slf4j public class NebulousAppDeployer { - // TODO: find out the commands to initialize the controller - /** - * The installation scripts to send to SAL for the NebulOuS controller - * node. - */ - @Getter - private static CommandsInstallation controllerInstallation = new CommandsInstallation(); - private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); private static final ObjectMapper mapper = new ObjectMapper(); - // TODO: find out the commands to initialize the workers - /** - * The installation scripts to send to SAL for a NebulOuS worker node. 
- */ - @Getter - private static CommandsInstallation nodeInstallation = new CommandsInstallation(); - /** * The requirements of the node running the NebulOuS controller. * This machine runs the Kubernetes cluster and KubeVela. For @@ -122,23 +106,38 @@ public class NebulousAppDeployer { * Given a KubeVela file, extract node requirements, create the job, start * its nodes and submit KubeVela. * - * NOTE: this method is under reconstruction, pending the new endpoints. + *

NOTE: this method modifies the NebulousApp object state, storing + * various facts about the deployed cluster. * - * @param app the NebulOuS app object. + *

NOTE: this method is under reconstruction, pending the new + * endpoints. + * + * @param app The NebulOuS app object. * @param kubevela the KubeVela file to deploy. */ public static void deployApplication(NebulousApp app, JsonNode kubevela) { String appUUID = app.getUUID(); + ExnConnector conn = app.getExnConnector(); + Set chosenEdgeCandidates = new HashSet<>(); log.info("Starting initial deployment for application", keyValue("appId", appUUID)); + int deployGeneration = app.getDeployGeneration() + 1; + app.setDeployGeneration(deployGeneration); + // The overall flow: // // 1. Extract node requirements and node counts from the KubeVela // definition. - // 2. Find node candidates for all workers and the controller. - // 3. Select node candidates. + // 2. Ask resource broker for node candidates for all workers and the + // controller. + // 3. Select node candidates, making sure to only select edge nodes + // once. // 4. Create a SAL cluster. // 5. Deploy the SAL cluster. + // 6. Add node affinity traits to the KubeVela file. + // 7. Deploy the SAL application. + // 8. Store cluster state (deployed KubeVela file, etc.) in + // NebulousApp object. // ------------------------------------------------------------ // 1. Extract node requirements @@ -152,29 +151,46 @@ public class NebulousAppDeployer { // ---------------------------------------- // 2. 
Find node candidates - // ArrayNode controllerCandidates = SalConnector.findNodeCandidates(controllerRequirements, appUUID); - // if (controllerCandidates.isEmpty()) { - // log.error("Could not find node candidates for requirements: {}", - // controllerRequirements, keyValue("appId", appUUID)); - // // Continue here while we don't really deploy - // // return; - // } - // Map workerCandidates = new HashMap<>(); - // for (Map.Entry> e : workerRequirements.entrySet()) { - // String nodeName = e.getKey(); - // List requirements = e.getValue(); - // ArrayNode candidates = SalConnector.findNodeCandidates(requirements, appUUID); - // if (candidates.isEmpty()) { - // log.error("Could not find node candidates for requirements: {}", requirements); - // // Continue here while we don't really deploy - // // return; - // } - // workerCandidates.put(nodeName, candidates); - // } + List controllerCandidates = conn.findNodeCandidates(controllerRequirements, appUUID); + if (controllerCandidates.isEmpty()) { + log.error("Could not find node candidates for requirements: {}", + controllerRequirements, keyValue("appId", appUUID)); + // Continue here while we don't really deploy + // return; + } + Map> workerCandidates = new HashMap<>(); + for (Map.Entry> e : workerRequirements.entrySet()) { + String nodeName = e.getKey(); + List requirements = e.getValue(); + List candidates = conn.findNodeCandidates(requirements, appUUID); + if (candidates.isEmpty()) { + log.error("Could not find node candidates for requirements: {}", requirements, + keyValue("appId", appUUID)); + // Continue here while we don't really deploy + // return; + } + workerCandidates.put(nodeName, candidates); + } // ------------------------------------------------------------ // 3. 
Select node candidates + // Controller node + log.debug("Deciding on controller node candidate", keyValue("appId", appUUID)); + NodeCandidate masterNodeCandidate = null; + if (controllerCandidates.size() > 0) { + masterNodeCandidate = controllerCandidates.get(0); + if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE) + .contains(masterNodeCandidate.getNodeCandidateType())) { + // Mark this candidate as already chosen + chosenEdgeCandidates.add(masterNodeCandidate); + } + } else { + log.error("Empty node candidate list for controller, continuing without creating node", + keyValue("appId", appUUID)); + } + + // Component nodes log.debug("Collecting worker nodes for {}", appUUID, keyValue("appId", appUUID)); Map nodeNameToCandidate = new HashMap<>(); for (Map.Entry> e : workerRequirements.entrySet()) { @@ -185,28 +201,30 @@ public class NebulousAppDeployer { int numberOfNodes = nodeCounts.get(componentName); Set nodeNames = new HashSet<>(); for (int i = 1; i <= numberOfNodes; i++) { - String nodeName = String.format("%s-%s", componentName, i); + String nodeName = String.format("%s-%s-%s", componentName, deployGeneration, i); + List candidates = workerCandidates.get(componentName); + + if (candidates.size() == 0) { + log.error("Empty node candidate list for component ~s, continuing without creating node", componentName, keyValue("appId", appUUID)); + continue; + } + + NodeCandidate candidate = candidates.stream() + .filter(each -> !chosenEdgeCandidates.contains(each)) + .findFirst() + .orElse(null); + if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE).contains(candidate.getNodeCandidateType())) { + // We could remove this candidate from `candidates` here, + // to save skipping over already-assigned edge nodes for + // the next replica of this component, but we don't want + // to make assumptions on whether the candidate list can + // be modified. 
Note that we have to keep track of all + // assigned edge nodes in any case, since they might be + // candidates in subsequent components. + chosenEdgeCandidates.add(candidate); + } + nodeNameToCandidate.put(nodeName, candidate); nodeNames.add(nodeName); - // TODO: choose the node candidate with the highest score - // and/or ranking. - - // TODO: Here we need to discriminate between edge and cloud - // node candidates: we can deploy an edge node only once, but - // cloud nodes arbitrarily often. So if the best node - // candidate is an edge node, we should select it and fill the - // rest of the nodes with second-best cloud nodes. - - // // TODO: make sure we only choose the same edge node once; it - // // might be in all node candidate lists :) - // if (!workerCandidates.get(componentName).isEmpty()) { - // // should always be true, except currently we don't abort - // // in Step 2 if we don't find candidates. - // JsonNode candidate = workerCandidates.get(componentName).get(0); - // NodeCandidate c = mapper.convertValue(((ObjectNode)candidate).deepCopy() - // .remove(List.of("score", "ranking")), - // NodeCandidate.class); - // nodeNameToCandidate.put(nodeName, c); - // } } app.getComponentMachineNames().put(componentName, nodeNames); } @@ -216,30 +234,57 @@ public class NebulousAppDeployer { // ------------------------------------------------------------ // 4. Create cluster - // TODO: call defineCluster endpoint with nodename -> candidate - // mapping etc. 
+ String masterNodeName = "masternode"; // safe because all component node names end with a number + ObjectNode cluster = mapper.createObjectNode(); + cluster.put("name", appUUID) + .put("master-node", masterNodeName); + ArrayNode nodes = cluster.withArray("nodes"); + if (masterNodeCandidate != null) { + nodes.addObject() + .put("nodeName", masterNodeName) + .put("nodeCandidateId", masterNodeCandidate.getId()) + .put("cloudId", masterNodeCandidate.getCloud().getId()); + } + nodeNameToCandidate.forEach((name, candidate) -> { + nodes.addObject() + .put("nodeName", name) + .put("nodeCandidateId", candidate.getId()) + .put("cloudId", candidate.getCloud().getId()); + }); + boolean defineClusterSuccess = conn.defineCluster(appUUID, masterNodeName, null); // ------------------------------------------------------------ // 5. Deploy cluster + boolean deployClusterSuccess = conn.deployCluster(appUUID); - // TODO: call deployCluster endpoint - + // ------------------------------------------------------------ + // 6. Rewrite KubeVela JsonNode rewritten = addNodeAffinities(kubevela, app.getComponentMachineNames()); String rewritten_kubevela = "---\n# Did not manage to create rewritten KubeVela"; try { rewritten_kubevela = yamlMapper.writeValueAsString(rewritten); } catch (JsonProcessingException e) { - log.error("Failed to convert KubeVela to YAML; this should never happen", e); + log.error("Failed to convert KubeVela to YAML; this should never happen", keyValue("appId", appUUID), e); } Main.logFile("rewritten-kubevela-" + appUUID + ".yaml", rewritten_kubevela); + + // ------------------------------------------------------------ + // 7. Deploy application + // TODO: call deployApplication endpoint + + // ------------------------------------------------------------ + // 8. Update NebulousApp state + + // TODO: store rewritten KubeVela in application object } /** - * Given a KubeVela file, adapt the running application to its specification. 
+ * Given a KubeVela file, adapt the running application to its + specification. * - * The KubeVela file will have been rewritten with updated - * information from the solver. + * The KubeVela file will have been rewritten with updated information + * from the solver. * * NOTE: this method is under development, pending the new endpoints. * @@ -248,16 +293,17 @@ public class NebulousAppDeployer { */ public static void redeployApplication(NebulousApp app, ObjectNode kubevela) { String appUUID = app.getUUID(); - log.info("Starting redeployment of {}", appUUID); + int deployGeneration = app.getDeployGeneration() + 1; + app.setDeployGeneration(deployGeneration); + log.info("Starting redeployment generation {}", deployGeneration, keyValue("appId", appUUID)); // The overall flow: // // 1. Extract node requirements and node counts from the updated // KubeVela definition. - // 2. Extract current nodes from running SAL job - // 3. Calculate new (to be started) and superfluous (to be shutdown) - // nodes - // 4. Find node candidates for new nodes (from Step 3) according to + // 2. Calculate new (to be started) and superfluous (to be shutdown) + // nodes by comparing against previous deployment. + // 3. Find node candidates for new nodes (from Step 3) according to // their requirements (from Step 1) // 5. Rewrite KubeVela with updated node affinities // 6. Call clusterScaleOut endpoint with list of added nodes @@ -269,12 +315,12 @@ public class NebulousAppDeployer { // 1. 
Extract node requirements Map> workerRequirements = KubevelaAnalyzer.getRequirements(kubevela); Map nodeCounts = KubevelaAnalyzer.getNodeCount(kubevela); - List controllerRequirements = getControllerRequirements(appUUID); + + Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements); Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts); - Main.logFile("controller-requirements-" + appUUID + ".txt", controllerRequirements); - + } } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java index 26bbf8e..7999be6 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java @@ -12,7 +12,7 @@ import java.util.concurrent.ConcurrentHashMap; */ @Slf4j public class NebulousApps { - + /** The global app registry. */ // (Putting this here until we find a better place.) 
private static final Map apps = new ConcurrentHashMap(); diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/SalConnector.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/SalConnector.java deleted file mode 100644 index 1500740..0000000 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/SalConnector.java +++ /dev/null @@ -1,73 +0,0 @@ -package eu.nebulouscloud.optimiser.controller; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.ow2.proactive.sal.model.NodeCandidate; -import org.ow2.proactive.sal.model.Requirement; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import lombok.extern.slf4j.Slf4j; -import static net.logstash.logback.argument.StructuredArguments.keyValue; - - -/** - * A class that wraps communication with SAL (the Scheduling Abstraction Layer - * of ProActive) over EXN. - * - * Documentation of the SAL REST API is here: - * https://github.com/ow2-proactive/scheduling-abstraction-layer/tree/master/documentation - */ -@Slf4j -public class SalConnector { - - private SalConnector() {} - - private static final ObjectMapper mapper = new ObjectMapper(); - - /** - * Get list of node candidates from the resource broker that fulfil the - given requirements. - * - *

Note that we cannot convert the result to a list containing {@code - * org.ow2.proactive.sal.model.NodeCandidate} instances, since the broker - * adds the additional fields {@code score} and {@code ranking}. Instead - * we return a JSON {@code ArrayNode} containing {@code ObjectNode}s in - * the format specified at - * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/nodecandidates-endpoints.md#71--filter-node-candidates-endpoint - * but with these two additional attributes. - * - * @param requirements The list of requirements. - * @param appID The application ID. - * @return A JSON array containing node candidates. - */ - public static ArrayNode findNodeCandidates(List requirements, String appID) { - Map msg; - try { - msg = Map.of( - "metaData", Map.of("user", "admin"), - "body", mapper.writeValueAsString(requirements)); - } catch (JsonProcessingException e) { - log.error("Could not convert requirements list to JSON string (this should never happen)", - keyValue("appId", appID), e); - return null; - } - Map response = ExnConnector.findBrokerNodeCandidates.sendSync(msg, appID, null, false); - ObjectNode jsonBody = mapper.convertValue(response, ObjectNode.class); - // Note: what we would really like to do here is something like: - // - // return Arrays.asList(mapper.readValue(response, NodeCandidate[].class)); - // - // But since the broker adds two attributes, the array elements cannot - // be deserialized into org.ow2.proactive.sal.model.NodeCandidate - // objects. - return jsonBody.withArray("/nodes"); - } - -}