From a23692efa08cdc0dbf5066823b71fd4e89e27542 Mon Sep 17 00:00:00 2001 From: Rudi Schlatte Date: Fri, 9 Feb 2024 10:18:46 +0200 Subject: [PATCH] Start to use exn middleware to talk with SAL NOTE: NebulousAppDeployer.deployApplication is only half implemented, awaiting new SAL endpoints. Change-Id: I10cbb7770a59eed963579804fd66428ab6771cc4 --- .../templates/deployment.yaml | 9 - .../optimiser-controller-secrets.yaml | 1 - .../nebulous-optimiser-controller/values.yaml | 5 - optimiser-controller/build.gradle | 8 - .../optimiser/controller/ExnConnector.java | 39 ++- .../optimiser/controller/LocalExecution.java | 8 +- .../optimiser/controller/Main.java | 39 +-- .../optimiser/controller/NebulousApp.java | 44 ++- .../controller/NebulousAppDeployer.java | 197 +++++------ .../optimiser/controller/SalConnector.java | 318 ++---------------- .../controller/NebulousAppTests.java | 4 +- 11 files changed, 170 insertions(+), 502 deletions(-) diff --git a/charts/nebulous-optimiser-controller/templates/deployment.yaml b/charts/nebulous-optimiser-controller/templates/deployment.yaml index 0f2c988..dbbd068 100644 --- a/charts/nebulous-optimiser-controller/templates/deployment.yaml +++ b/charts/nebulous-optimiser-controller/templates/deployment.yaml @@ -46,15 +46,6 @@ spec: value: "{{ .Values.activemq.ACTIVEMQ_PORT }}" - name: ACTIVEMQ_USER value: "{{ .Values.activemq.ACTIVEMQ_USER }}" - - name: SAL_USER - value: "{{ .Values.sal.SAL_USER }}" - - name: SAL_URL - value: "{{ .Values.sal.SAL_URL }}" - - name: SAL_PASSWORD - valueFrom: - secretKeyRef: - name: {{ include "nebulous-optimiser-controller.fullname" . }}-secrets - key: SAL_PASSWORD - name: ACTIVEMQ_PASSWORD valueFrom: secretKeyRef: diff --git a/charts/nebulous-optimiser-controller/templates/optimiser-controller-secrets.yaml b/charts/nebulous-optimiser-controller/templates/optimiser-controller-secrets.yaml index 44e9bd3..5ee535b 100644 --- a/charts/nebulous-optimiser-controller/templates/optimiser-controller-secrets.yaml +++ b/charts/nebulous-optimiser-controller/templates/optimiser-controller-secrets.yaml @@ -7,4 +7,3 @@ metadata: type: Opaque data: ACTIVEMQ_PASSWORD: {{ .Values.secrets.ACTIVEMQ_PASSWORD | b64enc | quote }} - SAL_PASSWORD: {{ .Values.secrets.SAL_PASSWORD | b64enc | quote }} \ No newline at end of file diff --git a/charts/nebulous-optimiser-controller/values.yaml b/charts/nebulous-optimiser-controller/values.yaml index 5d14ce3..d512927 100644 --- a/charts/nebulous-optimiser-controller/values.yaml +++ b/charts/nebulous-optimiser-controller/values.yaml @@ -84,10 +84,6 @@ affinity: {} debug: LOGDIR: /tmp/nebulous -sal: - SAL_URL: sal - SAL_USER: admin - activemq: ACTIVEMQ_HOST: activemq ACTIVEMQ_PORT: 5672 @@ -95,4 +91,3 @@ activemq: secrets: ACTIVEMQ_PASSWORD: nebulous - SAL_PASSWORD: admin diff --git a/optimiser-controller/build.gradle b/optimiser-controller/build.gradle index f58e23e..a794b04 100644 --- a/optimiser-controller/build.gradle +++ b/optimiser-controller/build.gradle @@ -48,14 +48,6 @@ dependencies { // SAL client library implementation 'org.ow2.proactive:sal-common:13.1.0-SNAPSHOT' - // HTTP requests; used by Melodic and we adapt their SAL client code - // https://github.com/reactor/reactor-netty - implementation 'io.projectreactor.netty:reactor-netty:1.1.15' - - // HTTPHeaders etc.; used by Melodic and we adapt their SAL client code - // https://mvnrepository.com/artifact/org.springframework/spring-web - implementation 'org.springframework:spring-web:6.1.3' - // Logging: https://www.slf4j.org implementation 'org.slf4j:slf4j-api:2.0.9' // We use java.util.logging as the backend for now; see diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java index cea20a1..b45bba0 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java @@ -5,17 +5,24 @@ import eu.nebulouscloud.exn.core.Consumer; import eu.nebulouscloud.exn.core.Context; import eu.nebulouscloud.exn.core.Handler; import eu.nebulouscloud.exn.core.Publisher; +import eu.nebulouscloud.exn.core.SyncedPublisher; import eu.nebulouscloud.exn.handlers.ConnectorHandler; import eu.nebulouscloud.exn.settings.StaticExnConfig; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.qpid.protonj2.client.Message; +import org.ow2.proactive.sal.model.NodeCandidate; +import org.ow2.proactive.sal.model.Requirement; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -53,6 +60,30 @@ public class ExnConnector { @Getter private final Publisher amplMessagePublisher; + // ---------------------------------------- + // Communication with SAL + + // We define these publishers here instead of in the `SalConnector` + // class since they need to be registered here and I'm afraid I will + // forget to do it when adding new endpoints over in another class. + + /** The createJob endpoint. */ + public static final SyncedPublisher createJob + = new SyncedPublisher("createJob", + "eu.nebulouscloud.exn.sal.job.post", true, true); + /** The findNodeCandidates endpoint. */ + public static final SyncedPublisher findNodeCandidates + = new SyncedPublisher("findNodeCandidates", + "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true); + /** The addNodes endpoint. */ + public static final SyncedPublisher addNodes + = new SyncedPublisher("addNodes", + "eu.nebulouscloud.exn.sal.nodes.add", true, true); + /** The submitJob endpoint. */ + public static final SyncedPublisher submitJob + = new SyncedPublisher("submitJob", + "eu.nebulouscloud.exn.sal.job.update", true, true); + /** * Create a connection to ActiveMQ via the exn middleware, and set up the * initial publishers and consumers. @@ -71,7 +102,8 @@ public class ExnConnector { conn = new Connector("optimiser_controller", callback, // List.of(new Publisher("config", "config", true)), - List.of(amplMessagePublisher), + List.of(amplMessagePublisher, + createJob, findNodeCandidates, addNodes, submitJob), List.of(new Consumer("ui_app_messages", app_creation_channel, new AppCreationMessageHandler(), true, true)), true, true, @@ -125,7 +157,9 @@ public class ExnConnector { log.info("App creation message received for app {}", app_id); JsonNode appMessage = mapper.valueToTree(body); Main.logFile("app-message-" + app_id + ".json", appMessage); - NebulousApp app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), amplMessagePublisher); + NebulousApp app = NebulousApp.newFromAppMessage( + // TODO create a new ExnConnector here? + mapper.valueToTree(body), ExnConnector.this); NebulousApps.add(app); app.sendAMPL(); app.deployUnmodifiedApplication(); @@ -161,4 +195,5 @@ public class ExnConnector { } } } + } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java index 9e46d74..f743171 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java @@ -34,10 +34,8 @@ public class LocalExecution implements Callable { @Override public Integer call() { ObjectMapper mapper = new ObjectMapper(); CountDownLatch exn_synchronizer = new CountDownLatch(1); - ExnConnector connector = main.getActiveMQConnector(); - Publisher publisher = null; + ExnConnector connector = Main.getActiveMQConnector(); if (connector != null) { - publisher = connector.getAmplMessagePublisher(); connector.start(exn_synchronizer); } JsonNode msg; @@ -47,9 +45,9 @@ public class LocalExecution implements Callable { log.error("Could not read an input file: ", e); return 1; } - NebulousApp app = NebulousApp.newFromAppMessage(msg, publisher); + NebulousApp app = NebulousApp.newFromAppMessage(msg, connector); if (connector != null) { - log.debug("Sending AMPL to channel {}", publisher); + log.debug("Sending AMPL to channel {}", connector.getAmplMessagePublisher()); app.sendAMPL(); app.deployUnmodifiedApplication(); } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/Main.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/Main.java index 286cf04..b1325d4 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/Main.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/Main.java @@ -39,24 +39,6 @@ import static picocli.CommandLine.Option; ) public class Main implements Callable { - @Option(names = {"-s", "--sal-url"}, - description = "The URL of the SAL server (including URL scheme http:// or https://). Can also be set via the @|bold SAL_URL|@ environment variable.", - paramLabel = "SAL_URL", - defaultValue = "${SAL_URL:-http://localhost:8880/}") - private java.net.URI sal_uri; - - @Option(names = {"--sal-user"}, - description = "The user name for the SAL server. Can also be set via the @|bold SAL_USER|@ environment variable.", - paramLabel = "SAL_USER", - defaultValue = "${SAL_USER}") - private String sal_user; - - @Option(names = {"--sal-password"}, - description = "The password for the SAL server. Can also be set via the @|bold SAL_PASSWORD|@ environment variable.", - paramLabel = "SAL_PASSWORD", - defaultValue = "${SAL_PASSWORD}") - private String sal_password; - @Option(names = {"--activemq-host"}, description = "The hostname of the ActiveMQ server. Can also be set via the @|bold ACTIVEMQ_HOST|@ environment variable.", paramLabel = "ACTIVEMQ_HOST", @@ -93,20 +75,13 @@ public class Main implements Callable { scope = ScopeType.INHERIT) private boolean[] verbosity; - /** - * The connector to the SAL library. - * - * @return the SAL connector, or null if running offline. - */ - @Getter - private SalConnector salConnector = null; /** * The ActiveMQ connector. * * @return the ActiveMQ connector wrapper, or null if running offline. */ @Getter - private ExnConnector activeMQConnector = null; + private static ExnConnector activeMQConnector = null; /** * PicoCLI execution strategy that uses common initialization. @@ -157,18 +132,6 @@ public class Main implements Callable { log.info("Logging all messages to directory {}", logDirectory); } } - // Start connection to SAL if possible. - if (sal_uri != null && sal_user != null && sal_password != null) { - salConnector = new SalConnector(sal_uri, sal_user, sal_password); - if (!salConnector.isConnected()) { - log.warn("Connection to SAL unsuccessful, continuing without SAL"); - } else { - log.info("Established connection to SAL"); - NebulousApp.setSalConnector(salConnector); - } - } else { - log.debug("SAL login information not specified, skipping"); - } // Start connection to ActiveMQ if possible. if (activemq_user != null && activemq_password != null) { log.info("Preparing ActiveMQ connection: host={} port={}", diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java index 064368c..fe413c5 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java @@ -11,7 +11,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import eu.nebulouscloud.exn.core.Publisher; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -95,17 +94,6 @@ public class NebulousApp { /** The original app message. */ @Getter private JsonNode originalAppMessage; private ObjectNode original_kubevela; - /** - * The active SAL connector, or null if we operate offline. - * - * NOTE: this might only be used until we switch to the exn-sal - * middleware, or maybe we keep the SalConnector class and send to exn - * from there. - * - * @param salConnector the SAL connector. - */ - @Setter @Getter - private static SalConnector salConnector; /** * Map of component name to machine name(s) deployed for that component. @@ -125,6 +113,14 @@ public class NebulousApp { * already nodes running for us? */ private boolean deployed = false; + /** + * The EXN connector for this class. At the moment all apps share the + * same instance, but probably every app should have their own, out of + * thread-safety concerns. + */ + @Getter + private ExnConnector exnConnector; + /** * Creates a NebulousApp object. * @@ -134,11 +130,12 @@ public class NebulousApp { */ // Note that example KubeVela and parameter files can be found at // optimiser-controller/src/test/resources/ - public NebulousApp(JsonNode app_message, ObjectNode kubevela, Publisher ampl_message_channel) { + public NebulousApp(JsonNode app_message, ObjectNode kubevela, ExnConnector exnConnector) { this.UUID = app_message.at(uuid_path).textValue(); this.name = app_message.at(name_path).textValue(); this.originalAppMessage = app_message; this.original_kubevela = kubevela; + this.exnConnector = exnConnector; JsonNode parameters = app_message.at(variables_path); if (parameters.isArray()) { this.kubevelaVariables = (ArrayNode)app_message.at(variables_path); @@ -146,7 +143,6 @@ public class NebulousApp { log.error("Cannot read parameters from app message '{}', continuing without parameters", UUID); this.kubevelaVariables = mapper.createArrayNode(); } - this.ampl_message_channel = ampl_message_channel; for (final JsonNode p : kubevelaVariables) { kubevela_variable_paths.put(p.get("key").asText(), JsonPointer.compile(p.get("path").asText())); @@ -212,11 +208,14 @@ public class NebulousApp { /** * Create a NebulousApp object given an app creation message parsed into JSON. * - * @param app_message the app creation message, including valid KubeVela YAML et al - * @param ampl_message_channel conduit to broadcast the current AMPL file - * @return a NebulousApp object, or null if `app_message` could not be parsed + * @param app_message the app creation message, including valid KubeVela + * YAML et al + * @param exnConnector The EXN connector to use for sending messages to + * the solver etc. + * @return a NebulousApp object, or null if `app_message` could not be + * parsed */ - public static NebulousApp newFromAppMessage(JsonNode app_message, Publisher ampl_message_channel) { + public static NebulousApp newFromAppMessage(JsonNode app_message, ExnConnector exnConnector) { try { String kubevela_string = app_message.at(kubevela_path).textValue(); JsonNode parameters = app_message.at(variables_path); @@ -226,8 +225,7 @@ public class NebulousApp { } else { Main.logFile("incoming-kubevela-" + app_message.at(uuid_path).textValue() + ".yaml", kubevela_string); return new NebulousApp(app_message, - (ObjectNode)readKubevelaString(kubevela_string), - ampl_message_channel); + (ObjectNode)readKubevelaString(kubevela_string), exnConnector); } } catch (Exception e) { log.error("Could not read app creation message: ", e); @@ -348,10 +346,6 @@ public class NebulousApp { * Calculate AMPL file and send it off to the solver. */ public void sendAMPL() { - if (ampl_message_channel == null) { - log.warn("AMPL publisher not set, cannot send AMPL file"); - return; - } String ampl = AMPLGenerator.generateAMPL(this); ObjectNode msg = mapper.createObjectNode(); msg.put("FileName", getUUID() + ".ampl"); @@ -381,7 +375,7 @@ public class NebulousApp { constant.set("Value", value); } - ampl_message_channel.send(mapper.convertValue(msg, Map.class), getUUID(), true); + exnConnector.getAmplMessagePublisher().send(mapper.convertValue(msg, Map.class), getUUID(), true); Main.logFile("to-solver-" + getUUID() + ".json", msg.toString()); Main.logFile("to-solver-" + getUUID() + ".ampl", ampl); } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java index eebf625..ebe3460 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java @@ -141,7 +141,7 @@ public class NebulousAppDeployer { * requirements for that component. No requirements mean any node will * suffice. */ - public static Map> getSalRequirementsFromKubevela(JsonNode kubevela) { + public static Map> getWorkerRequirementsFromKubevela(JsonNode kubevela) { Map> result = new HashMap<>(); ArrayNode components = kubevela.withArray("/spec/components"); for (final JsonNode c : components) { @@ -194,7 +194,8 @@ public class NebulousAppDeployer { } } for (final JsonNode t : c.withArray("/traits")) { - // TODO: Check for node affinity / geoLocation / country + // TODO: Check for node affinity / geoLocation / country / + // node type (edge or cloud) } // Finally, add requirements for this job to the map result.put(componentName, reqs); @@ -252,139 +253,103 @@ public class NebulousAppDeployer { * Given a KubeVela file, extract node requirements, create the job, start * its nodes and submit KubeVela. * + * NOTE: this method is under reconstruction, pending the new endpoints. + * * @param app the NebulOuS app object. * @param kubevela the KubeVela file to deploy. */ public static void deployApplication(NebulousApp app, JsonNode kubevela) { String appUUID = app.getUUID(); - String appName = app.getName(); log.info("Starting initial deployment of {}", appUUID); - if (NebulousApp.getSalConnector() == null) { - log.warn("Tried to submit job, but do not have a connection to SAL"); - return; - } + // The overall flow: // // 1. Extract node requirements and node counts from the KubeVela // definition. - // 2. Create a SAL job, with the uuid and name of the NebulOuS app - // 3. Create a coordinator node with hardcoded requirements; this node - // will run the Kubernetes controller. This node is in addition to - // the nodes required by KubeVela. - // 4. Submit the job, thereby starting the coordinator node - // 5. Extract information (IP address, ...) from the coordinator node - // 6. Add the worker nodes to the job - // 7. Rewrite the KubeVela file to add node affinities to each - // component - // 8. Send the KubeVela file to the coordinator node + // 2. Find node candidates for all workers and the controller. + // 3. Select node candidates. + // 4. Create a SAL cluster. + // 5. Deploy the SAL cluster. // ------------------------------------------------------------ // 1. Extract node requirements - Map> requirements = getSalRequirementsFromKubevela(kubevela); + Map> workerRequirements = getWorkerRequirementsFromKubevela(kubevela); Map nodeCounts = getNodeCountFromKubevela(kubevela); + List controllerRequirements = getControllerRequirements(appUUID); - Main.logFile("node-requirements-" + appUUID + ".txt", requirements); - Main.logFile("node-counts-" + appUUID + ".txt", nodeCounts); + Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements); + Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts); + Main.logFile("contoller-requirements-" + appUUID + ".txt", controllerRequirements); + // ---------------------------------------- + // 2. Find node candidates - // ------------------------------------------------------------ - // 2. Create SAL job - log.debug("Creating job info for {}", appUUID); - JobInformation jobinfo = new JobInformation(appUUID, appName); - // TODO: figure out what ports to specify here - List communications = List.of(); - // This task is deployed on the controller node (the one not specified - // in the app KubeVela file) - TaskDefinition nebulous_controller_task = new TaskDefinition( - "nebulous-controller", controllerInstallation, List.of()); - // This task is deployed on all worker nodes (the ones specified by - // the app KubeVela file and optimized by NebulOuS) - // TODO: find out if/how to modify `nebulous_worker_task` to pass in - // information about the controller - TaskDefinition nebulous_worker_task = new TaskDefinition( - "nebulous-worker", nodeInstallation, List.of()); - List tasks = List.of(nebulous_controller_task, nebulous_worker_task); - JobDefinition job = new JobDefinition(communications, jobinfo, tasks); - Boolean success = NebulousApp.getSalConnector().createJob(job); - if (!success) { - // This can happen if the job has already been submitted - log.error("Error trying to create the job; SAL createJob returned {}", success); - log.debug("Check if a job with id {} already exists, run stopJobs if yes", appUUID); - return; + // TODO: switch to asking the cloud broker for candidates when it's + // ready + List controllerCandidates = SalConnector.findNodeCandidates(controllerRequirements, appUUID); + if (controllerCandidates.isEmpty()) { + log.error("Could not find node candidates for requirements: {}", controllerRequirements); + // Continue here while we don't really deploy + // return; + } + Map> workerCandidates = new HashMap<>(); + for (Map.Entry> e : workerRequirements.entrySet()) { + String nodeName = e.getKey(); + List requirements = e.getValue(); + List candidates = SalConnector.findNodeCandidates(requirements, appUUID); + if (candidates.isEmpty()) { + log.error("Could not find node candidates for requirements: {}", requirements); + // Continue here while we don't really deploy + // return; + } + workerCandidates.put(nodeName, candidates); } // ------------------------------------------------------------ - // 3. Create coordinator node - log.debug("Creating app coordinator node for {}", appUUID); - List controller_candidates - = NebulousApp.getSalConnector().findNodeCandidates(getControllerRequirements(appUUID)); - if (controller_candidates.isEmpty()) { - log.error("Could not find node candidates for controller node; requirements: {}", - getControllerRequirements(appUUID)); - return; - } - NodeCandidate controller_candidate = controller_candidates.get(0); + // 3. Select node candidates - IaasDefinition controller_def = new IaasDefinition( - "nebulous-controller-node", "nebulous-controller", - controller_candidate.getId(), controller_candidate.getCloud().getId()); - success = NebulousApp.getSalConnector().addNodes(List.of(controller_def), appUUID); - if (!success) { - log.error("Failed to add controller node: {}", controller_candidate); - return; - } - - // ------------------------------------------------------------ - // 4. Submit job - log.debug("Starting job {}", appUUID); - String return_job_id = NebulousApp.getSalConnector().submitJob(appUUID); - if (return_job_id.equals("-1")) { - log.error("Failed to add start job {}, SAL returned {}", - appUUID, return_job_id); - return; - } - - // ------------------------------------------------------------ - // 5. Extract coordinator node information - - // TODO - - // ------------------------------------------------------------ - // 6. Create worker nodes from requirements - log.debug("Starting worker nodes for {}", appUUID); - for (Map.Entry> e : requirements.entrySet()) { + log.debug("Collecting worker nodes for {}", appUUID); + Map nodeNameToCandidate = new HashMap<>(); + for (Map.Entry> e : workerRequirements.entrySet()) { + // Here we collect two things: the flat list (hostname -> + // candidate) to send to createCluster, and the per-component + // hostname sets that we remember in the app object. String componentName = e.getKey(); - int numberOfNodes = nodeCounts.get(e.getKey()); + int numberOfNodes = nodeCounts.get(componentName); Set nodeNames = new HashSet<>(); - for (int i = 1; i <= numberOfNodes; i++){ - nodeNames.add(String.format("%s-%s", componentName, i)); + for (int i = 1; i <= numberOfNodes; i++) { + String nodeName = String.format("%s-%s", componentName, i); + nodeNames.add(nodeName); + // TODO: Here we need to discriminate between edge and cloud + // node candidates: we can deploy an edge node only once, but + // cloud nodes arbitrarily often. So if the best node + // candidate is an edge node, we should select it and fill the + // rest of the nodes with second-best cloud nodes. + + // TODO: make sure we only choose the same edge node once; it + // might be in all node candidate lists :) + if (!workerCandidates.get(componentName).isEmpty()) { + // should always be true, except currently we don't abort + // in Step 2 if we don't find candidates. + NodeCandidate candidate = workerCandidates.get(componentName).get(0); + nodeNameToCandidate.put(nodeName, candidate); + } } app.getComponentMachineNames().put(componentName, nodeNames); - if (numberOfNodes == 0) { - // Do not ask for node candidates if this component's replica - // count is 0. Note that we still set the app's machine names - // to an empty set. - continue; - } - List candidates = NebulousApp.getSalConnector().findNodeCandidates(e.getValue()); - if (candidates.isEmpty()) { - log.error("Could not find node candidates for requirements: {}", e.getValue()); - return; - } - NodeCandidate candidate = candidates.get(0); - List componentDefinitions = nodeNames.stream() - .map(name -> new IaasDefinition(name, "nebulous-worker", candidate.getId(), candidate.getCloud().getId())) - .collect(Collectors.toList()); - log.debug("Asking for {} copies of {} for application {}", numberOfNodes, candidate, appUUID); - success = NebulousApp.getSalConnector().addNodes(componentDefinitions, appUUID); - if (!success) { - log.error("Failed to add nodes for component {}", componentName); - } } Main.logFile("nodenames-" + appUUID + ".txt", app.getComponentMachineNames()); - // ------------------------------------------------------------ - // 7. Rewrite KubeVela file, based on running node names + Main.logFile("worker-nodes-" + appUUID + ".txt", nodeNameToCandidate); + + // ------------------------------------------------------------ + // 4. Create cluster + + // TODO: call defineCluster endpoint with nodename -> candidate + // mapping etc. + + // ------------------------------------------------------------ + // 5. Deploy cluster + + // TODO: call deployCluster endpoint - // TODO JsonNode rewritten = addNodeAffinities(kubevela, app.getComponentMachineNames()); String rewritten_kubevela = "---\n# Did not manage to create rewritten KubeVela"; try { @@ -393,11 +358,7 @@ public class NebulousAppDeployer { log.error("Failed to convert KubeVela to YAML; this should never happen", e); } Main.logFile("rewritten-kubevela-" + appUUID + ".yaml", rewritten_kubevela); - - // ------------------------------------------------------------ - // 8. Submit KubeVela file to coordinator node - - // TODO + // TODO: call deployApplication endpoint } /** @@ -405,7 +366,7 @@ public class NebulousAppDeployer { */ public static void redeployApplication(NebulousApp app, ObjectNode kubevela) { // The overall flow: - // + // // 1. Extract node requirements and node counts from the updated // KubeVela definition. // 2. Extract current nodes from running SAL job @@ -413,10 +374,10 @@ public class NebulousAppDeployer { // nodes // 4. Find node candidates for new nodes (from Step 3) according to // their requirements (from Step 1) - // 5. Create nodes, add them to SAL job - // 6. Rewrite KubeVela with updated node affinities - // 7. Send updated KubeVela to running cluster - // 8. Shut down superfluous nodes (from Step 3) + // 5. Rewrite KubeVela with updated node affinities + // 6. Call clusterScaleOut endpoint with list of added nodes + // 7. Call deployApplication with rewritten KubeVela + // 8. call clusterScaleIn endpoint with list of removed node names } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/SalConnector.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/SalConnector.java index b593355..8978313 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/SalConnector.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/SalConnector.java @@ -1,310 +1,50 @@ package eu.nebulouscloud.optimiser.controller; -import com.fasterxml.jackson.annotation.JsonSetter; -import com.fasterxml.jackson.annotation.Nulls; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.ow2.proactive.sal.model.NodeCandidate; +import org.ow2.proactive.sal.model.Requirement; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.logging.LogLevel; import lombok.extern.slf4j.Slf4j; -import org.ow2.proactive.sal.model.IaasDefinition; -import org.ow2.proactive.sal.model.Job; -import org.ow2.proactive.sal.model.JobDefinition; -import org.ow2.proactive.sal.model.NodeCandidate; -import org.ow2.proactive.sal.model.PACloud; -import org.ow2.proactive.sal.model.Requirement; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import reactor.core.publisher.Mono; -import reactor.netty.ByteBufMono; -import reactor.netty.http.client.HttpClient; -import reactor.netty.transport.logging.AdvancedByteBufFormat; - -import java.io.IOException; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - /** * A class that wraps communication with SAL (the Scheduling Abstraction Layer - * of ProActive) over REST. + * of ProActive) over EXN. * * Documentation of the SAL REST API is here: - * https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1 + * https://github.com/ow2-proactive/scheduling-abstraction-layer/tree/master/documentation */ @Slf4j public class SalConnector { - private static final String connectStr = "sal/pagateway/connect"; - private static final String getAllCloudsStr = "sal/clouds"; - private static final String findNodeCandidatesStr = "sal/nodecandidates"; - private static final String createJobStr = "sal/job"; - private static final String getJobsStr = "sal/job"; // same, but different method/body - private static final String addNodesFormatStr = "sal/node/%s"; - private static final String submitJobFormatStr = "sal/job/%s/submit"; - private static final String stopJobsStr = "sal/job/stop"; - - private URI sal_uri; - private final HttpClient httpClient; - private String session_id = null; - - private final ObjectMapper objectMapper = new ObjectMapper(); - - /** - * Construct a SalConnector instance. - * - * @param sal_uri the URI of the SAL server. Should only contain schema, - * host, port but no path component, since relative paths will be - * resolved against this URI. - * @param login the login name for SAL. - * @param password the login password for SAL. - */ - public SalConnector(URI sal_uri, String login, String password) { - this.sal_uri = sal_uri; - // This initialization code copied from - // https://gitlab.ow2.org/melodic/melodic-integration/-/blob/morphemic-rc4.0/connectors/proactive_client/src/main/java/cloud/morphemic/connectors/ProactiveClientConnectorService.java - objectMapper.configOverride(List.class) - .setSetterInfo(JsonSetter.Value.forValueNulls(Nulls.AS_EMPTY)) - .setSetterInfo(JsonSetter.Value.forContentNulls(Nulls.AS_EMPTY)); - connect(login, password); - httpClient = HttpClient.create() - .baseUrl(sal_uri.toString()) - .headers(headers -> headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)) - .responseTimeout(Duration.of(80, ChronoUnit.SECONDS)) - .wiretap("reactor.netty.http.client.HttpClient", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL, StandardCharsets.UTF_8); - if (isConnected()) { - httpClient.headers(headers -> headers.add("sessionid", session_id)); - } - httpClient.warmup().block(); - } - - /** - * Check if we are connected to a SAL endpoint, i.e., we successfully - * obtained a session id. - * - * @return true if we are connected, false if not - */ - public boolean isConnected() { - return session_id != null; - } - - /** - * Establish a connection with the SAL server. - * - * This method needs to be called before any other method, since it - * obtains the session id. - * - * @param sal_username the user name to log in to SAL - * @param sal_password the password to log in to SAL - * @return true if the connection was successful, false if not - */ - private boolean connect(String sal_username, String sal_password) { - URI endpoint_uri = sal_uri.resolve(connectStr); - log.trace("Connecting to SAL as a service at uri {}", endpoint_uri); + private static final ObjectMapper mapper = new ObjectMapper(); + public static List findNodeCandidates(List requirements, String appID) { + Map msg = new HashMap<>(); + Map metadata = new HashMap<>(); + metadata.put("user", "admin"); + msg.put("metaData", metadata); try { - this.session_id = HttpClient.create() - .headers(headers -> headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED)) - .post() - .uri(endpoint_uri) - .sendForm((req, form) -> form - .attr("username", sal_username) - .attr("password", sal_password)) - .responseContent() - .aggregate() - .asString() - .retry(20) - .block(); - } catch (Exception e) { - log.error("Error while connecting to SAL", e); - return false; - } - log.debug("Connected to SAL, sessionid {}...", session_id.substring(0, 10)); - return true; - } - - /** - * Get all cloud providers. See - * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/2-cloud-endpoints.md#22--getallclouds-endpoint - */ - public List getAllClouds() { - return httpClient.get() - .uri(sal_uri.resolve(getAllCloudsStr)) - .responseSingle((resp, bytes) -> { - if (!resp.status().equals(HttpResponseStatus.OK)) { - return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body))); - } else { - return bytes.asString().mapNotNull(s -> { - try { - return objectMapper.readValue(s, PACloud[].class); - } catch (IOException e) { - log.error(e.getMessage(), e);; - return null; - } - }); - } - }) - .doOnError(Throwable::printStackTrace) - .blockOptional() - .map(Arrays::asList) - .orElseGet(Collections::emptyList); - } - - /** - * Get node candidates. See - * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/7-node-endpoints.md#71--findnodecandidates-endpoint - */ - public List findNodeCandidates(List requirements) { - return httpClient.post() - .uri(sal_uri.resolve(findNodeCandidatesStr)) - .send(bodyMonoPublisher(requirements)) - .responseSingle((resp, bytes) -> { - if (!resp.status().equals(HttpResponseStatus.OK)) { - return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body))); - } else { - return bytes.asString().mapNotNull(s -> { - try { - log.trace("Received message: {}", s); - return objectMapper.readValue(s, NodeCandidate[].class); - } catch (IOException e) { - log.error(e.getMessage(), e); - return null; - } - }); - } - }) - .doOnError(Throwable::printStackTrace) - .blockOptional() - .map(Arrays::asList) - .orElseGet(Collections::emptyList); - } - - /** - * Create job. See - * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#51--createjob-endpoint - */ - public Boolean createJob(JobDefinition job) { - return httpClient.post() - .uri(sal_uri.resolve(createJobStr)) - .send(bodyMonoPublisher(job)) - .responseSingle((resp, bytes) -> { - if (!resp.status().equals(HttpResponseStatus.OK)) { - return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body))); - } else { - return bytes.asString().map(Boolean::parseBoolean); - } - }) - .doOnError(Throwable::printStackTrace) - .block(); - } - -/** - * Get list of jobs. See - * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#52--getjobs-endpoint - */ - public List fetchJobs() { - return httpClient.get() - .uri(sal_uri.resolve(getJobsStr)) - .responseSingle((resp, bytes) -> { - if (!resp.status().equals(HttpResponseStatus.OK)) { - return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body))); - } else { - return bytes.asString().mapNotNull(s -> { - try { - return objectMapper.readValue(s, Job[].class); - } catch (IOException e) { - log.error(e.getMessage(), e); - return null; - } - }); - } - }) - .doOnError(Throwable::printStackTrace) - .blockOptional() - .map(Arrays::asList) - .orElseGet(Collections::emptyList); - } - - - /** - * Stop SAL jobs. See - * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#54--stopjobs-endpoint - */ - public Long stopJobs(List jobIds) { - return httpClient.put() - .uri(sal_uri.resolve(stopJobsStr)) - .send(bodyMonoPublisher(jobIds)) - .responseSingle((resp, bytes) -> { - if (!resp.status().equals(HttpResponseStatus.OK)) { - return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body))); - } else { - return bytes.asString().map(Long::parseLong); - } - }) - .doOnError(Throwable::printStackTrace) - .block(); - } - - /** - * documentation - */ - public Boolean addNodes(List nodes, String jobId) { - return httpClient.post() - .uri(sal_uri.resolve(addNodesFormatStr.formatted(jobId))) - .send(bodyMonoPublisher(nodes)) - .responseSingle((resp, bytes) -> { - if (!resp.status().equals(HttpResponseStatus.OK)) { - return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body))); - } else { - // NOTE: was Boolean::new in Morphemic - return bytes.asString().map(Boolean::parseBoolean); - } - }) - .doOnError(Throwable::printStackTrace) - .block(); - } - - /** - * Submit job. See - * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#55--submitjob-endpoint - */ - public String submitJob(String jobId) { - return httpClient.post() - .uri(sal_uri.resolve(submitJobFormatStr.formatted(jobId))) - .responseSingle((resp, bytes) -> { - if (!resp.status().equals(HttpResponseStatus.OK)) { - return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body))); - } else { - // Note: Morphemic parsed this as a long, but we don't, - // since the end point specifies that it returns the - // submitted job id or -1 - return bytes.asString(); - } - }) - .doOnError(Throwable::printStackTrace) - .block(); - } - - private Mono bodyMonoPublisher(Object body) { - // if ((body instanceof JSONArray) || (body instanceof JSONObject)) { - // return ByteBufMono.fromString(Mono.just(body.toString())); - // } - String json = null; - try { - json = objectMapper.writeValueAsString(body); - } catch (JsonProcessingException e) { - log.error(e.getMessage(), e);; - } - log.trace("Sending body json: {}", json); - return ByteBufMono.fromString(Mono.just(json)); + msg.put("body", mapper.writeValueAsString(requirements)); + } catch (JsonProcessingException e) { + log.error("Could not convert requirements list to JSON string", e); + return null; + } + Map response = ExnConnector.findNodeCandidates.sendSync(msg, appID, null, false); + String body = response.get("body").toString(); // body is a string already + try { + return Arrays.asList(mapper.readValue(body, NodeCandidate[].class)); + } catch (JsonProcessingException e) { + log.error("Error receiving findNodeCandidates result", e); + return null; + } } } diff --git a/optimiser-controller/src/test/java/eu/nebulouscloud/optimiser/controller/NebulousAppTests.java b/optimiser-controller/src/test/java/eu/nebulouscloud/optimiser/controller/NebulousAppTests.java index 57bff13..94eb239 100644 --- a/optimiser-controller/src/test/java/eu/nebulouscloud/optimiser/controller/NebulousAppTests.java +++ b/optimiser-controller/src/test/java/eu/nebulouscloud/optimiser/controller/NebulousAppTests.java @@ -85,7 +85,7 @@ public class NebulousAppTests { String kubevela_str = Files.readString(getResourcePath("vela-deployment-v2.yml"), StandardCharsets.UTF_8); JsonNode kubevela = yaml_mapper.readTree(kubevela_str); - Map> requirements = NebulousAppDeployer.getSalRequirementsFromKubevela(kubevela); + Map> requirements = NebulousAppDeployer.getWorkerRequirementsFromKubevela(kubevela); // We could compare the requirements with what is contained in // KubeVela, or compare keys with component names, but this would // essentially duplicate the method code--so we just make sure the @@ -105,7 +105,7 @@ public class NebulousAppTests { ObjectNode replacements = solutions.withObject("VariableValues"); ObjectNode kubevela1 = app.rewriteKubevelaWithSolution(replacements); - Map> requirements = NebulousAppDeployer.getSalRequirementsFromKubevela(kubevela1); + Map> requirements = NebulousAppDeployer.getWorkerRequirementsFromKubevela(kubevela1); // We could compare the requirements with what is contained in // KubeVela, or compare keys with component names, but this would // essentially duplicate the method code--so we just make sure the