diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/AMPLGenerator.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/AMPLGenerator.java index 1420699..8edf851 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/AMPLGenerator.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/AMPLGenerator.java @@ -115,7 +115,33 @@ public class AMPLGenerator { } private static void generateCostParameterSection(NebulousApp app, PrintWriter out) { - out.println("# TBD: cost parameters - for all components! and use of node-candidates tensor"); + out.println("# Cost parameters - for all components, and use of node-candidates tensor"); + ArrayNode indicators = app.getRelevantPerformanceIndicators().withArray("PerformanceIndicators"); + if (indicators.size() == 0) return; + for (JsonNode performanceIndicator : indicators) { + int nVariables = performanceIndicator.withArray("variables").size(); + String name = performanceIndicator.at("/coefficientsName").textValue(); + out.format("param %s{1..%s};%n", name, nVariables + 1); + } + for (JsonNode performanceIndicator : indicators) { + int iVariable = 1; + String var_name = performanceIndicator.at("/name").textValue(); + String coeff_name = performanceIndicator.at("/coefficientsName").textValue(); + out.format("var %s = %s[1]", var_name, coeff_name); + for (JsonNode var : performanceIndicator.withArray("/variables")) { + iVariable++; + out.format(" + %s[%s] * %s", coeff_name, iVariable, var.textValue()); + } + out.println(";"); + } + out.println(); + out.println("minimize costfunction:"); + String separator = " "; + for (JsonNode performanceIndicator : indicators) { + out.print(separator); separator = " + "; + out.print(performanceIndicator.at("/name").textValue()); + } + out.println(";"); out.println(); } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java index 74035ef..ea83602 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java @@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j; import static net.logstash.logback.argument.StructuredArguments.keyValue; import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.ow2.proactive.sal.model.NodeCandidate; import org.ow2.proactive.sal.model.Requirement; @@ -25,7 +26,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; @@ -53,12 +53,22 @@ public class ExnConnector { /** The topic where we listen for app creation messages. */ public static final String app_creation_channel = "eu.nebulouscloud.ui.dsl.generic"; + /** The topic with an application's relevant performance indicators. */ + public static final String performance_indicators_channel = + "eu.nebulouscloud.optimiser.utilityevaluator.performanceindicators"; /** The topic with incoming solver solution messages. See * https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/2-solvers */ public static final String solver_solution_channel = "eu.nebulouscloud.optimiser.solver.solution"; /** The topic where we send AMPL messages */ // 1 object with key: filename, value: AMPL file (serialized) - public static final String ampl_message_channel = "eu.nebulouscloud.optimiser.solver.model"; + public static final String ampl_message_channel = "eu.nebulouscloud.optimiser.controller.model"; + + /** The metrics to send to EMS and Solver */ + public static final String metric_list_channel = "eu.nebulouscloud.optimiser.controller.metric_list"; + /** The status channel for the solvers. We send out an app's AMPL file on + * the channel named by {@link #ampl_message_channel} when getting the + * "started" message from a solver. */ + public static final String solver_status_channel = "eu.nebulouscloud.solver.state"; /** * The Message producer for sending AMPL files, shared between all @@ -69,11 +79,14 @@ public class ExnConnector { @Getter private final Publisher amplMessagePublisher; + /** The publisher for sending the metric list to EMS and Solver during app + * creation. */ + @Getter + private final Publisher metricListPublisher; + // ---------------------------------------- // Communication with SAL - /** The createJob endpoint. */ - public final SyncedPublisher createJob; /** The findNodeCandidates endpoint. Should not be used during normal * operation--ask the broker instead. */ public final SyncedPublisher findSalNodeCandidates; @@ -111,7 +124,7 @@ public class ExnConnector { */ public ExnConnector(String host, int port, String name, String password, ConnectorHandler callback) { amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true); - createJob = new SyncedPublisher("createJob", "eu.nebulouscloud.exn.sal.job.post", true, true); + metricListPublisher = new Publisher("controller_metric_list", metric_list_channel, true, true); findSalNodeCandidates = new SyncedPublisher("findSalNodeCandidates", "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true); findBrokerNodeCandidates = new SyncedPublisher("findBrokerNodeCandidates", "eu.nebulouscloud.cfsb.get_node_candidates", true, true); defineCluster = new SyncedPublisher("defineCluster", "eu.nebulouscloud.exn.sal.cluster.define", true, true); @@ -125,7 +138,11 @@ public class ExnConnector { conn = new Connector("optimiser_controller", callback, - List.of(amplMessagePublisher, + List.of( + // asynchronous topics for sending out controller status + amplMessagePublisher, + metricListPublisher, + // synchronous communication with SAL via exn-middleware findSalNodeCandidates, findBrokerNodeCandidates, defineCluster, @@ -137,8 +154,12 @@ public class ExnConnector { scaleIn, deleteCluster), List.of( + new Consumer("solver_status", solver_status_channel, + new SolverStatusMessageHandler(), true, true), new Consumer("ui_app_messages", app_creation_channel, new AppCreationMessageHandler(), true, true), + new Consumer("performance_indicator_messages", performance_indicators_channel, + new PerformanceIndicatorMessageHandler(), true, true), new Consumer("solver_solution_messages", solver_solution_channel, new SolverSolutionMessageHandler(), true, true)), true, @@ -182,8 +203,9 @@ public class ExnConnector { * others, the KubeVela YAML definition and mapping from KubeVela * locations to AMPL variables. * - * When receiving a message, the handler tries to instantiate a - * `NebulousApp` object. + *

When receiving a message, the handler instantiates a `NebulousApp` + * object. If we already received the performance indicators from the + * utility evaluator, perform initial deployment; otherwise, wait. */ // Note that there is another, earlier app creation message sent via the // channel `eu.nebulouscloud.ui.application.new`, but its format is not @@ -191,21 +213,106 @@ public class ExnConnector { public class AppCreationMessageHandler extends Handler { @Override public void onMessage(String key, String address, Map body, Message message, Context context) { + NebulousApp app = null; try { Object app_id = message.property("application"); // might be null if (app_id == null) app_id = message.subject(); // if app_id is still null, the filename will look a bit funky but it's not a problem log.info("App creation message received", keyValue("appId", app_id)); JsonNode appMessage = mapper.valueToTree(body); - Main.logFile("app-message-" + app_id + ".json", appMessage); - NebulousApp app = NebulousApp.newFromAppMessage( + Main.logFile("app-message-" + app_id + ".json", appMessage.toPrettyString()); + app = NebulousApp.newFromAppMessage( // TODO create a new ExnConnector here? mapper.valueToTree(body), ExnConnector.this); NebulousApps.add(app); - app.sendAMPL(); - app.deployUnmodifiedApplication(); + String appIdFromMessage = app.getUUID(); + if (NebulousApps.relevantPerformanceIndicators.containsKey(appIdFromMessage)) { + // If the performance indicators haven't arrived yet, this + // will happen in PerformanceIndicatorMessageHandler below. + app.setStateReady(NebulousApps.relevantPerformanceIndicators.get(appIdFromMessage)); + app.deployUnmodifiedApplication(); + // Not strictly necessary to remove the performance + // indicators, but let's not leave unneeded data around + NebulousApps.relevantPerformanceIndicators.remove(appIdFromMessage); + } } catch (Exception e) { log.error("Error while receiving app creation message", e); + if (app != null) app.setStateFailed(); + } + } + } + + /** + * A handler that receives the performance indicators that the utility + * evaluator sends. If the application object already exists (usually the + * case), start initial deployment, otherwise store the performance + * indicators so the initial app creation message can pick them up. + */ + public class PerformanceIndicatorMessageHandler extends Handler { + @Override + public void onMessage(String key, String address, Map body, Message message, Context context) { + Object appIdObject = null; + try { + appIdObject = message.property("application"); + if (appIdObject == null) appIdObject = message.subject(); + } catch (ClientException e) { + log.error("Received performance indicator message without application property, aborting"); + return; + } + String appId = null; + if (appIdObject == null) { + log.error("Received performance indicator message without application property, aborting"); + } else { + appId = appIdObject.toString(); // should be a string already + log.info("Received performance indicator message", keyValue("appId", appId)); + } + JsonNode appMessage = mapper.valueToTree(body); + Main.logFile("performance-indicators-" + appIdObject + ".json", appMessage.toPrettyString()); + NebulousApp app = NebulousApps.get(appId); + if (app == null) { + NebulousApps.relevantPerformanceIndicators.put(appId, appMessage); + log.info("Received performance indicator message for unknown app object, storing", + keyValue("appId", appId)); + } else { + app.setStateReady(appMessage); + app.deployUnmodifiedApplication(); + } + } + } + + /** + * A handler that detects when the solver has started for a given + * application. + */ + public class SolverStatusMessageHandler extends Handler { + @Override + public void onMessage(String key, String address, Map body, Message message, Context context) { + Object appIdObject = null; + String appId = null; + try { + appIdObject = message.property("application"); + } catch (ClientException e) { + log.error("Received solver ready message {} without application property, aborting", body); + return; + } + if (appIdObject == null) { + log.error("Received solver ready message {} without application property, aborting", body); + return; + } else { + appId = appIdObject.toString(); // should be a string already + log.info("Received solver status message {}", body, keyValue("appId", appId)); + } + + JsonNode appMessage = mapper.valueToTree(body); + String status = appMessage.at("/state").textValue(); + if (status == null || !status.equals("started")) return; + + NebulousApp app = NebulousApps.get(appId); + if (app == null) { + log.info("Received solver status message for unknown app object, this should not happen", + keyValue("appId", appId)); + } else { + app.sendAMPL(); } } } @@ -227,7 +334,7 @@ public class ExnConnector { log.warn("Received solver solution without 'application' message property, discarding it"); return; } - Main.logFile("solver-solution-" + app_id + ".json", json_body); + Main.logFile("solver-solution-" + app_id + ".json", json_body.toPrettyString()); NebulousApp app = NebulousApps.get(app_id); if (app == null) { log.warn("Received solver solutions for non-existant application, discarding.", @@ -271,13 +378,13 @@ public class ExnConnector { String salRawResponse = response.at("/body").asText(); // it's already a string, asText() is for the type system JsonNode metadata = response.at("/metaData"); JsonNode salResponse = mapper.missingNode(); // the data coming from SAL - try { - salResponse = mapper.readTree(salRawResponse); - } catch (JsonProcessingException e) { + try { + salResponse = mapper.readTree(salRawResponse); + } catch (JsonProcessingException e) { log.error("Could not read message body as JSON: body = '{}'", salRawResponse, keyValue("appId", appID), keyValue("caller", caller), e); return mapper.missingNode(); - } + } if (!metadata.at("/status").asText().startsWith("2")) { // we only accept 200, 202, numbers of that nature log.error("exn-middleware-sal request failed with error code '{}' and message '{}'", @@ -391,14 +498,14 @@ public class ExnConnector { if (cpu1 != cpu2) return cpu1 - cpu2; else return Math.toIntExact(ram1 - ram2); }); - return candidates; + return candidates; } /** * Define a cluster with the given name and node list. - * + * *

The cluster is passed in a JSON of the following shape: - * + * *

{@code
      * {
      *     "name":"485d7-1",
@@ -420,13 +527,13 @@ public class ExnConnector {
      *     }
      * }
      * }
- * + * *

Each value for {@code nodeName} has to be globally unique, must * start with a letter and contain numbers, letters and hyphens only. - * + * *

The values for {@code nodeCandidateId} and {@code cloudId} come from * the return value of a call to {@link #findNodeCandidates()}. - * + * * @param appID The application's id, used only for logging. * @param clusterName The cluster name, used only for logging. * @param cluster A JSON object, as detailed above. @@ -434,7 +541,7 @@ public class ExnConnector { */ public boolean defineCluster(String appID, String clusterName, ObjectNode cluster) { // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed - Main.logFile("define-cluster-" + appID + ".json", cluster); + Main.logFile("define-cluster-" + appID + ".json", cluster.toPrettyString()); Map msg; try { msg = Map.of("metaData", Map.of("user", "admin"), @@ -471,14 +578,14 @@ public class ExnConnector { */ public boolean labelNodes(String appID, String clusterID, JsonNode labels) { Map msg; - try { - msg = Map.of("metaData", Map.of("user", "admin", "clusterName", clusterID), - "body", mapper.writeValueAsString(labels)); - } catch (JsonProcessingException e) { + try { + msg = Map.of("metaData", Map.of("user", "admin", "clusterName", clusterID), + "body", mapper.writeValueAsString(labels)); + } catch (JsonProcessingException e) { log.error("Could not convert JSON to string (this should never happen)", keyValue("appId", appID), e); return false; - } + } Map response = labelNodes.sendSync(msg, appID, null, false); JsonNode payload = extractPayloadFromExnResponse(response, appID, "labelNodes"); return payload.isMissingNode() ? false : true; @@ -520,7 +627,7 @@ public class ExnConnector { .put("appName", appName) .put("action", "apply") .put("flags", ""); - Main.logFile("deploy-application-" + appID + ".json", body); + Main.logFile("deploy-application-" + appID + ".json", body.toPrettyString()); Map msg; try { String bodyStr = mapper.writeValueAsString(body); diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java index 548f148..0b481ff 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/LocalExecution.java @@ -66,6 +66,7 @@ public class LocalExecution implements Callable { } if (deploy) { log.debug("Deploying application", connector.getAmplMessagePublisher()); + app.setStateReady(null); // TODO: insert second file here app.deployUnmodifiedApplication(); } } diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java index fad37b7..8ebac44 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java @@ -12,7 +12,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import eu.nebulouscloud.exn.core.Publisher; import lombok.Getter; -import lombok.Setter; import lombok.Synchronized; import lombok.extern.slf4j.Slf4j; import static net.logstash.logback.argument.StructuredArguments.keyValue; @@ -125,6 +124,9 @@ public class NebulousApp { @Getter private Map compositeMetrics = new HashMap<>(); /** The app's performance indicators, a map from key to the defining JSON node. */ @Getter private Map performanceIndicators = new HashMap<>(); + /** The app's "relevant" performance indicators, as calculated by the + * utility evaluator. Initialized to empty object for testing purposes. */ + @Getter private JsonNode relevantPerformanceIndicators = jsonMapper.createObjectNode(); /** The app's utility functions; the AMPL solver will optimize for one of these. */ @Getter private Map utilityFunctions = new HashMap<>(); /** @@ -215,7 +217,7 @@ public class NebulousApp { public NebulousApp(JsonNode app_message, ObjectNode kubevela, ExnConnector exnConnector) { this.UUID = app_message.at(uuid_path).textValue(); this.name = app_message.at(name_path).textValue(); - this.state = State.READY; + this.state = State.NEW; this.clusterName = NebulousApps.calculateUniqueClusterName(this.UUID); this.originalAppMessage = app_message; this.originalKubevela = kubevela; @@ -319,6 +321,21 @@ public class NebulousApp { } } + /** + * Set the state from NEW to READY, adding the list of relevant + * performance indicators. Return false if state was not READY. + */ + @Synchronized + public boolean setStateReady(JsonNode relevantPerformanceIndicators) { + if (state != State.NEW) { + return false; + } else { + state = State.READY; + this.relevantPerformanceIndicators = relevantPerformanceIndicators; + return true; + } + } + /** * Set the state from READY to DEPLOYING, and increment the generation. * @@ -471,10 +488,16 @@ public class NebulousApp { *

TODO: also send performance indicators to solver here */ public void sendAMPL() { - String ampl = AMPLGenerator.generateAMPL(this); + String ampl_model = AMPLGenerator.generateAMPL(this); + String ampl_data = relevantPerformanceIndicators.at("/initialDataFile").textValue(); ObjectNode msg = jsonMapper.createObjectNode(); - msg.put("FileName", getUUID() + ".ampl"); - msg.put("FileContent", ampl); + + msg.put("ModelFileName", getUUID() + ".ampl"); + msg.put("ModelFileContent", ampl_model); + if (ampl_data != null && ampl_data.length() > 0) { + msg.put("DataFileName", getUUID() + ".dat"); + msg.put("DataFileContent", ampl_data); + } msg.put("ObjectiveFunction", getObjectiveFunction()); ObjectNode constants = msg.withObject("Constants"); // Define initial values for constant utility functions: @@ -499,10 +522,28 @@ public class NebulousApp { constant.put("Variable", variableName); constant.set("Value", value); } - log.info("Sending AMPL file to solver", keyValue("amplMessage", msg), keyValue("appId", UUID)); + log.info("Sending AMPL files to solver", keyValue("amplMessage", msg), keyValue("appId", UUID)); exnConnector.getAmplMessagePublisher().send(jsonMapper.convertValue(msg, Map.class), getUUID(), true); - Main.logFile("to-solver-" + getUUID() + ".json", msg.toString()); - Main.logFile("to-solver-" + getUUID() + ".ampl", ampl); + Main.logFile("to-solver-" + getUUID() + ".json", msg.toPrettyString()); + Main.logFile("to-solver-" + getUUID() + ".ampl", ampl_model); + } + + /** + * Send the metric list for the given app. This is done two times: once + * before app cluster creation to initialize EMS, once after cluster app + * creation to initialize the solver. + * + * @param app The application under deployment. + */ + public void sendMetricList() { + Publisher metricsChannel = exnConnector.getMetricListPublisher(); + ObjectNode msg = jsonMapper.createObjectNode(); + ArrayNode metricNames = msg.withArray("/metrics"); + getRawMetrics().forEach((k, v) -> metricNames.add(k)); + getCompositeMetrics().forEach((k, v) -> metricNames.add(k)); + log.info("Sending metric list", keyValue("appId", UUID)); + metricsChannel.send(jsonMapper.convertValue(msg, Map.class), getUUID(), true); + Main.logFile("metric-names-" + getUUID() + ".json", msg.toPrettyString()); } /** diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java index 8a3c394..3b797e7 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousAppDeployer.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import eu.nebulouscloud.exn.core.Publisher; import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer; import org.ow2.proactive.sal.model.AttributeRequirement; import org.ow2.proactive.sal.model.NodeCandidate; @@ -137,6 +138,9 @@ public class NebulousAppDeployer { * {@code false} otherwise. */ private static boolean isClusterDeploymentFinished(JsonNode clusterStatus) { + if (clusterStatus == null || !clusterStatus.isObject()) + // Catch various failure states, e.g., SAL spuriously returning null + return false; return clusterStatus.withArray("/nodes") .findParents("state") .stream() @@ -213,12 +217,15 @@ public class NebulousAppDeployer { // controller. // - Select node candidates, making sure to only select edge nodes // once. + // - (Before deploying the cluster) send metric name list. // - Create a SAL cluster. // - Deploy the SAL cluster. // - Add node affinity traits to the KubeVela file. // - Deploy the SAL application. // - Store cluster state (deployed KubeVela file, etc.) in // NebulousApp object. + // - Asynchronously, triggered via solver readiness message: wait for + // solver to be ready, send AMPL and re-send metric name list. // ------------------------------------------------------------ // Extract node requirements @@ -355,8 +362,9 @@ public class NebulousAppDeployer { return; } - // TODO: send performance indicators (for monitoring system, which - // needs it before cluster creation) + // ------------------------------------------------------------ + // Send metrics to EMS + app.sendMetricList(); // ------------------------------------------------------------ // Create cluster diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java index 1c78416..3bb82c9 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApps.java @@ -7,15 +7,24 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.fasterxml.jackson.databind.JsonNode; + /** - * Class that manages a collection of NebulousApp instances. + * Class that manages a collection of NebulousApp instances. Also used to + * temporarily store the relevant performance indicators which are calculated + * by the utility evaluator from the same incoming app creation message. */ @Slf4j public class NebulousApps { /** The global app registry. */ - // (Putting this here until we find a better place.) - private static final Map apps = new ConcurrentHashMap(); + private static final Map apps = new ConcurrentHashMap<>(); + + /** + * A place to store relevant performance indicator messages if the app + * object is not yet available. + */ + public static final Map relevantPerformanceIndicators = new ConcurrentHashMap<>(); /** * Add a new application object to the registry.