Various fixes for deployment code
- Use labels instead of node names for component affinities. - Keep track of edge or byob machines: do not assign them more than once, even across redeployment generations. - Add `getCluster` endpoint, use it. - Add `labelNodes` endpoint, use it. - Make detection of SAL errors more robust: use `/metaData/status` field containing the HTTP response code. - Introduce per-app short cluster id. - Make node names globally unique. Change-Id: Id62d6e1c6939eb2f1b33b9b2062b574f7e90ff30
This commit is contained in:
parent
6771f7e05f
commit
645eb99e30
@ -13,9 +13,6 @@ import org.ow2.proactive.sal.model.AttributeRequirement;
|
||||
import org.ow2.proactive.sal.model.OperatingSystemFamily;
|
||||
import org.ow2.proactive.sal.model.Requirement;
|
||||
import org.ow2.proactive.sal.model.RequirementOperator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -34,18 +31,18 @@ public class KubevelaAnalyzer {
|
||||
* component. Note that this can be zero when the component should not be
|
||||
* deployed at all. This can happen for example when there is a cloud and
|
||||
* an edge version of the component and only one of them should run.<p>
|
||||
*
|
||||
*
|
||||
* We currently look for the following component trait:
|
||||
*
|
||||
*
|
||||
* <pre>{@code
|
||||
* traits:
|
||||
* - type: scaler
|
||||
* properties:
|
||||
* replicas: 2
|
||||
* }</pre>
|
||||
*
|
||||
*
|
||||
* If this trait is not found for a component, its count will be 1.
|
||||
*
|
||||
*
|
||||
* @param kubevela the parsed KubeVela file.
|
||||
* @return A map from component name to number of instances to generate.
|
||||
*/
|
||||
@ -81,30 +78,30 @@ public class KubevelaAnalyzer {
|
||||
/**
|
||||
* Extract node requirements from a KubeVela file in a form we can send to
|
||||
* the SAL `findNodeCandidates` endpoint. <p>
|
||||
*
|
||||
*
|
||||
* We read the following attributes for each component:
|
||||
*
|
||||
*
|
||||
* - `properties.cpu`, `properties.requests.cpu`: round up to next integer
|
||||
* and generate requirement `hardware.cores`
|
||||
*
|
||||
*
|
||||
* - `properties.memory`, `properties.requests.memory`: Handle "200Mi",
|
||||
* "0.2Gi" and bare number, convert to MB and generate requirement
|
||||
* `hardware.memory`
|
||||
*
|
||||
*
|
||||
* Notes:<p>
|
||||
*
|
||||
*
|
||||
* - We add the requirement that OS family == Ubuntu.<p>
|
||||
*
|
||||
*
|
||||
* - For the first version, we specify all requirements as "greater or
|
||||
* equal", i.e., we might not find precisely the node candidates that
|
||||
* are asked for. <p>
|
||||
*
|
||||
*
|
||||
* - Related, KubeVela specifies "cpu" as a fractional value, while SAL
|
||||
* wants the number of cores as a whole number. We round up to the
|
||||
* nearest integer and ask for "this or more" cores, since we might end
|
||||
* up with needing, e.g., 3 cores, which is not a configuration commonly
|
||||
* provided by cloud providers. <p>
|
||||
*
|
||||
*
|
||||
* @param kubevela the parsed KubeVela file.
|
||||
* @return a map of component name to (potentially empty, except for OS
|
||||
* family) list of requirements for that component. No requirements mean
|
||||
|
@ -82,6 +82,10 @@ public class ExnConnector {
|
||||
public final SyncedPublisher findBrokerNodeCandidates;
|
||||
/** The defineCluster endpoint. */
|
||||
public final SyncedPublisher defineCluster;
|
||||
/** The getCluster endpoint. */
|
||||
public final SyncedPublisher getCluster;
|
||||
/** The labelNodes endpoint. */
|
||||
public final SyncedPublisher labelNodes;
|
||||
/** The deployCluster endpoint. */
|
||||
public final SyncedPublisher deployCluster;
|
||||
/** The deployApplication endpoint. */
|
||||
@ -109,6 +113,8 @@ public class ExnConnector {
|
||||
findSalNodeCandidates = new SyncedPublisher("findSalNodeCandidates", "eu.nebulouscloud.exn.sal.nodecandidate.get", true, true);
|
||||
findBrokerNodeCandidates = new SyncedPublisher("findBrokerNodeCandidates", "eu.nebulouscloud.cfsb.get_node_candidates", true, true);
|
||||
defineCluster = new SyncedPublisher("defineCluster", "eu.nebulouscloud.exn.sal.cluster.define", true, true);
|
||||
getCluster = new SyncedPublisher("getCluster", "eu.nebulouscloud.exn.sal.cluster", true, true);
|
||||
labelNodes = new SyncedPublisher("labelNodes", "eu.nebulouscloud.exn.sal.cluster.label", true, true);
|
||||
deployCluster = new SyncedPublisher("deployCluster", "eu.nebulouscloud.exn.sal.cluster.deploy", true, true);
|
||||
deployApplication = new SyncedPublisher("deployApplication", "eu.nebulouscloud.exn.sal.cluster.deployApplication", true, true);
|
||||
scaleOut = new SyncedPublisher("scaleOut", "eu.nebulouscloud.exn.sal.cluster.scaleout", true, true);
|
||||
@ -117,7 +123,15 @@ public class ExnConnector {
|
||||
conn = new Connector("optimiser_controller",
|
||||
callback,
|
||||
List.of(amplMessagePublisher,
|
||||
findSalNodeCandidates, findBrokerNodeCandidates, defineCluster, deployCluster, deployApplication, scaleOut, scaleIn),
|
||||
findSalNodeCandidates,
|
||||
findBrokerNodeCandidates,
|
||||
defineCluster,
|
||||
getCluster,
|
||||
labelNodes,
|
||||
deployCluster,
|
||||
deployApplication,
|
||||
scaleOut,
|
||||
scaleIn),
|
||||
List.of(
|
||||
new Consumer("ui_app_messages", app_creation_channel,
|
||||
new AppCreationMessageHandler(), true, true),
|
||||
@ -205,7 +219,10 @@ public class ExnConnector {
|
||||
try {
|
||||
ObjectNode json_body = mapper.convertValue(body, ObjectNode.class);
|
||||
String app_id = message.property("application").toString(); // should be string already, but don't want to cast
|
||||
if (app_id == null) app_id = message.subject(); // TODO: remove for second version, leaving it in just to be safe
|
||||
if (app_id == null) {
|
||||
log.warn("Received solver solution without 'application' message property, discarding it");
|
||||
return;
|
||||
}
|
||||
Main.logFile("solver-solution-" + app_id + ".json", json_body);
|
||||
NebulousApp app = NebulousApps.get(app_id);
|
||||
if (app == null) {
|
||||
@ -215,7 +232,6 @@ public class ExnConnector {
|
||||
} else {
|
||||
log.debug("Received solver solutions for application",
|
||||
keyValue("appId", app_id));
|
||||
// TODO: check if solution should be deployed (it's a field in the message)
|
||||
app.processSolution(json_body);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -240,34 +256,32 @@ public class ExnConnector {
|
||||
* }
|
||||
* }</pre>
|
||||
*
|
||||
* @param response The response from exn-middleware.
|
||||
* @param responseMessage The response from exn-middleware.
|
||||
* @param appID The application ID, used for logging only.
|
||||
* @return The SAL response as a parsed JsonNode, or a node where {@code
|
||||
* isMissingNode()} will return true if SAL reported an error.
|
||||
*/
|
||||
private static JsonNode extractPayloadFromExnResponse(Map<String, Object> response, String appID) {
|
||||
String body = (String)response.get("body");
|
||||
JsonNode payload = mapper.missingNode();
|
||||
private static JsonNode extractPayloadFromExnResponse(Map<String, Object> responseMessage, String appID) {
|
||||
JsonNode response = mapper.valueToTree(responseMessage);
|
||||
String salRawResponse = response.at("/body").asText(); // it's already a string, asText() is for the type system
|
||||
JsonNode metadata = response.at("/metaData");
|
||||
JsonNode salResponse = mapper.missingNode(); // the data coming from SAL
|
||||
try {
|
||||
payload = mapper.readTree(body);
|
||||
salResponse = mapper.readTree(salRawResponse);
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Could not read message body as JSON: " + body, keyValue("appId", appID), e);
|
||||
log.error("Could not read message body as JSON: body = '{}'", salRawResponse,
|
||||
keyValue("appId", appID), e);
|
||||
return mapper.missingNode();
|
||||
}
|
||||
// These messages are listed in the {@code AbstractProcessor} class of
|
||||
// the exn-middleware project.
|
||||
if (Set.of("generic-exception-error",
|
||||
"gateway-client-exception-error",
|
||||
"gateway-server-exception-error")
|
||||
.contains(payload.at("/key").asText())
|
||||
&& !payload.at("/message").isMissingNode()) {
|
||||
log.error("exn-middleware-sal request failed with error type '{}' and message '{}'",
|
||||
payload.at("/key").asText(),
|
||||
payload.at("/message").asText(),
|
||||
if (!metadata.at("/status").asText().startsWith("2")) {
|
||||
// we only accept 200, 202, numbers of that nature
|
||||
log.error("exn-middleware-sal request failed with error code '{}' and message '{}'",
|
||||
metadata.at("/status"),
|
||||
salResponse.at("/message").asText(),
|
||||
keyValue("appId", appID));
|
||||
return mapper.missingNode();
|
||||
}
|
||||
return payload;
|
||||
return salResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -425,13 +439,34 @@ public class ExnConnector {
|
||||
* @return The cluster definition, or null in case of error.
|
||||
*/
|
||||
public JsonNode getCluster(String appID) {
|
||||
Map<String, Object> msg;
|
||||
msg = Map.of("metaData", Map.of("user", "admin", "clusterName", appID));
|
||||
Map<String, Object> response = deployCluster.sendSync(msg, appID, null, false);
|
||||
Map<String, Object> msg = Map.of("metaData", Map.of("user", "admin", "clusterName", appID));
|
||||
Map<String, Object> response = getCluster.sendSync(msg, appID, null, false);
|
||||
JsonNode payload = extractPayloadFromExnResponse(response, appID);
|
||||
return payload.isMissingNode() ? null : payload;
|
||||
}
|
||||
|
||||
/**
|
||||
* Label the nodes with given names with the given labels.
|
||||
*
|
||||
* @param appID the application ID.
|
||||
* @param clusterID the cluster ID.
|
||||
* @param labels A map from node name to label.
|
||||
*/
|
||||
public boolean labelNodes(String appID, String clusterID, JsonNode labels) {
|
||||
Map<String, Object> msg;
|
||||
try {
|
||||
msg = Map.of("metaData", Map.of("user", "admin", "clusterName", appID),
|
||||
"body", mapper.writeValueAsString(labels));
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Could not convert JSON to string (this should never happen)",
|
||||
keyValue("appId", appID), e);
|
||||
return false;
|
||||
}
|
||||
Map<String, Object> response = labelNodes.sendSync(msg, appID, null, false);
|
||||
JsonNode payload = extractPayloadFromExnResponse(response, appID);
|
||||
return payload.isMissingNode() ? false : true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deploy a cluster created by {@link #defineCluster}.
|
||||
*
|
||||
|
@ -31,6 +31,7 @@ import java.util.Spliterators;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.ow2.proactive.sal.model.NodeCandidate;
|
||||
import org.ow2.proactive.sal.model.Requirement;
|
||||
|
||||
/**
|
||||
@ -52,6 +53,13 @@ public class NebulousApp {
|
||||
*/
|
||||
@Getter private String name;
|
||||
|
||||
/**
|
||||
* The cluster name. This must be globally unique but should be short,
|
||||
* since during deployment it will be used to create instance names, where
|
||||
* AWS has a length restriction.
|
||||
*/
|
||||
@Getter private String clusterName;
|
||||
|
||||
// ----------------------------------------
|
||||
// App message parsing stuff
|
||||
|
||||
@ -99,7 +107,7 @@ public class NebulousApp {
|
||||
// Deployment stuff
|
||||
/** The original app message. */
|
||||
@Getter private JsonNode originalAppMessage;
|
||||
private ObjectNode original_kubevela;
|
||||
private ObjectNode originalKubevela;
|
||||
|
||||
/**
|
||||
* The current "generation" of deployment. Initial deployment sets this
|
||||
@ -110,7 +118,7 @@ public class NebulousApp {
|
||||
private int deployGeneration = 0;
|
||||
|
||||
/**
|
||||
* Map of component name to machine name(s) deployed for that component.
|
||||
* Map of component name to node name(s) deployed for that component.
|
||||
* Component names are defined in the KubeVela file. We assume that
|
||||
* component names stay constant during redeployment, i.e., once an
|
||||
* application is deployed, its KubeVela file will not change.
|
||||
@ -119,9 +127,26 @@ public class NebulousApp {
|
||||
* specified in KubeVela.
|
||||
*/
|
||||
@Getter
|
||||
private Map<String, Set<String>> componentMachineNames = new HashMap<>();
|
||||
private Map<String, Set<String>> componentNodeNames = new HashMap<>();
|
||||
/**
|
||||
* Map from node name to deployed edge or BYON node candidate. We keep
|
||||
* track of assigned edge candidates, since we do not want to
|
||||
* doubly-assign edge nodes. We also store the node name, so we can
|
||||
* "free" the edge candidate when the current component gets redeployed
|
||||
* and lets go of its edge node. (We do not track cloud node candidates
|
||||
* since these can be instantiated multiple times.)
|
||||
*/
|
||||
@Getter
|
||||
private Map<String, NodeCandidate> nodeEdgeCandidates = new HashMap<>();
|
||||
/** Map of component name to its requirements, as currently deployed.
|
||||
* Each replica of a component has identical requirements. */
|
||||
@Getter @Setter
|
||||
private Map<String, List<Requirement>> componentRequirements = new HashMap<>();
|
||||
/** Map of component name to its replica count, as currently deployed. */
|
||||
@Getter @Setter
|
||||
private Map<String, Integer> componentReplicaCounts = new HashMap<>();
|
||||
|
||||
/** When an app gets deployed or redeployed, this is where we send the AMPL file */
|
||||
/** When an app gets deployed, this is where we send the AMPL file */
|
||||
private Publisher ampl_message_channel;
|
||||
// /** Have we ever been deployed? I.e., when we rewrite KubeVela, are there
|
||||
// * already nodes running for us? */
|
||||
@ -158,8 +183,9 @@ public class NebulousApp {
|
||||
public NebulousApp(JsonNode app_message, ObjectNode kubevela, ExnConnector exnConnector) {
|
||||
this.UUID = app_message.at(uuid_path).textValue();
|
||||
this.name = app_message.at(name_path).textValue();
|
||||
this.clusterName = NebulousApps.calculateUniqueClusterName(this.UUID);
|
||||
this.originalAppMessage = app_message;
|
||||
this.original_kubevela = kubevela;
|
||||
this.originalKubevela = kubevela;
|
||||
this.exnConnector = exnConnector;
|
||||
JsonNode parameters = app_message.at(variables_path);
|
||||
if (parameters.isArray()) {
|
||||
@ -305,7 +331,7 @@ public class NebulousApp {
|
||||
* cannot be followed
|
||||
*/
|
||||
private JsonNode findPathInKubevela(String path) {
|
||||
JsonNode result = original_kubevela.at(path);
|
||||
JsonNode result = originalKubevela.at(path);
|
||||
return result.isMissingNode() ? null : result;
|
||||
}
|
||||
|
||||
@ -326,7 +352,7 @@ public class NebulousApp {
|
||||
* generated.
|
||||
*/
|
||||
public ObjectNode rewriteKubevelaWithSolution(ObjectNode variableValues) {
|
||||
ObjectNode freshKubevela = original_kubevela.deepCopy();
|
||||
ObjectNode freshKubevela = originalKubevela.deepCopy();
|
||||
for (Map.Entry<String, JsonNode> entry : variableValues.properties()) {
|
||||
String key = entry.getKey();
|
||||
JsonNode replacementValue = entry.getValue();
|
||||
@ -390,7 +416,7 @@ public class NebulousApp {
|
||||
JsonNode variable = function.withArray("/expression/variables").get(0);
|
||||
String variableName = variable.get("value").asText();
|
||||
JsonPointer path = kubevelaVariablePaths.get(variableName);
|
||||
JsonNode value = original_kubevela.at(path);
|
||||
JsonNode value = originalKubevela.at(path);
|
||||
ObjectNode constant = constants.withObject(function.get("name").asText());
|
||||
constant.put("Variable", variableName);
|
||||
constant.set("Value", value);
|
||||
@ -422,17 +448,19 @@ public class NebulousApp {
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming solver message.
|
||||
* Handle an incoming solver message. If the message has a field {@code
|
||||
* deploySolution} with value {@code true}, rewrite the original KubeVela
|
||||
* file with the contained variable values and perform initial deployment
|
||||
* or redeployment as appropriate. Otherwise, ignore the message.
|
||||
*
|
||||
* @param solution The message from the solver, containing a field
|
||||
* "VariableValues" that can be processed by {@link
|
||||
* NebulousApp#rewriteKubevelaWithSolution}.
|
||||
*/
|
||||
public void processSolution(ObjectNode solution) {
|
||||
// TODO: check if the solution is for our application (check uuid) in
|
||||
// message; pass it in
|
||||
if (!solution.get("DeploySolution").asBoolean(false)) {
|
||||
// `asBoolean` returns its parameter if node cannot be converted to Boolean
|
||||
// `asBoolean` returns its argument if node is missing or cannot
|
||||
// be converted to Boolean
|
||||
return;
|
||||
}
|
||||
ObjectNode variables = solution.withObjectProperty("VariableValues");
|
||||
@ -456,10 +484,10 @@ public class NebulousApp {
|
||||
}
|
||||
|
||||
/**
|
||||
* Deploy an application, bypassing the solver. Will deploy unmodified
|
||||
* KubeVela, as given by the initial app creation message.
|
||||
* Deploy an application, bypassing the solver. This just deploys the
|
||||
* unmodified KubeVela, as given by the initial app creation message.
|
||||
*/
|
||||
public void deployUnmodifiedApplication() {
|
||||
NebulousAppDeployer.deployApplication(this, original_kubevela);
|
||||
NebulousAppDeployer.deployApplication(this, originalKubevela);
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,13 @@
|
||||
package eu.nebulouscloud.optimiser.controller;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer;
|
||||
import org.ow2.proactive.sal.model.AttributeRequirement;
|
||||
import org.ow2.proactive.sal.model.NodeCandidate;
|
||||
@ -57,8 +60,10 @@ public class NebulousAppDeployer {
|
||||
/**
|
||||
* Produce a fresh KubeVela specification with added node affinity traits.
|
||||
*
|
||||
* We add the following trait to all components, except those with
|
||||
* a replica count of 0:
|
||||
* During deployment and redeployment, we label all nodes with {@code
|
||||
* nebulouscloud.eu/<componentname>=true}. (Note that with this scheme, a
|
||||
* node can have labels for multiple components if desired.) We add the
|
||||
* following trait to all components:
|
||||
*
|
||||
* <pre>{@code
|
||||
* traits:
|
||||
@ -68,40 +73,45 @@ public class NebulousAppDeployer {
|
||||
* required:
|
||||
* nodeSelectorTerms:
|
||||
* - matchExpressions:
|
||||
* - key: "kubernetes.io/hostname"
|
||||
* - key: "nebulouscloud.eu/<componentname>"
|
||||
* operator: In
|
||||
* values: ["componentname-1", "componentname-2"]
|
||||
* values: "true"
|
||||
* }</pre>
|
||||
*
|
||||
* @param kubevela the KubeVela specification to modify. This parameter is
|
||||
* not modified.
|
||||
* @param componentMachineNames Map from component name to node names
|
||||
* where that component should be deployed.
|
||||
* @return a fresh KubeVela specification with added nodeAffinity traits.
|
||||
*/
|
||||
public static JsonNode addNodeAffinities(JsonNode kubevela, Map<String, Set<String>> componentMachineNames) {
|
||||
public static JsonNode addNodeAffinities(JsonNode kubevela) {
|
||||
JsonNode result = kubevela.deepCopy();
|
||||
for (final JsonNode c : result.withArray("/spec/components")) {
|
||||
if (componentMachineNames.getOrDefault(c.get("name").asText(), Set.of()).isEmpty()){
|
||||
// Do not generate trait at all if we didn't deploy any
|
||||
// machines. This happens if replicas is 0
|
||||
continue;
|
||||
}
|
||||
String name = c.get("name").asText();
|
||||
ArrayNode traits = c.withArray("traits");
|
||||
ObjectNode trait = traits.addObject();
|
||||
trait.put("type", "affinity");
|
||||
ArrayNode nodeSelectorTerms = trait.withArray("/properties/nodeAffinity/required/nodeSelectorTerms");
|
||||
ArrayNode matchExpressions = nodeSelectorTerms.addObject().withArray("matchExpressions");
|
||||
ObjectNode term = matchExpressions.addObject();
|
||||
term.put("key", "kubernetes.io/hostname")
|
||||
.put("operator", "In");
|
||||
componentMachineNames
|
||||
.getOrDefault(c.get("name").asText(), Set.of())
|
||||
.forEach(nodename -> term.withArray("values").add(nodename));
|
||||
term.put("key", "nebulouscloud.eu/" + name)
|
||||
.put("operator", "In")
|
||||
.withArray("values").add("true");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a globally-unique node name.
|
||||
*
|
||||
* @param clusterName the unique cluster name.
|
||||
* @param componentName the KubeVela component name.
|
||||
* @param deployGeneration 1 for initial deployment, increasing for each redeployment.
|
||||
* @param nodeNumber the replica number of the component to be deployed on the node.
|
||||
* @return a fresh node name.
|
||||
*/
|
||||
private static String createNodeName(String clusterName, String componentName, int deployGeneration, int nodeNumber) {
|
||||
return String.format("%s-%s-%s-%s", clusterName, componentName, deployGeneration, nodeNumber);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a KubeVela file, extract node requirements, create the job, start
|
||||
* its nodes and submit KubeVela.
|
||||
@ -117,8 +127,9 @@ public class NebulousAppDeployer {
|
||||
*/
|
||||
public static void deployApplication(NebulousApp app, JsonNode kubevela) {
|
||||
String appUUID = app.getUUID();
|
||||
String clusterName = app.getClusterName();
|
||||
ExnConnector conn = app.getExnConnector();
|
||||
Set<NodeCandidate> chosenEdgeCandidates = new HashSet<>();
|
||||
Map<String, NodeCandidate> edgeCandidates = app.getNodeEdgeCandidates();
|
||||
log.info("Starting initial deployment for application", keyValue("appId", appUUID));
|
||||
|
||||
int deployGeneration = app.getDeployGeneration() + 1;
|
||||
@ -148,6 +159,7 @@ public class NebulousAppDeployer {
|
||||
Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements);
|
||||
Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts);
|
||||
Main.logFile("controller-requirements-" + appUUID + ".txt", controllerRequirements);
|
||||
|
||||
// ----------------------------------------
|
||||
// 2. Find node candidates
|
||||
|
||||
@ -177,13 +189,14 @@ public class NebulousAppDeployer {
|
||||
|
||||
// Controller node
|
||||
log.debug("Deciding on controller node candidate", keyValue("appId", appUUID));
|
||||
String masterNodeName = clusterName + "-masternode"; // safe because all component node names end with a number
|
||||
NodeCandidate masterNodeCandidate = null;
|
||||
if (controllerCandidates.size() > 0) {
|
||||
masterNodeCandidate = controllerCandidates.get(0);
|
||||
if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE)
|
||||
.contains(masterNodeCandidate.getNodeCandidateType())) {
|
||||
// Mark this candidate as already chosen
|
||||
chosenEdgeCandidates.add(masterNodeCandidate);
|
||||
edgeCandidates.put(masterNodeName, masterNodeCandidate);
|
||||
}
|
||||
} else {
|
||||
log.error("Empty node candidate list for controller, continuing without creating node",
|
||||
@ -192,49 +205,55 @@ public class NebulousAppDeployer {
|
||||
|
||||
// Component nodes
|
||||
log.debug("Collecting worker nodes for {}", appUUID, keyValue("appId", appUUID));
|
||||
Map<String, NodeCandidate> nodeNameToCandidate = new HashMap<>();
|
||||
ArrayNode nodeLabels = mapper.createArrayNode();
|
||||
Map<String, NodeCandidate> clusterNodes = new HashMap<>();;
|
||||
// Here we collect multiple things:
|
||||
// - The node names for each component, in the field
|
||||
// NebulousApp#componentNodeNames
|
||||
// - Each node name and its candidate (clusterNodes), for
|
||||
// ExnConnector.createCluster
|
||||
// - Each node name and its label (nodeLabels), for
|
||||
// ExnConnector.labelNodes
|
||||
for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) {
|
||||
// Here we collect two things: the flat list (hostname ->
|
||||
// candidate) to send to createCluster, and the per-component
|
||||
// hostname sets that we remember in the app object.
|
||||
String componentName = e.getKey();
|
||||
int numberOfNodes = nodeCounts.get(componentName);
|
||||
Set<String> nodeNames = new HashSet<>();
|
||||
for (int i = 1; i <= numberOfNodes; i++) {
|
||||
String nodeName = String.format("%s-%s-%s", componentName, deployGeneration, i);
|
||||
List<NodeCandidate> candidates = workerCandidates.get(componentName);
|
||||
|
||||
if (candidates.size() == 0) {
|
||||
log.error("Empty node candidate list for component ~s, continuing without creating node", componentName, keyValue("appId", appUUID));
|
||||
continue;
|
||||
}
|
||||
|
||||
List<NodeCandidate> candidates = workerCandidates.get(componentName);
|
||||
if (candidates.size() == 0) {
|
||||
log.error("Empty node candidate list for component ~s, continuing without creating node", componentName, keyValue("appId", appUUID));
|
||||
continue;
|
||||
}
|
||||
for (int nodeNumber = 1; nodeNumber <= numberOfNodes; nodeNumber++) {
|
||||
String nodeName = createNodeName(clusterName, componentName, deployGeneration, nodeNumber);
|
||||
NodeCandidate candidate = candidates.stream()
|
||||
.filter(each -> !chosenEdgeCandidates.contains(each))
|
||||
.filter(each -> !edgeCandidates.values().contains(each))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE).contains(candidate.getNodeCandidateType())) {
|
||||
// We could remove this candidate from `candidates` here,
|
||||
// to save skipping over already-assigned edge nodes for
|
||||
// the next replica of this component, but we don't want
|
||||
// to make assumptions on whether the candidate list can
|
||||
// be modified. Note that we have to keep track of all
|
||||
// assigned edge nodes in any case, since they might be
|
||||
// candidates in subsequent components.
|
||||
chosenEdgeCandidates.add(candidate);
|
||||
if (candidate == null) {
|
||||
log.error("No available node candidate for node {} of component {}", nodeNumber, componentName,
|
||||
keyValue("appId", appUUID));
|
||||
continue;
|
||||
}
|
||||
nodeNameToCandidate.put(nodeName, candidate);
|
||||
if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE).contains(candidate.getNodeCandidateType())) {
|
||||
edgeCandidates.put(nodeName, candidate);
|
||||
}
|
||||
clusterNodes.put(nodeName, candidate);
|
||||
nodeLabels.addObject().put(nodeName, "nebulouscloud.eu/" + componentName + "=true");
|
||||
nodeNames.add(nodeName);
|
||||
}
|
||||
app.getComponentMachineNames().put(componentName, nodeNames);
|
||||
app.getComponentNodeNames().put(componentName, nodeNames);
|
||||
}
|
||||
Main.logFile("nodenames-" + appUUID + ".txt", app.getComponentNodeNames());
|
||||
Main.logFile("worker-nodes-" + appUUID + ".txt", clusterNodes);
|
||||
try {
|
||||
Main.logFile("worker-labels-" + appUUID + ".txt", mapper.writeValueAsString(nodeLabels));
|
||||
} catch (JsonProcessingException e1) {
|
||||
// ignore; the labelNodes method will report the same error later
|
||||
}
|
||||
Main.logFile("nodenames-" + appUUID + ".txt", app.getComponentMachineNames());
|
||||
Main.logFile("worker-nodes-" + appUUID + ".txt", nodeNameToCandidate);
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// 4. Create cluster
|
||||
|
||||
String masterNodeName = "masternode"; // safe because all component node names end with a number
|
||||
ObjectNode cluster = mapper.createObjectNode();
|
||||
cluster.put("name", appUUID)
|
||||
.put("master-node", masterNodeName);
|
||||
@ -245,21 +264,23 @@ public class NebulousAppDeployer {
|
||||
.put("nodeCandidateId", masterNodeCandidate.getId())
|
||||
.put("cloudId", masterNodeCandidate.getCloud().getId());
|
||||
}
|
||||
nodeNameToCandidate.forEach((name, candidate) -> {
|
||||
clusterNodes.forEach((name, candidate) -> {
|
||||
nodes.addObject()
|
||||
.put("nodeName", name)
|
||||
.put("nodeCandidateId", candidate.getId())
|
||||
.put("cloudId", candidate.getCloud().getId());
|
||||
});
|
||||
boolean defineClusterSuccess = conn.defineCluster(appUUID, masterNodeName, null);
|
||||
boolean defineClusterSuccess = conn.defineCluster(clusterName, masterNodeName, nodes);
|
||||
|
||||
boolean labelClusterSuccess = conn.labelNodes(appUUID, clusterName, nodeLabels);
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// 5. Deploy cluster
|
||||
boolean deployClusterSuccess = conn.deployCluster(appUUID);
|
||||
boolean deployClusterSuccess = conn.deployCluster(clusterName);
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// 6. Rewrite KubeVela
|
||||
JsonNode rewritten = addNodeAffinities(kubevela, app.getComponentMachineNames());
|
||||
JsonNode rewritten = addNodeAffinities(kubevela);
|
||||
String rewritten_kubevela = "---\n# Did not manage to create rewritten KubeVela";
|
||||
try {
|
||||
rewritten_kubevela = yamlMapper.writeValueAsString(rewritten);
|
||||
@ -276,7 +297,9 @@ public class NebulousAppDeployer {
|
||||
// ------------------------------------------------------------
|
||||
// 8. Update NebulousApp state
|
||||
|
||||
// TODO: store rewritten KubeVela in application object
|
||||
app.setComponentRequirements(workerRequirements);
|
||||
app.setComponentReplicaCounts(nodeCounts);
|
||||
app.setDeployedKubevela(rewritten);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -293,7 +316,9 @@ public class NebulousAppDeployer {
|
||||
*/
|
||||
public static void redeployApplication(NebulousApp app, ObjectNode kubevela) {
|
||||
String appUUID = app.getUUID();
|
||||
String clusterName = app.getClusterName();
|
||||
int deployGeneration = app.getDeployGeneration() + 1;
|
||||
ExnConnector conn = app.getExnConnector();
|
||||
app.setDeployGeneration(deployGeneration);
|
||||
|
||||
log.info("Starting redeployment generation {}", deployGeneration, keyValue("appId", appUUID));
|
||||
@ -305,22 +330,139 @@ public class NebulousAppDeployer {
|
||||
// nodes by comparing against previous deployment.
|
||||
// 3. Find node candidates for new nodes (from Step 3) according to
|
||||
// their requirements (from Step 1)
|
||||
// 5. Rewrite KubeVela with updated node affinities
|
||||
// 6. Call clusterScaleOut endpoint with list of added nodes
|
||||
// 4. Rewrite KubeVela with updated node affinities
|
||||
// 5. Call clusterScaleOut endpoint with list of added nodes
|
||||
// 6. Call labelNodes for added nodes
|
||||
// 7. Call deployApplication with rewritten KubeVela
|
||||
// 8. call clusterScaleIn endpoint with list of removed node names
|
||||
Main.logFile("kubevela-updated-from-solver-" + appUUID + ".yaml", kubevela);
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// 1. Extract node requirements
|
||||
Map<String, List<Requirement>> workerRequirements = KubevelaAnalyzer.getRequirements(kubevela);
|
||||
Map<String, Integer> nodeCounts = KubevelaAnalyzer.getNodeCount(kubevela);
|
||||
Map<String, List<Requirement>> componentRequirements = KubevelaAnalyzer.getRequirements(kubevela);
|
||||
Map<String, Integer> componentReplicaCounts = KubevelaAnalyzer.getNodeCount(kubevela);
|
||||
|
||||
|
||||
Map<String, List<Requirement>> oldComponentRequirements = app.getComponentRequirements();
|
||||
Map<String, Integer> oldComponentReplicaCounts = app.getComponentReplicaCounts();
|
||||
|
||||
Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements);
|
||||
Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts);
|
||||
|
||||
ArrayNode nodeLabels = mapper.createArrayNode();
|
||||
List<String> nodesToRemove = new ArrayList<>();
|
||||
ArrayNode nodesToAdd = mapper.createArrayNode();
|
||||
|
||||
Map<String, NodeCandidate> edgeCandidates = app.getNodeEdgeCandidates();
|
||||
|
||||
// We know that the component names are identical and that the maps
|
||||
// contain all keys, so it's safe to iterate through the keys of one
|
||||
// map and use it in all maps.
|
||||
for (String componentName : componentRequirements.keySet()) {
|
||||
// The variable `allMachineNames` shall, at the end of the loop
|
||||
// body, contain the machine names for this component.
|
||||
Set<String> allMachineNames;
|
||||
List<Requirement> oldR = oldComponentRequirements.get(componentName);
|
||||
List<Requirement> newR = componentRequirements.get(componentName);
|
||||
if (oldR.containsAll(newR) && newR.containsAll(oldR)) {
|
||||
// Requirements did not change
|
||||
int oldCount = oldComponentReplicaCounts.get(componentName);
|
||||
int newCount = componentReplicaCounts.get(componentName);
|
||||
if (newCount > oldCount) {
|
||||
int nAdd = newCount - oldCount;
|
||||
allMachineNames = app.getComponentNodeNames().get(componentName);
|
||||
log.debug("Adding {} nodes to component {}", nAdd, componentName,
|
||||
keyValue("appId", appUUID));
|
||||
List<NodeCandidate> candidates = conn.findNodeCandidates(newR, appUUID);
|
||||
if (candidates.isEmpty()) {
|
||||
log.error("Could not find node candidates for requirements: {}",
|
||||
newR, keyValue("appId", appUUID));
|
||||
continue;
|
||||
}
|
||||
for (int nodeNumber = 1; nodeNumber <= nAdd; nodeNumber++) {
|
||||
String nodeName = createNodeName(clusterName, componentName, deployGeneration, nodeNumber);
|
||||
NodeCandidate candidate = candidates.stream()
|
||||
.filter(each -> !edgeCandidates.values().contains(each))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
if (candidate == null) {
|
||||
log.error("No available node candidate for node {} of component {}", nodeNumber, componentName,
|
||||
keyValue("appId", appUUID));
|
||||
continue;
|
||||
}
|
||||
if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE).contains(candidate.getNodeCandidateType())) {
|
||||
edgeCandidates.put(nodeName, candidate);
|
||||
}
|
||||
nodesToAdd.addObject()
|
||||
.put("nodeName", nodeName)
|
||||
.put("nodeCandidateId", candidate.getId())
|
||||
.put("cloudId", candidate.getCloud().getId());
|
||||
nodeLabels.addObject()
|
||||
.put(nodeName, "nebulouscloud.eu/" + componentName + "=true");
|
||||
allMachineNames.add(nodeName);
|
||||
}
|
||||
} else if (newCount < oldCount) {
|
||||
// We could be smarter and compute all scaleIn operations
|
||||
// first, which would potentially free edge nodes that we
|
||||
// could then reassign during subsequent scaleOut.
|
||||
// Something for version 2.
|
||||
int nRemove = oldCount - newCount;
|
||||
log.debug("Removing {} nodes from component {}", nRemove, componentName,
|
||||
keyValue("appId", appUUID));
|
||||
// We could be a bit smarter here: remove cloud instances
|
||||
// first and keep edge nodes in use, on the assumption
|
||||
// that it's better to keep using edge nodes since cloud
|
||||
// nodes incur a cost.
|
||||
allMachineNames = app.getComponentNodeNames().get(componentName);
|
||||
Set<String> removedInstances = allMachineNames.stream().limit(nRemove).collect(Collectors.toSet());
|
||||
removedInstances.forEach(edgeCandidates::remove);
|
||||
allMachineNames.removeAll(removedInstances);
|
||||
nodesToRemove.addAll(removedInstances);
|
||||
} else {
|
||||
log.debug("Nothing changed for component {}", componentName, keyValue("appId", appUUID));
|
||||
allMachineNames = app.getComponentNodeNames().get(componentName);
|
||||
}
|
||||
} else {
|
||||
nodesToRemove.addAll(app.getComponentNodeNames().get(componentName));
|
||||
allMachineNames = new HashSet<>();
|
||||
log.debug("Redeploying all nodes of component {}", componentName,
|
||||
keyValue("appId", appUUID));
|
||||
List<NodeCandidate> candidates = conn.findNodeCandidates(newR, appUUID);
|
||||
if (candidates.size() == 0) {
|
||||
log.error("Empty node candidate list for component ~s, continuing without creating node", componentName,
|
||||
keyValue("appId", appUUID));
|
||||
continue;
|
||||
}
|
||||
for (int nodeNumber = 1; nodeNumber <= componentReplicaCounts.get(componentName); nodeNumber++) {
|
||||
String nodeName = createNodeName(clusterName, componentName, deployGeneration, nodeNumber);
|
||||
NodeCandidate candidate = candidates.stream()
|
||||
.filter(each -> !edgeCandidates.values().contains(each))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
if (candidate == null) {
|
||||
log.error("No available node candidate for node {} of component {}", nodeNumber, componentName,
|
||||
keyValue("appId", appUUID));
|
||||
continue;
|
||||
}
|
||||
if (Set.of(NodeCandidateTypeEnum.BYON, NodeCandidateTypeEnum.EDGE).contains(candidate.getNodeCandidateType())) {
|
||||
edgeCandidates.put(nodeName, candidate);
|
||||
}
|
||||
nodesToAdd.addObject()
|
||||
.put("nodeName", nodeName)
|
||||
.put("nodeCandidateId", candidate.getId())
|
||||
.put("cloudId", candidate.getCloud().getId());
|
||||
allMachineNames.add(nodeName);
|
||||
}
|
||||
}
|
||||
app.getComponentNodeNames().put(componentName, allMachineNames);
|
||||
}
|
||||
|
||||
Main.logFile("worker-requirements-" + appUUID + ".txt", componentRequirements);
|
||||
Main.logFile("worker-counts-" + appUUID + ".txt", componentReplicaCounts);
|
||||
|
||||
// Call `scaleOut` with nodesToAdd
|
||||
|
||||
// Call `labelNodes` with nodeLabels
|
||||
|
||||
// Call `deployApplication`
|
||||
|
||||
// Call `scaleIn` with nodesToRemove
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -63,4 +63,18 @@ public class NebulousApps {
|
||||
public static synchronized Collection<NebulousApp> values() {
|
||||
return apps.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate a short, unique cluster name from the given application id.
|
||||
* Currently, we use the first 5 characters of the application id followed
|
||||
* by the current number of registered applications. We deem the risk of
|
||||
* two applications with identical UUID heads racing to register to be
|
||||
* acceptable.
|
||||
*
|
||||
* @param applicationUuid the ID of an application that is not yet registered.
|
||||
* @return a short string that is unique across all registered applications.
|
||||
*/
|
||||
public static synchronized String calculateUniqueClusterName(String applicationUuid) {
|
||||
return applicationUuid.substring(0, 5) + "-" + (apps.size() + 1);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user