Merge "Deploy application using cluster endpoints"
This commit is contained in:
commit 6771f7e05f
@ -23,18 +23,23 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

/**
 * A class that connects to the EXN middleware and starts listening to
 * messages from the ActiveMQ server.
 *
 * This class will drive the main behavior of the optimiser-controller: the
 * <p>This class will drive the main behavior of the optimiser-controller: the
 * `Consumer` objects created in {@link ExnConnector#ExnConnector} receive
 * incoming messages and react to them, sending out messages in turn.
 *
 * <p>The class also provides methods wrapping the exn-sal middleware
 * endpoints, converting from raw JSON responses to sal-common datatypes where
 * possible.
 */
@Slf4j
public class ExnConnector {
@ -67,31 +72,24 @@ public class ExnConnector {
    // ----------------------------------------
    // Communication with SAL

    // We define these publishers here instead of in the `SalConnector`
    // class since they need to be registered here and I'm afraid I will
    // forget to do it when adding new endpoints over in another class.

    /** The createJob endpoint. */
    public static final SyncedPublisher createJob
        = new SyncedPublisher("createJob",
            "eu.nebulouscloud.exn.sal.job.post", true, true);
    public final SyncedPublisher createJob;
    /** The findNodeCandidates endpoint. Should not be used during normal
     * operation--ask the broker instead. */
    public static final SyncedPublisher findNodeCandidates
        = new SyncedPublisher("findNodeCandidates",
            "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true);
    /** The findNodeCandidates endpoint (Broker's version). */
    public static final SyncedPublisher findBrokerNodeCandidates
        = new SyncedPublisher("findBrokerNodeCandidates",
            "eu.nebulouscloud.cfsb.get_node_candidates", true, true);
    /** The addNodes endpoint. */
    public static final SyncedPublisher addNodes
        = new SyncedPublisher("addNodes",
            "eu.nebulouscloud.exn.sal.nodes.add", true, true);
    /** The submitJob endpoint. */
    public static final SyncedPublisher submitJob
        = new SyncedPublisher("submitJob",
            "eu.nebulouscloud.exn.sal.job.update", true, true);
    public final SyncedPublisher findSalNodeCandidates;
    /** The findNodeCandidates endpoint (Broker's version). This one adds
     * attributes "score", "rank" to the answer it gets from SAL. */
    public final SyncedPublisher findBrokerNodeCandidates;
    /** The defineCluster endpoint. */
    public final SyncedPublisher defineCluster;
    /** The deployCluster endpoint. */
    public final SyncedPublisher deployCluster;
    /** The deployApplication endpoint. */
    public final SyncedPublisher deployApplication;
    /** The scaleOut endpoint. */
    public final SyncedPublisher scaleOut;
    /** The scaleIn endpoint. */
    public final SyncedPublisher scaleIn;

    /**
     * Create a connection to ActiveMQ via the exn middleware, and set up the
@ -107,12 +105,19 @@ public class ExnConnector {
     */
    public ExnConnector(String host, int port, String name, String password, ConnectorHandler callback) {
        amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true);
        createJob = new SyncedPublisher("createJob", "eu.nebulouscloud.exn.sal.job.post", true, true);
        findSalNodeCandidates = new SyncedPublisher("findSalNodeCandidates", "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true);
        findBrokerNodeCandidates = new SyncedPublisher("findBrokerNodeCandidates", "eu.nebulouscloud.cfsb.get_node_candidates", true, true);
        defineCluster = new SyncedPublisher("defineCluster", "eu.nebulouscloud.exn.sal.cluster.define", true, true);
        deployCluster = new SyncedPublisher("deployCluster", "eu.nebulouscloud.exn.sal.cluster.deploy", true, true);
        deployApplication = new SyncedPublisher("deployApplication", "eu.nebulouscloud.exn.sal.cluster.deployApplication", true, true);
        scaleOut = new SyncedPublisher("scaleOut", "eu.nebulouscloud.exn.sal.cluster.scaleout", true, true);
        scaleIn = new SyncedPublisher("scaleIn", "eu.nebulouscloud.exn.sal.cluster.scalein", true, true);

        conn = new Connector("optimiser_controller",
            callback,
            // List.of(new Publisher("config", "config", true)),
            List.of(amplMessagePublisher,
                createJob, findNodeCandidates, findBrokerNodeCandidates, addNodes, submitJob),
                findSalNodeCandidates, findBrokerNodeCandidates, defineCluster, deployCluster, deployApplication, scaleOut, scaleIn),
            List.of(
                new Consumer("ui_app_messages", app_creation_channel,
                    new AppCreationMessageHandler(), true, true),
@ -140,7 +145,7 @@ public class ExnConnector {
    /**
     * Disconnect from ActiveMQ and stop all Consumer processes. Also count
     * down the countdown latch passed in the {@link
     * ExnConnector#start(CountDownLatch)} method if applicable.
     * #start(CountDownLatch)} method if applicable.
     */
    public synchronized void stop() {
        conn.stop();
@ -150,6 +155,9 @@ public class ExnConnector {
        log.debug("ExnConnector stopped.");
    }

    // ----------------------------------------
    // Message Handlers

    /**
     * A message handler that processes app creation messages coming in via
     * `eu.nebulouscloud.ui.dsl.generic`. Such messages contain, among
@ -216,4 +224,334 @@ public class ExnConnector {
        }
    }

    // ----------------------------------------
    // Communication with SAL

    /**
     * Extract and check the SAL response from an exn-middleware response.
     * The SAL response will be valid JSON encoded as a string in the "body"
     * field of the response. If the response is of the following form, log
     * an error and return a missing node instead:
     *
     * <pre>{@code
     * {
     *   "key": <known exception key>,
     *   "message": "some error message"
     * }
     * }</pre>
     *
     * @param response The response from exn-middleware.
     * @param appID The application ID, used for logging only.
     * @return The SAL response as a parsed JsonNode, or a node where {@code
     *  isMissingNode()} will return true if SAL reported an error.
     */
    private static JsonNode extractPayloadFromExnResponse(Map<String, Object> response, String appID) {
        String body = (String)response.get("body");
        JsonNode payload = mapper.missingNode();
        try {
            payload = mapper.readTree(body);
        } catch (JsonProcessingException e) {
            log.error("Could not read message body as JSON: " + body, keyValue("appId", appID), e);
            return mapper.missingNode();
        }
        // These messages are listed in the {@code AbstractProcessor} class of
        // the exn-middleware project.
        if (Set.of("generic-exception-error",
                   "gateway-client-exception-error",
                   "gateway-server-exception-error")
                .contains(payload.at("/key").asText())
            && !payload.at("/message").isMissingNode()) {
            log.error("exn-middleware-sal request failed with error type '{}' and message '{}'",
                payload.at("/key").asText(),
                payload.at("/message").asText(),
                keyValue("appId", appID));
            return mapper.missingNode();
        }
        return payload;
    }
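A sketch of the failure path (editor's illustration, not part of this commit; the error body is fabricated): inside this class, a middleware response whose "body" carries a known exception key is logged and collapsed into a missing node.

    Map<String, Object> errorResponse = Map.of(
        "metaData", Map.of("user", "admin"),
        "body", "{\"key\": \"generic-exception-error\", \"message\": \"SAL unreachable\"}");
    JsonNode result = extractPayloadFromExnResponse(errorResponse, "example-app-id");
    assert result.isMissingNode();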

    /**
     * Get list of node candidates from the resource broker that fulfill the
     * given requirements, and sort them by rank and score so that better node
     * candidates come first in the result.
     *
     * <p>A candidate is better than another one if it has a lower rank or, if
     * the rank is equal, a higher score.
     *
     * @param requirements The list of requirements.
     * @param appID The application ID.
     * @return A sorted List containing node candidates, better candidates
     *  first.
     */
    public List<NodeCandidate> findNodeCandidates(List<Requirement> requirements, String appID) {
        Map<String, Object> msg;
        try {
            msg = Map.of(
                "metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(requirements));
        } catch (JsonProcessingException e) {
            log.error("Could not convert requirements list to JSON string (this should never happen)",
                keyValue("appId", appID), e);
            return null;
        }
        Map<String, Object> response = findBrokerNodeCandidates.sendSync(msg, appID, null, false);
        // Note: we do not call extractPayloadFromExnResponse here, since this
        // response does not come from the exn-middleware.
        ObjectNode jsonBody = mapper.convertValue(response, ObjectNode.class);
        // Note: what we would really like to do here is something like:
        //   return Arrays.asList(mapper.readValue(response, NodeCandidate[].class));
        // But since the broker adds two attributes, the array elements cannot
        // be deserialized into org.ow2.proactive.sal.model.NodeCandidate
        // objects.
        List<JsonNode> result = Arrays.asList(mapper.convertValue(jsonBody.withArray("/body"), JsonNode[].class));
        result.sort((JsonNode c1, JsonNode c2) -> {
            long rank1 = c1.at("/rank").longValue();
            long rank2 = c2.at("/rank").longValue();
            double score1 = c1.at("/score").doubleValue();
            double score2 = c2.at("/score").doubleValue();
            // We return < 0 if c1 < c2. Since we want to sort better
            // candidates first, c1 < c2 if rank is lower or rank is equal
            // and score is higher. (Lower rank = better, higher score =
            // better.)
            if (rank1 != rank2) return Long.compare(rank1, rank2);
            else return Double.compare(score2, score1);
        });
        return result.stream()
            .map(candidate ->
                mapper.convertValue(
                    ((ObjectNode)candidate).deepCopy().remove(List.of("score", "rank")),
                    NodeCandidate.class))
            .collect(Collectors.toList());
    }
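A quick illustration of the resulting order (editor's sketch, not part of this commit; the rank and score values are made up):

    ObjectMapper m = new ObjectMapper();
    JsonNode a = m.createObjectNode().put("rank", 1).put("score", 0.2);
    JsonNode b = m.createObjectNode().put("rank", 2).put("score", 0.9);
    JsonNode c = m.createObjectNode().put("rank", 1).put("score", 0.8);
    // Sorted with the comparator above: c, a, b -- rank 1 beats rank 2,
    // and within rank 1 the higher score (0.8) comes first.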

    /**
     * Get the list of node candidates from SAL that fulfil the given
     * requirements.
     *
     * <p>Unlike the broker's answer, this response does not carry the
     * additional {@code score} and {@code rank} attributes, so it can be
     * deserialized directly into {@code
     * org.ow2.proactive.sal.model.NodeCandidate} instances; see
     * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/nodecandidates-endpoints.md#71--filter-node-candidates-endpoint
     * for the underlying format.
     *
     * @param requirements The list of requirements.
     * @param appID The application ID.
     * @return A list containing node candidates, or null in case of error.
     */
    public List<NodeCandidate> findNodeCandidatesFromSal(List<Requirement> requirements, String appID) {
        Map<String, Object> msg;
        try {
            msg = Map.of(
                "metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(requirements));
        } catch (JsonProcessingException e) {
            log.error("Could not convert requirements list to JSON string (this should never happen)",
                keyValue("appId", appID), e);
            return null;
        }
        Map<String, Object> response = findSalNodeCandidates.sendSync(msg, appID, null, false);
        JsonNode payload = extractPayloadFromExnResponse(response, appID);
        if (payload.isMissingNode()) return null;
        try {
            return Arrays.asList(mapper.treeToValue(payload, NodeCandidate[].class));
        } catch (JsonProcessingException e) {
            log.error("Could not decode node candidates payload", keyValue("appId", appID), e);
            return null;
        }
    }
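A caller sketch (editor's illustration, not part of this commit; `connector` and the requirement values are assumed, with AttributeRequirement and RequirementOperator taken from the sal-common model package imported elsewhere in this change):

    List<Requirement> reqs = List.of(
        new AttributeRequirement("hardware", "cores", RequirementOperator.GEQ, "2"),
        new AttributeRequirement("hardware", "ram", RequirementOperator.GEQ, "4096"));
    List<NodeCandidate> candidates = connector.findNodeCandidatesFromSal(reqs, "my-app-uuid");
    if (candidates == null) { /* error was already logged */ }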

    /**
     * Define a cluster with the given name and node list.
     *
     * <p>The nodes are passed in a JSON array containing objects of the
     * following shape:
     *
     * <pre>{@code
     * {
     *   "nodeName": "some-component",
     *   "nodeCandidateId": "some-candidate-id",
     *   "cloudId": "some-cloud-id"
     * }
     * }</pre>
     *
     * <p>Each value for {@code nodeName} has to be unique, and should be
     * either the name of the master node or the name of a node that will
     * subsequently be referenced in the affinity trait of the modified
     * kubevela file (see {@link NebulousAppDeployer#addNodeAffinities()}).
     *
     * <p>The values for {@code nodeCandidateId} and {@code cloudId} come from
     * the return value of a call to {@link #findNodeCandidates()}.
     *
     * <p>Note that this method could be rewritten to accept the nodes as a
     * {@code List<org.ow2.proactive.sal.model.IaasNode>} instead, if that is
     * more convenient.
     *
     * @param appID The application's id, used to name the cluster.
     * @param masterNodeName The name of the master node.
     * @param nodes A JSON array containing the node definitions.
     * @return true if the cluster was successfully defined, false otherwise.
     */
    public boolean defineCluster(String appID, String masterNodeName, ArrayNode nodes) {
        // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed
        ObjectNode body = mapper.createObjectNode()
            .put("name", appID)
            .put("master-node", masterNodeName);
        body.putArray("nodes").addAll(nodes);
        Map<String, Object> msg;
        try {
            msg = Map.of("metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(body));
        } catch (JsonProcessingException e) {
            log.error("Could not convert JSON to string (this should never happen)",
                keyValue("appId", appID), e);
            return false;
        }
        Map<String, Object> response = defineCluster.sendSync(msg, appID, null, false);
        JsonNode payload = extractPayloadFromExnResponse(response, appID);
        return payload.asBoolean();
        // TODO: check if we still need to unwrap this; see
        // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053
        // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111
        // https://bugs.launchpad.net/nebulous/+bug/2055053
        // return payload.at("/success").asBoolean();
    }
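A usage sketch (editor's illustration, not part of this commit; `connector` and all literal ids are assumed), building the node array in the shape documented above:

    ObjectMapper m = new ObjectMapper();
    ArrayNode clusterNodes = m.createArrayNode();
    clusterNodes.addObject()
        .put("nodeName", "masternode")
        .put("nodeCandidateId", "some-candidate-id")
        .put("cloudId", "some-cloud-id");
    clusterNodes.addObject()
        .put("nodeName", "component-a-1-1")
        .put("nodeCandidateId", "another-candidate-id")
        .put("cloudId", "some-cloud-id");
    boolean ok = connector.defineCluster("my-app-uuid", "masternode", clusterNodes);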

    /**
     * Get the definition of a cluster created by {@link #defineCluster}.
     *
     * @param appID The application ID, as used to define the cluster.
     * @return The cluster definition, or null in case of error.
     */
    public JsonNode getCluster(String appID) {
        Map<String, Object> msg;
        msg = Map.of("metaData", Map.of("user", "admin", "clusterName", appID));
        Map<String, Object> response = deployCluster.sendSync(msg, appID, null, false);
        JsonNode payload = extractPayloadFromExnResponse(response, appID);
        return payload.isMissingNode() ? null : payload;
    }

    /**
     * Deploy a cluster created by {@link #defineCluster}.
     *
     * @param appID The application's id, used to name the cluster.
     * @return true if the cluster was successfully deployed, false otherwise.
     */
    public boolean deployCluster(String appID) {
        // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed
        ObjectNode body = mapper.createObjectNode()
            .put("applicationId", appID);
        Map<String, Object> msg;
        try {
            msg = Map.of("metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(body));
        } catch (JsonProcessingException e) {
            log.error("Could not convert JSON to string (this should never happen)",
                keyValue("appId", appID), e);
            return false;
        }
        Map<String, Object> response = deployCluster.sendSync(msg, appID, null, false);
        JsonNode payload = extractPayloadFromExnResponse(response, appID);
        return payload.asBoolean();
        // TODO: check if we still need to unwrap this; see
        // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053
        // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111
        // https://bugs.launchpad.net/nebulous/+bug/2055053
        // return payload.at("/success").asBoolean();
    }

    /**
     * Submit a KubeVela file to a deployed cluster.
     *
     * @param appID The application's id.
     * @param kubevela The KubeVela file, with node affinity traits
     *  corresponding to the cluster definition.
     * @return true if the application was successfully deployed, false otherwise.
     */
    public boolean deployApplication(String appID, String kubevela) {
        // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed
        ObjectNode body = mapper.createObjectNode()
            .put("applicationId", appID)
            .put("KubevelaYaml", kubevela);
        Map<String, Object> msg;
        try {
            msg = Map.of("metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(body));
        } catch (JsonProcessingException e) {
            log.error("Could not convert JSON to string (this should never happen)",
                keyValue("appId", appID), e);
            return false;
        }
        Map<String, Object> response = deployApplication.sendSync(msg, appID, null, false);
        JsonNode payload = extractPayloadFromExnResponse(response, appID);
        return payload.asBoolean();
        // TODO: check if we still need to unwrap this; see
        // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053
        // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111
        // https://bugs.launchpad.net/nebulous/+bug/2055053
        // return payload.at("/success").asBoolean();
    }
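Taken together, the cluster endpoints above are intended to be called in sequence. A sketch (editor's illustration, not part of this commit; `connector`, `clusterNodes`, and `kubevelaYaml` are assumed to exist):

    String appId = "my-app-uuid";
    if (connector.defineCluster(appId, "masternode", clusterNodes)
            && connector.deployCluster(appId)) {
        boolean deployed = connector.deployApplication(appId, kubevelaYaml);
        // true means SAL accepted the KubeVela file for the running cluster
    }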

    /**
     * Add new nodes to a deployed cluster.
     *
     * <p>The new nodes are specified in the same way as in {@link
     * #defineCluster()}.
     *
     * @param appID The application's id.
     * @param additionalNodes The additional nodes to add.
     */
    // TODO: deserialize response into sal-common `Cluster`
    public void scaleOut(String appID, ArrayNode additionalNodes) {
        // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed
        ObjectNode body = mapper.createObjectNode()
            .put("applicationId", appID);
        body.putArray("workers").addAll(additionalNodes);
        Map<String, Object> msg;
        try {
            msg = Map.of("metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(body));
        } catch (JsonProcessingException e) {
            log.error("Could not convert JSON to string (this should never happen)",
                keyValue("appId", appID), e);
            return;
        }
        Map<String, Object> response = scaleOut.sendSync(msg, appID, null, false);
        // Called for side-effect only; we want to log errors
        JsonNode payload = extractPayloadFromExnResponse(response, appID);
    }

    /**
     * Remove nodes from a deployed cluster.
     *
     * @param appID The application's id.
     * @param superfluousNodes The names of nodes to be removed.
     * @return true if the call was successful, false otherwise.
     */
    public boolean scaleIn(String appID, List<String> superfluousNodes) {
        // NOTE: not yet defined in
        // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed
        ArrayNode body = mapper.createArrayNode();
        superfluousNodes.forEach(nodeName -> body.add(nodeName));
        Map<String, Object> msg;
        try {
            msg = Map.of("metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(body));
        } catch (JsonProcessingException e) {
            log.error("Could not convert JSON to string (this should never happen)",
                keyValue("appId", appID), e);
            return false;
        }
        Map<String, Object> response = scaleIn.sendSync(msg, appID, null, false);
        JsonNode payload = extractPayloadFromExnResponse(response, appID);
        return payload.asBoolean();
        // TODO: check if we still need to unwrap this; see
        // `AbstractProcessor.groovy#normalizeResponse` and bug 2055053
        // https://opendev.org/nebulous/exn-middleware/src/commit/ffc2ca7bdf657b3831d2b803ff2b84d5e8e1bdcd/exn-middleware-core/src/main/groovy/eu/nebulouscloud/exn/modules/sal/processors/AbstractProcessor.groovy#L111
        // https://bugs.launchpad.net/nebulous/+bug/2055053
        // return payload.at("/success").asBoolean();
    }
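A scaling sketch (editor's illustration, not part of this commit; `connector` and the ids are assumed): scaleOut takes additional nodes in the defineCluster shape, while scaleIn removes nodes by name.

    ObjectMapper m = new ObjectMapper();
    ArrayNode extraNodes = m.createArrayNode();
    extraNodes.addObject()
        .put("nodeName", "component-a-2-1")
        .put("nodeCandidateId", "some-candidate-id")
        .put("cloudId", "some-cloud-id");
    connector.scaleOut("my-app-uuid", extraNodes);
    // ... and later, shrink again by node name:
    connector.scaleIn("my-app-uuid", List.of("component-a-2-1"));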


}
@ -10,7 +10,6 @@ import java.util.concurrent.CountDownLatch;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.nebulouscloud.exn.core.Publisher;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;
@ -12,6 +12,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import eu.nebulouscloud.exn.core.Publisher;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import static net.logstash.logback.argument.StructuredArguments.keyValue;

@ -22,6 +23,7 @@ import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
@ -29,6 +31,8 @@ import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.ow2.proactive.sal.model.Requirement;

/**
 * Internal representation of a NebulOus app.
 */
@ -97,6 +101,14 @@ public class NebulousApp {
    @Getter private JsonNode originalAppMessage;
    private ObjectNode original_kubevela;

    /**
     * The current "generation" of deployment. Initial deployment sets this
     * to 1, each subsequent redeployment increases by 1. This value is used
     * to name node instances generated during that deployment.
     */
    @Getter @Setter
    private int deployGeneration = 0;
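For illustration (editor's note; the format matches the node-naming code in NebulousAppDeployer below): generation 1 yields instance names like "component-a-1-1" and "component-a-1-2", and a redeployment at generation 2 yields "component-a-2-1", so instances from different generations never collide.

    String exampleName = String.format("%s-%s-%s", "component-a", 2, 1);  // -> "component-a-2-1"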

    /**
     * Map of component name to machine name(s) deployed for that component.
     * Component names are defined in the KubeVela file. We assume that
@ -111,9 +123,20 @@

    /** When an app gets deployed or redeployed, this is where we send the AMPL file */
    private Publisher ampl_message_channel;
    /** Have we ever been deployed? I.e., when we rewrite KubeVela, are there
     * already nodes running for us? */
    private boolean deployed = false;
    // /** Have we ever been deployed? I.e., when we rewrite KubeVela, are there
    //  * already nodes running for us? */
    // private boolean deployed = false;

    /** The KubeVela as it was most recently sent to the app's controller. */
    @Getter @Setter
    private JsonNode deployedKubevela;
    /** For each KubeVela component, the number of deployed nodes. All nodes
     * will be identical wrt machine type etc. */
    @Getter @Setter
    private Map<String, Integer> deployedNodeCounts;
    /** For each KubeVela component, the requirements for its node(s). */
    @Getter @Setter
    private Map<String, List<Requirement>> deployedNodeRequirements;

    /**
     * The EXN connector for this class. At the moment all apps share the
@ -250,25 +273,6 @@ public class NebulousApp {
        return readKubevelaString(Files.readString(Path.of(path), StandardCharsets.UTF_8));
    }

    /**
     * Set "deployed" status. Will typically be set to true once, and then
     * never to false again.
     *
     * @param deployed the new status.
     */
    public void setDeployed(boolean deployed) {
        this.deployed = deployed;
    }
    /**
     * Check if the app has been deployed, i.e., if there are already VMs
     * allocated from SAL for us.
     *
     * @return false if we never asked for nodes, true otherwise.
     */
    public boolean isDeployed() {
        return deployed;
    }

    /**
     * Check that all parameters have a name, type and path, and that the
     * target path can be found in the original KubeVela file.
@ -433,7 +437,7 @@ public class NebulousApp {
        }
        ObjectNode variables = solution.withObjectProperty("VariableValues");
        ObjectNode kubevela = rewriteKubevelaWithSolution(variables);
        if (isDeployed()) {
        if (deployGeneration > 0) {
            // We assume that killing a node will confuse the application's
            // Kubernetes cluster, therefore:
            // 1. Recalculate node sets
@ -7,8 +7,8 @@ import java.util.Map;
import java.util.Set;
import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer;
import org.ow2.proactive.sal.model.AttributeRequirement;
import org.ow2.proactive.sal.model.CommandsInstallation;
import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.NodeCandidate.NodeCandidateTypeEnum;
import org.ow2.proactive.sal.model.NodeType;
import org.ow2.proactive.sal.model.NodeTypeRequirement;
import org.ow2.proactive.sal.model.OperatingSystemFamily;
@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import static net.logstash.logback.argument.StructuredArguments.keyValue;

@ -33,24 +32,9 @@ import static net.logstash.logback.argument.StructuredArguments.keyValue;
@Slf4j
public class NebulousAppDeployer {

    // TODO: find out the commands to initialize the controller
    /**
     * The installation scripts to send to SAL for the NebulOuS controller
     * node.
     */
    @Getter
    private static CommandsInstallation controllerInstallation = new CommandsInstallation();

    private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    private static final ObjectMapper mapper = new ObjectMapper();

    // TODO: find out the commands to initialize the workers
    /**
     * The installation scripts to send to SAL for a NebulOuS worker node.
     */
    @Getter
    private static CommandsInstallation nodeInstallation = new CommandsInstallation();

    /**
     * The requirements of the node running the NebulOuS controller.
     * This machine runs the Kubernetes cluster and KubeVela. For
@ -122,23 +106,38 @@ public class NebulousAppDeployer {
     * Given a KubeVela file, extract node requirements, create the job, start
     * its nodes and submit KubeVela.
     *
     * NOTE: this method is under reconstruction, pending the new endpoints.
     * <p>NOTE: this method modifies the NebulousApp object state, storing
     * various facts about the deployed cluster.
     *
     * @param app the NebulOuS app object.
     * <p>NOTE: this method is under reconstruction, pending the new
     * endpoints.
     *
     * @param app The NebulOuS app object.
     * @param kubevela the KubeVela file to deploy.
     */
    public static void deployApplication(NebulousApp app, JsonNode kubevela) {
        String appUUID = app.getUUID();
        ExnConnector conn = app.getExnConnector();
        Set<NodeCandidate> chosenEdgeCandidates = new HashSet<>();
        log.info("Starting initial deployment for application", keyValue("appId", appUUID));

        int deployGeneration = app.getDeployGeneration() + 1;
        app.setDeployGeneration(deployGeneration);

        // The overall flow:
        //
        // 1. Extract node requirements and node counts from the KubeVela
        //    definition.
        // 2. Find node candidates for all workers and the controller.
        // 3. Select node candidates.
        // 2. Ask resource broker for node candidates for all workers and the
        //    controller.
        // 3. Select node candidates, making sure to only select edge nodes
        //    once.
        // 4. Create a SAL cluster.
        // 5. Deploy the SAL cluster.
        // 6. Add node affinity traits to the KubeVela file.
        // 7. Deploy the SAL application.
        // 8. Store cluster state (deployed KubeVela file, etc.) in
        //    NebulousApp object.

        // ------------------------------------------------------------
        // 1. Extract node requirements
@ -152,29 +151,46 @@ public class NebulousAppDeployer {
        // ----------------------------------------
        // 2. Find node candidates

        // ArrayNode controllerCandidates = SalConnector.findNodeCandidates(controllerRequirements, appUUID);
        // if (controllerCandidates.isEmpty()) {
        //     log.error("Could not find node candidates for requirements: {}",
        //         controllerRequirements, keyValue("appId", appUUID));
        //     // Continue here while we don't really deploy
        //     // return;
        // }
        // Map<String, ArrayNode> workerCandidates = new HashMap<>();
        // for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) {
        //     String nodeName = e.getKey();
        //     List<Requirement> requirements = e.getValue();
        //     ArrayNode candidates = SalConnector.findNodeCandidates(requirements, appUUID);
        //     if (candidates.isEmpty()) {
        //         log.error("Could not find node candidates for requirements: {}", requirements);
        //         // Continue here while we don't really deploy
        //         // return;
        //     }
        //     workerCandidates.put(nodeName, candidates);
        // }
        List<NodeCandidate> controllerCandidates = conn.findNodeCandidates(controllerRequirements, appUUID);
        if (controllerCandidates.isEmpty()) {
            log.error("Could not find node candidates for requirements: {}",
                controllerRequirements, keyValue("appId", appUUID));
            // Continue here while we don't really deploy
            // return;
        }
        Map<String, List<NodeCandidate>> workerCandidates = new HashMap<>();
        for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) {
            String nodeName = e.getKey();
            List<Requirement> requirements = e.getValue();
            List<NodeCandidate> candidates = conn.findNodeCandidates(requirements, appUUID);
            if (candidates.isEmpty()) {
                log.error("Could not find node candidates for requirements: {}", requirements,
                    keyValue("appId", appUUID));
                // Continue here while we don't really deploy
                // return;
            }
            workerCandidates.put(nodeName, candidates);
        }

        // ------------------------------------------------------------
        // 3. Select node candidates

        // Controller node
        log.debug("Deciding on controller node candidate", keyValue("appId", appUUID));
        NodeCandidate masterNodeCandidate = null;
        if (controllerCandidates.size() > 0) {
            masterNodeCandidate = controllerCandidates.get(0);
            if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE)
                .contains(masterNodeCandidate.getNodeCandidateType())) {
                // Mark this candidate as already chosen
                chosenEdgeCandidates.add(masterNodeCandidate);
            }
        } else {
            log.error("Empty node candidate list for controller, continuing without creating node",
                keyValue("appId", appUUID));
        }

        // Component nodes
        log.debug("Collecting worker nodes for {}", appUUID, keyValue("appId", appUUID));
        Map<String, NodeCandidate> nodeNameToCandidate = new HashMap<>();
        for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) {
@ -185,28 +201,30 @@ public class NebulousAppDeployer {
            int numberOfNodes = nodeCounts.get(componentName);
            Set<String> nodeNames = new HashSet<>();
            for (int i = 1; i <= numberOfNodes; i++) {
                String nodeName = String.format("%s-%s", componentName, i);
                String nodeName = String.format("%s-%s-%s", componentName, deployGeneration, i);
                List<NodeCandidate> candidates = workerCandidates.get(componentName);

                if (candidates.size() == 0) {
                    log.error("Empty node candidate list for component {}, continuing without creating node", componentName, keyValue("appId", appUUID));
                    continue;
                }

                NodeCandidate candidate = candidates.stream()
                    .filter(each -> !chosenEdgeCandidates.contains(each))
                    .findFirst()
                    .orElse(null);
                if (candidate == null) {
                    // All candidates for this component are edge nodes that
                    // have already been assigned elsewhere.
                    log.error("No unassigned node candidate left for component {}, continuing without creating node", componentName, keyValue("appId", appUUID));
                    continue;
                }
                if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE).contains(candidate.getNodeCandidateType())) {
                    // We could remove this candidate from `candidates` here,
                    // to save skipping over already-assigned edge nodes for
                    // the next replica of this component, but we don't want
                    // to make assumptions on whether the candidate list can
                    // be modified. Note that we have to keep track of all
                    // assigned edge nodes in any case, since they might be
                    // candidates in subsequent components.
                    chosenEdgeCandidates.add(candidate);
                }
                nodeNameToCandidate.put(nodeName, candidate);
                nodeNames.add(nodeName);
                // TODO: choose the node candidate with the highest score
                // and/or ranking.

                // TODO: Here we need to discriminate between edge and cloud
                // node candidates: we can deploy an edge node only once, but
                // cloud nodes arbitrarily often. So if the best node
                // candidate is an edge node, we should select it and fill the
                // rest of the nodes with second-best cloud nodes.

                // // TODO: make sure we only choose the same edge node once; it
                // // might be in all node candidate lists :)
                // if (!workerCandidates.get(componentName).isEmpty()) {
                //     // should always be true, except currently we don't abort
                //     // in Step 2 if we don't find candidates.
                //     JsonNode candidate = workerCandidates.get(componentName).get(0);
                //     NodeCandidate c = mapper.convertValue(((ObjectNode)candidate).deepCopy()
                //         .remove(List.of("score", "ranking")),
                //         NodeCandidate.class);
                //     nodeNameToCandidate.put(nodeName, c);
                // }
            }
            app.getComponentMachineNames().put(componentName, nodeNames);
        }
@ -216,30 +234,57 @@ public class NebulousAppDeployer {
        // ------------------------------------------------------------
        // 4. Create cluster

        // TODO: call defineCluster endpoint with nodename -> candidate
        // mapping etc.
        String masterNodeName = "masternode"; // safe because all component node names end with a number
        ObjectNode cluster = mapper.createObjectNode();
        cluster.put("name", appUUID)
            .put("master-node", masterNodeName);
        ArrayNode nodes = cluster.withArray("nodes");
        if (masterNodeCandidate != null) {
            nodes.addObject()
                .put("nodeName", masterNodeName)
                .put("nodeCandidateId", masterNodeCandidate.getId())
                .put("cloudId", masterNodeCandidate.getCloud().getId());
        }
        nodeNameToCandidate.forEach((name, candidate) -> {
            nodes.addObject()
                .put("nodeName", name)
                .put("nodeCandidateId", candidate.getId())
                .put("cloudId", candidate.getCloud().getId());
        });
        boolean defineClusterSuccess = conn.defineCluster(appUUID, masterNodeName, nodes);

        // ------------------------------------------------------------
        // 5. Deploy cluster
        boolean deployClusterSuccess = conn.deployCluster(appUUID);

        // TODO: call deployCluster endpoint

        // ------------------------------------------------------------
        // 6. Rewrite KubeVela
        JsonNode rewritten = addNodeAffinities(kubevela, app.getComponentMachineNames());
        String rewritten_kubevela = "---\n# Did not manage to create rewritten KubeVela";
        try {
            rewritten_kubevela = yamlMapper.writeValueAsString(rewritten);
        } catch (JsonProcessingException e) {
            log.error("Failed to convert KubeVela to YAML; this should never happen", e);
            log.error("Failed to convert KubeVela to YAML; this should never happen", keyValue("appId", appUUID), e);
        }
        Main.logFile("rewritten-kubevela-" + appUUID + ".yaml", rewritten_kubevela);

        // ------------------------------------------------------------
        // 7. Deploy application

        // TODO: call deployApplication endpoint

        // ------------------------------------------------------------
        // 8. Update NebulousApp state

        // TODO: store rewritten KubeVela in application object
    }

    /**
     * Given a KubeVela file, adapt the running application to its specification.
     * Given a KubeVela file, adapt the running application to its
     * specification.
     *
     * The KubeVela file will have been rewritten with updated
     * information from the solver.
     * The KubeVela file will have been rewritten with updated information
     * from the solver.
     *
     * NOTE: this method is under development, pending the new endpoints.
     *
@ -248,16 +293,17 @@ public class NebulousAppDeployer {
     */
    public static void redeployApplication(NebulousApp app, ObjectNode kubevela) {
        String appUUID = app.getUUID();
        log.info("Starting redeployment of {}", appUUID);
        int deployGeneration = app.getDeployGeneration() + 1;
        app.setDeployGeneration(deployGeneration);

        log.info("Starting redeployment generation {}", deployGeneration, keyValue("appId", appUUID));
        // The overall flow:
        //
        // 1. Extract node requirements and node counts from the updated
        //    KubeVela definition.
        // 2. Extract current nodes from running SAL job
        // 3. Calculate new (to be started) and superfluous (to be shutdown)
        //    nodes
        // 4. Find node candidates for new nodes (from Step 3) according to
        // 2. Calculate new (to be started) and superfluous (to be shutdown)
        //    nodes by comparing against previous deployment.
        // 3. Find node candidates for new nodes (from Step 2) according to
        //    their requirements (from Step 1)
        // 5. Rewrite KubeVela with updated node affinities
        // 6. Call clusterScaleOut endpoint with list of added nodes
@ -269,11 +315,11 @@
        // 1. Extract node requirements
        Map<String, List<Requirement>> workerRequirements = KubevelaAnalyzer.getRequirements(kubevela);
        Map<String, Integer> nodeCounts = KubevelaAnalyzer.getNodeCount(kubevela);
        List<Requirement> controllerRequirements = getControllerRequirements(appUUID);

        Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements);
        Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts);
        Main.logFile("controller-requirements-" + appUUID + ".txt", controllerRequirements);

    }

@ -1,73 +0,0 @@
package eu.nebulouscloud.optimiser.controller;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.Requirement;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.extern.slf4j.Slf4j;
import static net.logstash.logback.argument.StructuredArguments.keyValue;

/**
 * A class that wraps communication with SAL (the Scheduling Abstraction Layer
 * of ProActive) over EXN.
 *
 * Documentation of the SAL REST API is here:
 * https://github.com/ow2-proactive/scheduling-abstraction-layer/tree/master/documentation
 */
@Slf4j
public class SalConnector {

    private SalConnector() {}

    private static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Get list of node candidates from the resource broker that fulfil the
     * given requirements.
     *
     * <p>Note that we cannot convert the result to a list containing {@code
     * org.ow2.proactive.sal.model.NodeCandidate} instances, since the broker
     * adds the additional fields {@code score} and {@code ranking}. Instead
     * we return a JSON {@code ArrayNode} containing {@code ObjectNode}s in
     * the format specified at
     * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/nodecandidates-endpoints.md#71--filter-node-candidates-endpoint
     * but with these two additional attributes.
     *
     * @param requirements The list of requirements.
     * @param appID The application ID.
     * @return A JSON array containing node candidates.
     */
    public static ArrayNode findNodeCandidates(List<Requirement> requirements, String appID) {
        Map<String, Object> msg;
        try {
            msg = Map.of(
                "metaData", Map.of("user", "admin"),
                "body", mapper.writeValueAsString(requirements));
        } catch (JsonProcessingException e) {
            log.error("Could not convert requirements list to JSON string (this should never happen)",
                keyValue("appId", appID), e);
            return null;
        }
        Map<String, Object> response = ExnConnector.findBrokerNodeCandidates.sendSync(msg, appID, null, false);
        ObjectNode jsonBody = mapper.convertValue(response, ObjectNode.class);
        // Note: what we would really like to do here is something like:
        //
        //   return Arrays.asList(mapper.readValue(response, NodeCandidate[].class));
        //
        // But since the broker adds two attributes, the array elements cannot
        // be deserialized into org.ow2.proactive.sal.model.NodeCandidate
        // objects.
        return jsonBody.withArray("/nodes");
    }

}