Handle cost parameters, solver initialization

- Delay app deployment until the utility evaluator has sent us the cost
  parameter list.

- Emit cost parameters in AMPL model

- Send both AMPL model and data to the solver once it's started

- Pretty-print JSON files

Change-Id: I99b67a10a60883bb411ad486bb8ef92fec175350
This commit is contained in:
Rudi Schlatte 2024-04-15 17:57:16 +02:00
parent ab6375304d
commit c6fa96a51f
6 changed files with 236 additions and 44 deletions

View File

@ -115,7 +115,33 @@ public class AMPLGenerator {
}
/**
 * Write the cost parameter section of the AMPL model to {@code out}.
 *
 * <p>Reads the {@code PerformanceIndicators} array from the app's relevant
 * performance indicators.  For every entry it emits a {@code param}
 * declaration named after the entry's {@code coefficientsName}, then a
 * {@code var} definition named after the entry's {@code name} that
 * combines the coefficients with the entry's variables, and finally a
 * {@code minimize costfunction} statement that sums all indicator
 * variables.  If the array is empty, only the header comment lines are
 * written.
 *
 * @param app the application whose relevant performance indicators are read.
 * @param out the writer that receives the generated AMPL text.
 */
private static void generateCostParameterSection(NebulousApp app, PrintWriter out) {
// NOTE(review): this line and the next emit two versions of the same
// header comment; presumably only one of them is intended -- confirm.
out.println("# TBD: cost parameters - for all components! and use of node-candidates tensor");
out.println("# Cost parameters - for all components, and use of node-candidates tensor");
ArrayNode indicators = app.getRelevantPerformanceIndicators().withArray("PerformanceIndicators");
if (indicators.size() == 0) return;
// First pass: declare one coefficient vector per performance indicator.
// It has one slot per variable plus one extra; slot 1 is used below as
// the constant term.
for (JsonNode performanceIndicator : indicators) {
int nVariables = performanceIndicator.withArray("variables").size();
String name = performanceIndicator.at("/coefficientsName").textValue();
out.format("param %s{1..%s};%n", name, nVariables + 1);
}
// Second pass: define each indicator as
//   coeff[1] + coeff[2] * variable_1 + coeff[3] * variable_2 + ...
for (JsonNode performanceIndicator : indicators) {
int iVariable = 1;
String var_name = performanceIndicator.at("/name").textValue();
String coeff_name = performanceIndicator.at("/coefficientsName").textValue();
out.format("var %s = %s[1]", var_name, coeff_name);
// NOTE(review): the array is addressed as "variables" in the first loop
// but as "/variables" here; presumably both resolve to the same field
// with the Jackson version in use -- confirm.
for (JsonNode var : performanceIndicator.withArray("/variables")) {
// Incremented before use, so the first variable is paired with index 2.
iVariable++;
out.format(" + %s[%s] * %s", coeff_name, iVariable, var.textValue());
}
out.println(";");
}
out.println();
// The objective is the sum of all indicator variables defined above.
out.println("minimize costfunction:");
// Starts as plain indentation, then becomes " + " after the first term.
String separator = " ";
for (JsonNode performanceIndicator : indicators) {
out.print(separator); separator = " + ";
out.print(performanceIndicator.at("/name").textValue());
}
out.println(";");
out.println();
}

View File

@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
import static net.logstash.logback.argument.StructuredArguments.keyValue;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.Requirement;
@ -25,7 +26,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
@ -53,12 +53,22 @@ public class ExnConnector {
/** The topic where we listen for app creation messages. */
public static final String app_creation_channel = "eu.nebulouscloud.ui.dsl.generic";
/** The topic with an application's relevant performance indicators. */
public static final String performance_indicators_channel =
"eu.nebulouscloud.optimiser.utilityevaluator.performanceindicators";
/** The topic with incoming solver solution messages. See
* https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/2-solvers */
public static final String solver_solution_channel = "eu.nebulouscloud.optimiser.solver.solution";
/** The topic where we send AMPL messages */
// 1 object with key: filename, value: AMPL file (serialized)
public static final String ampl_message_channel = "eu.nebulouscloud.optimiser.solver.model";
public static final String ampl_message_channel = "eu.nebulouscloud.optimiser.controller.model";
/** The metrics to send to EMS and Solver */
public static final String metric_list_channel = "eu.nebulouscloud.optimiser.controller.metric_list";
/** The status channel for the solvers. We send out an app's AMPL file on
* the channel named by {@link #ampl_message_channel} when getting the
* "started" message from a solver. */
public static final String solver_status_channel = "eu.nebulouscloud.solver.state";
/**
* The Message producer for sending AMPL files, shared between all
@ -69,11 +79,14 @@ public class ExnConnector {
@Getter
private final Publisher amplMessagePublisher;
/** The publisher for sending the metric list to EMS and Solver during app
* creation. */
@Getter
private final Publisher metricListPublisher;
// ----------------------------------------
// Communication with SAL
/** The createJob endpoint. */
public final SyncedPublisher createJob;
/** The findNodeCandidates endpoint. Should not be used during normal
* operation--ask the broker instead. */
public final SyncedPublisher findSalNodeCandidates;
@ -111,7 +124,7 @@ public class ExnConnector {
*/
public ExnConnector(String host, int port, String name, String password, ConnectorHandler callback) {
amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true);
createJob = new SyncedPublisher("createJob", "eu.nebulouscloud.exn.sal.job.post", true, true);
metricListPublisher = new Publisher("controller_metric_list", metric_list_channel, true, true);
findSalNodeCandidates = new SyncedPublisher("findSalNodeCandidates", "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true);
findBrokerNodeCandidates = new SyncedPublisher("findBrokerNodeCandidates", "eu.nebulouscloud.cfsb.get_node_candidates", true, true);
defineCluster = new SyncedPublisher("defineCluster", "eu.nebulouscloud.exn.sal.cluster.define", true, true);
@ -125,7 +138,11 @@ public class ExnConnector {
conn = new Connector("optimiser_controller",
callback,
List.of(amplMessagePublisher,
List.of(
// asynchronous topics for sending out controller status
amplMessagePublisher,
metricListPublisher,
// synchronous communication with SAL via exn-middleware
findSalNodeCandidates,
findBrokerNodeCandidates,
defineCluster,
@ -137,8 +154,12 @@ public class ExnConnector {
scaleIn,
deleteCluster),
List.of(
new Consumer("solver_status", solver_status_channel,
new SolverStatusMessageHandler(), true, true),
new Consumer("ui_app_messages", app_creation_channel,
new AppCreationMessageHandler(), true, true),
new Consumer("performance_indicator_messages", performance_indicators_channel,
new PerformanceIndicatorMessageHandler(), true, true),
new Consumer("solver_solution_messages", solver_solution_channel,
new SolverSolutionMessageHandler(), true, true)),
true,
@ -182,8 +203,9 @@ public class ExnConnector {
* others, the KubeVela YAML definition and mapping from KubeVela
* locations to AMPL variables.
*
* When receiving a message, the handler tries to instantiate a
* `NebulousApp` object.
* <p>When receiving a message, the handler instantiates a `NebulousApp`
* object. If we already received the performance indicators from the
* utility evaluator, perform initial deployment; otherwise, wait.
*/
// Note that there is another, earlier app creation message sent via the
// channel `eu.nebulouscloud.ui.application.new`, but its format is not
@ -191,21 +213,106 @@ public class ExnConnector {
public class AppCreationMessageHandler extends Handler {
@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
// Declared outside the try block so that the catch clause can mark a
// partially created app as failed.
NebulousApp app = null;
try {
Object app_id = message.property("application"); // might be null
if (app_id == null) app_id = message.subject();
// if app_id is still null, the filename will look a bit funky but it's not a problem
log.info("App creation message received", keyValue("appId", app_id));
JsonNode appMessage = mapper.valueToTree(body);
Main.logFile("app-message-" + app_id + ".json", appMessage);
NebulousApp app = NebulousApp.newFromAppMessage(
Main.logFile("app-message-" + app_id + ".json", appMessage.toPrettyString());
app = NebulousApp.newFromAppMessage(
// TODO create a new ExnConnector here?
mapper.valueToTree(body), ExnConnector.this);
// Register the app so that the other message handlers can find it via
// NebulousApps.get().
NebulousApps.add(app);
app.sendAMPL();
app.deployUnmodifiedApplication();
// From here on the id stored in the app object is used, not the message
// property read above.
String appIdFromMessage = app.getUUID();
if (NebulousApps.relevantPerformanceIndicators.containsKey(appIdFromMessage)) {
// If the performance indicators haven't arrived yet, this
// will happen in PerformanceIndicatorMessageHandler below.
app.setStateReady(NebulousApps.relevantPerformanceIndicators.get(appIdFromMessage));
app.deployUnmodifiedApplication();
// Not strictly necessary to remove the performance
// indicators, but let's not leave unneeded data around
NebulousApps.relevantPerformanceIndicators.remove(appIdFromMessage);
}
} catch (Exception e) {
log.error("Error while receiving app creation message", e);
// app is still null if the failure happened before the object was created.
if (app != null) app.setStateFailed();
}
}
}
/**
* A handler that receives the performance indicators that the utility
* evaluator sends. If the application object already exists (usually the
* case), start initial deployment, otherwise store the performance
* indicators so the initial app creation message can pick them up.
*/
public class PerformanceIndicatorMessageHandler extends Handler {
@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
Object appIdObject = null;
try {
appIdObject = message.property("application");
if (appIdObject == null) appIdObject = message.subject();
} catch (ClientException e) {
log.error("Received performance indicator message without application property, aborting");
return;
}
String appId = null;
if (appIdObject == null) {
log.error("Received performance indicator message without application property, aborting");
} else {
appId = appIdObject.toString(); // should be a string already
log.info("Received performance indicator message", keyValue("appId", appId));
}
JsonNode appMessage = mapper.valueToTree(body);
Main.logFile("performance-indicators-" + appIdObject + ".json", appMessage.toPrettyString());
NebulousApp app = NebulousApps.get(appId);
if (app == null) {
NebulousApps.relevantPerformanceIndicators.put(appId, appMessage);
log.info("Received performance indicator message for unknown app object, storing",
keyValue("appId", appId));
} else {
app.setStateReady(appMessage);
app.deployUnmodifiedApplication();
}
}
}
/**
 * Handler for solver status messages.  Once a solver reports the state
 * "started" for an application known to this controller, the
 * application's AMPL message is sent out.
 */
public class SolverStatusMessageHandler extends Handler {
    @Override
    public void onMessage(String key, String address, Map body, Message message, Context context) {
        Object applicationProperty;
        try {
            applicationProperty = message.property("application");
        } catch (ClientException e) {
            log.error("Received solver ready message {} without application property, aborting", body);
            return;
        }
        if (applicationProperty == null) {
            log.error("Received solver ready message {} without application property, aborting", body);
            return;
        }
        String appId = applicationProperty.toString(); // should be a string already
        log.info("Received solver status message {}", body, keyValue("appId", appId));

        // Only the "started" state is of interest; ignore everything else,
        // including messages without a textual state.
        JsonNode statusMessage = mapper.valueToTree(body);
        String state = statusMessage.at("/state").textValue();
        if (!"started".equals(state)) {
            return;
        }

        NebulousApp app = NebulousApps.get(appId);
        if (app != null) {
            app.sendAMPL();
            return;
        }
        log.info("Received solver status message for unknown app object, this should not happen",
            keyValue("appId", appId));
    }
}
@ -227,7 +334,7 @@ public class ExnConnector {
log.warn("Received solver solution without 'application' message property, discarding it");
return;
}
Main.logFile("solver-solution-" + app_id + ".json", json_body);
Main.logFile("solver-solution-" + app_id + ".json", json_body.toPrettyString());
NebulousApp app = NebulousApps.get(app_id);
if (app == null) {
log.warn("Received solver solutions for non-existant application, discarding.",
@ -271,13 +378,13 @@ public class ExnConnector {
String salRawResponse = response.at("/body").asText(); // it's already a string, asText() is for the type system
JsonNode metadata = response.at("/metaData");
JsonNode salResponse = mapper.missingNode(); // the data coming from SAL
try {
salResponse = mapper.readTree(salRawResponse);
} catch (JsonProcessingException e) {
try {
salResponse = mapper.readTree(salRawResponse);
} catch (JsonProcessingException e) {
log.error("Could not read message body as JSON: body = '{}'", salRawResponse,
keyValue("appId", appID), keyValue("caller", caller), e);
return mapper.missingNode();
}
}
if (!metadata.at("/status").asText().startsWith("2")) {
// we only accept 200, 202, numbers of that nature
log.error("exn-middleware-sal request failed with error code '{}' and message '{}'",
@ -391,14 +498,14 @@ public class ExnConnector {
if (cpu1 != cpu2) return cpu1 - cpu2;
else return Math.toIntExact(ram1 - ram2);
});
return candidates;
return candidates;
}
/**
* Define a cluster with the given name and node list.
*
*
* <p>The cluster is passed in a JSON of the following shape:
*
*
* <pre>{@code
* {
* "name":"485d7-1",
@ -420,13 +527,13 @@ public class ExnConnector {
* }
* }
* }</pre>
*
*
* <p>Each value for {@code nodeName} has to be globally unique, must
* start with a letter and contain numbers, letters and hyphens only.
*
*
* <p>The values for {@code nodeCandidateId} and {@code cloudId} come from
* the return value of a call to {@link #findNodeCandidates()}.
*
*
* @param appID The application's id, used only for logging.
* @param clusterName The cluster name, used only for logging.
* @param cluster A JSON object, as detailed above.
@ -434,7 +541,7 @@ public class ExnConnector {
*/
public boolean defineCluster(String appID, String clusterName, ObjectNode cluster) {
// https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed
Main.logFile("define-cluster-" + appID + ".json", cluster);
Main.logFile("define-cluster-" + appID + ".json", cluster.toPrettyString());
Map<String, Object> msg;
try {
msg = Map.of("metaData", Map.of("user", "admin"),
@ -471,14 +578,14 @@ public class ExnConnector {
*/
public boolean labelNodes(String appID, String clusterID, JsonNode labels) {
Map<String, Object> msg;
try {
msg = Map.of("metaData", Map.of("user", "admin", "clusterName", clusterID),
"body", mapper.writeValueAsString(labels));
} catch (JsonProcessingException e) {
try {
msg = Map.of("metaData", Map.of("user", "admin", "clusterName", clusterID),
"body", mapper.writeValueAsString(labels));
} catch (JsonProcessingException e) {
log.error("Could not convert JSON to string (this should never happen)",
keyValue("appId", appID), e);
return false;
}
}
Map<String, Object> response = labelNodes.sendSync(msg, appID, null, false);
JsonNode payload = extractPayloadFromExnResponse(response, appID, "labelNodes");
return payload.isMissingNode() ? false : true;
@ -520,7 +627,7 @@ public class ExnConnector {
.put("appName", appName)
.put("action", "apply")
.put("flags", "");
Main.logFile("deploy-application-" + appID + ".json", body);
Main.logFile("deploy-application-" + appID + ".json", body.toPrettyString());
Map<String, Object> msg;
try {
String bodyStr = mapper.writeValueAsString(body);

View File

@ -66,6 +66,7 @@ public class LocalExecution implements Callable<Integer> {
}
if (deploy) {
log.debug("Deploying application", connector.getAmplMessagePublisher());
app.setStateReady(null); // TODO: insert second file here
app.deployUnmodifiedApplication();
}
}

View File

@ -12,7 +12,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import eu.nebulouscloud.exn.core.Publisher;
import lombok.Getter;
import lombok.Setter;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import static net.logstash.logback.argument.StructuredArguments.keyValue;
@ -125,6 +124,9 @@ public class NebulousApp {
@Getter private Map<String, JsonNode> compositeMetrics = new HashMap<>();
/** The app's performance indicators, a map from key to the defining JSON node. */
@Getter private Map<String, JsonNode> performanceIndicators = new HashMap<>();
/** The app's "relevant" performance indicators, as calculated by the
* utility evaluator. Initialized to empty object for testing purposes. */
@Getter private JsonNode relevantPerformanceIndicators = jsonMapper.createObjectNode();
/** The app's utility functions; the AMPL solver will optimize for one of these. */
@Getter private Map<String, JsonNode> utilityFunctions = new HashMap<>();
/**
@ -215,7 +217,7 @@ public class NebulousApp {
public NebulousApp(JsonNode app_message, ObjectNode kubevela, ExnConnector exnConnector) {
this.UUID = app_message.at(uuid_path).textValue();
this.name = app_message.at(name_path).textValue();
this.state = State.READY;
this.state = State.NEW;
this.clusterName = NebulousApps.calculateUniqueClusterName(this.UUID);
this.originalAppMessage = app_message;
this.originalKubevela = kubevela;
@ -319,6 +321,21 @@ public class NebulousApp {
}
}
/**
 * Set the state from NEW to READY, adding the list of relevant
 * performance indicators.
 *
 * @param relevantPerformanceIndicators the relevant performance
 *  indicators to store.  If {@code null}, the currently stored value is
 *  kept so that later readers of the field never see {@code null}.
 * @return {@code true} if the state was changed to READY, {@code false}
 *  if the state was not NEW (in which case nothing is modified).
 */
@Synchronized
public boolean setStateReady(JsonNode relevantPerformanceIndicators) {
    if (state != State.NEW) {
        return false;
    }
    state = State.READY;
    if (relevantPerformanceIndicators != null) {
        this.relevantPerformanceIndicators = relevantPerformanceIndicators;
    }
    return true;
}
/**
* Set the state from READY to DEPLOYING, and increment the generation.
*
@ -471,10 +488,16 @@ public class NebulousApp {
* <p> TODO: also send performance indicators to solver here
*/
public void sendAMPL() {
String ampl = AMPLGenerator.generateAMPL(this);
String ampl_model = AMPLGenerator.generateAMPL(this);
String ampl_data = relevantPerformanceIndicators.at("/initialDataFile").textValue();
ObjectNode msg = jsonMapper.createObjectNode();
msg.put("FileName", getUUID() + ".ampl");
msg.put("FileContent", ampl);
msg.put("ModelFileName", getUUID() + ".ampl");
msg.put("ModelFileContent", ampl_model);
if (ampl_data != null && ampl_data.length() > 0) {
msg.put("DataFileName", getUUID() + ".dat");
msg.put("DataFileContent", ampl_data);
}
msg.put("ObjectiveFunction", getObjectiveFunction());
ObjectNode constants = msg.withObject("Constants");
// Define initial values for constant utility functions:
@ -499,10 +522,28 @@ public class NebulousApp {
constant.put("Variable", variableName);
constant.set("Value", value);
}
log.info("Sending AMPL file to solver", keyValue("amplMessage", msg), keyValue("appId", UUID));
log.info("Sending AMPL files to solver", keyValue("amplMessage", msg), keyValue("appId", UUID));
exnConnector.getAmplMessagePublisher().send(jsonMapper.convertValue(msg, Map.class), getUUID(), true);
Main.logFile("to-solver-" + getUUID() + ".json", msg.toString());
Main.logFile("to-solver-" + getUUID() + ".ampl", ampl);
Main.logFile("to-solver-" + getUUID() + ".json", msg.toPrettyString());
Main.logFile("to-solver-" + getUUID() + ".ampl", ampl_model);
}
/**
 * Send the metric list for this app. This is done two times: once
 * before app cluster creation to initialize EMS, once after app cluster
 * creation to initialize the solver.
 *
 * <p>The message is a JSON object with a {@code metrics} array holding
 * the names of all raw metrics followed by the names of all composite
 * metrics.  It is sent via the connector's metric list publisher and
 * additionally handed to {@code Main.logFile}.
 */
public void sendMetricList() {
Publisher metricsChannel = exnConnector.getMetricListPublisher();
ObjectNode msg = jsonMapper.createObjectNode();
ArrayNode metricNames = msg.withArray("/metrics");
// Only the metric names (the map keys) are sent, not their definitions.
getRawMetrics().forEach((k, v) -> metricNames.add(k));
getCompositeMetrics().forEach((k, v) -> metricNames.add(k));
log.info("Sending metric list", keyValue("appId", UUID));
metricsChannel.send(jsonMapper.convertValue(msg, Map.class), getUUID(), true);
Main.logFile("metric-names-" + getUUID() + ".json", msg.toPrettyString());
}
/**

View File

@ -8,6 +8,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer;
import org.ow2.proactive.sal.model.AttributeRequirement;
import org.ow2.proactive.sal.model.NodeCandidate;
@ -137,6 +138,9 @@ public class NebulousAppDeployer {
* {@code false} otherwise.
*/
private static boolean isClusterDeploymentFinished(JsonNode clusterStatus) {
if (clusterStatus == null || !clusterStatus.isObject())
// Catch various failure states, e.g., SAL spuriously returning null
return false;
return clusterStatus.withArray("/nodes")
.findParents("state")
.stream()
@ -213,12 +217,15 @@ public class NebulousAppDeployer {
// controller.
// - Select node candidates, making sure to only select edge nodes
// once.
// - (Before deploying the cluster) send metric name list.
// - Create a SAL cluster.
// - Deploy the SAL cluster.
// - Add node affinity traits to the KubeVela file.
// - Deploy the SAL application.
// - Store cluster state (deployed KubeVela file, etc.) in
// NebulousApp object.
// - Asynchronously, triggered via solver readiness message: wait for
// solver to be ready, send AMPL and re-send metric name list.
// ------------------------------------------------------------
// Extract node requirements
@ -355,8 +362,9 @@ public class NebulousAppDeployer {
return;
}
// TODO: send performance indicators (for monitoring system, which
// needs it before cluster creation)
// ------------------------------------------------------------
// Send metrics to EMS
app.sendMetricList();
// ------------------------------------------------------------
// Create cluster

View File

@ -7,15 +7,24 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.JsonNode;
/**
* Class that manages a collection of NebulousApp instances.
* Class that manages a collection of NebulousApp instances. Also used to
* temporarily store the relevant performance indicators which are calculated
* by the utility evaluator from the same incoming app creation message.
*/
@Slf4j
public class NebulousApps {
/** The global app registry. */
// (Putting this here until we find a better place.)
private static final Map<String, NebulousApp> apps = new ConcurrentHashMap<String, NebulousApp>();
private static final Map<String, NebulousApp> apps = new ConcurrentHashMap<>();
/**
* A place to store relevant performance indicator messages if the app
* object is not yet available.
*/
public static final Map<String, JsonNode> relevantPerformanceIndicators = new ConcurrentHashMap<>();
/**
* Add a new application object to the registry.