Start to use exn middleware to talk with SAL

NOTE: NebulousAppDeployer.deployApplication is only half implemented,
awaiting new SAL endpoints.

Change-Id: I10cbb7770a59eed963579804fd66428ab6771cc4
This commit is contained in:
Rudi Schlatte 2024-02-09 10:18:46 +02:00
parent 4e4d8e11e8
commit a23692efa0
11 changed files with 170 additions and 502 deletions

View File

@@ -46,15 +46,6 @@ spec:
value: "{{ .Values.activemq.ACTIVEMQ_PORT }}"
- name: ACTIVEMQ_USER
value: "{{ .Values.activemq.ACTIVEMQ_USER }}"
- name: SAL_USER
value: "{{ .Values.sal.SAL_USER }}"
- name: SAL_URL
value: "{{ .Values.sal.SAL_URL }}"
- name: SAL_PASSWORD
valueFrom:
secretKeyRef:
name: {{ include "nebulous-optimiser-controller.fullname" . }}-secrets
key: SAL_PASSWORD
- name: ACTIVEMQ_PASSWORD
valueFrom:
secretKeyRef:

View File

@@ -7,4 +7,3 @@ metadata:
type: Opaque
data:
ACTIVEMQ_PASSWORD: {{ .Values.secrets.ACTIVEMQ_PASSWORD | b64enc | quote }}
SAL_PASSWORD: {{ .Values.secrets.SAL_PASSWORD | b64enc | quote }}

View File

@@ -84,10 +84,6 @@ affinity: {}
debug:
LOGDIR: /tmp/nebulous
sal:
SAL_URL: sal
SAL_USER: admin
activemq:
ACTIVEMQ_HOST: activemq
ACTIVEMQ_PORT: 5672
@@ -95,4 +91,3 @@ activemq:
secrets:
ACTIVEMQ_PASSWORD: nebulous
SAL_PASSWORD: admin

View File

@@ -48,14 +48,6 @@ dependencies {
// SAL client library
implementation 'org.ow2.proactive:sal-common:13.1.0-SNAPSHOT'
// HTTP requests; used by Melodic and we adapt their SAL client code
// https://github.com/reactor/reactor-netty
implementation 'io.projectreactor.netty:reactor-netty:1.1.15'
// HTTPHeaders etc.; used by Melodic and we adapt their SAL client code
// https://mvnrepository.com/artifact/org.springframework/spring-web
implementation 'org.springframework:spring-web:6.1.3'
// Logging: https://www.slf4j.org
implementation 'org.slf4j:slf4j-api:2.0.9'
// We use java.util.logging as the backend for now; see

View File

@@ -5,17 +5,24 @@ import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.exn.core.SyncedPublisher;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.protonj2.client.Message;
import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.Requirement;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -53,6 +60,30 @@ public class ExnConnector {
@Getter
private final Publisher amplMessagePublisher;
// ----------------------------------------
// Communication with SAL
// We define these publishers here instead of in the `SalConnector`
// class since they need to be registered here and I'm afraid I will
// forget to do it when adding new endpoints over in another class.
/** The createJob endpoint. */
public static final SyncedPublisher createJob
= new SyncedPublisher("createJob",
"eu.nebulouscloud.exn.sal.job.post", true, true);
/** The findNodeCandidates endpoint. */
public static final SyncedPublisher findNodeCandidates
= new SyncedPublisher("findNodeCandidates",
"eu.nebulouscloud.exn.sal.nodecandidate.get", true, true);
/** The addNodes endpoint. */
public static final SyncedPublisher addNodes
= new SyncedPublisher("addNodes",
"eu.nebulouscloud.exn.sal.nodes.add", true, true);
/** The submitJob endpoint. */
public static final SyncedPublisher submitJob
= new SyncedPublisher("submitJob",
"eu.nebulouscloud.exn.sal.job.update", true, true);
/**
* Create a connection to ActiveMQ via the exn middleware, and set up the
* initial publishers and consumers.
@@ -71,7 +102,8 @@ public class ExnConnector {
conn = new Connector("optimiser_controller",
callback,
// List.of(new Publisher("config", "config", true)),
List.of(amplMessagePublisher),
List.of(amplMessagePublisher,
createJob, findNodeCandidates, addNodes, submitJob),
List.of(new Consumer("ui_app_messages", app_creation_channel, new AppCreationMessageHandler(), true, true)),
true,
true,
@@ -125,7 +157,9 @@ public class ExnConnector {
log.info("App creation message received for app {}", app_id);
JsonNode appMessage = mapper.valueToTree(body);
Main.logFile("app-message-" + app_id + ".json", appMessage);
NebulousApp app = NebulousApp.newFromAppMessage(mapper.valueToTree(body), amplMessagePublisher);
NebulousApp app = NebulousApp.newFromAppMessage(
// TODO create a new ExnConnector here?
mapper.valueToTree(body), ExnConnector.this);
NebulousApps.add(app);
app.sendAMPL();
app.deployUnmodifiedApplication();
@@ -161,4 +195,5 @@ public class ExnConnector {
}
}
}
}

View File

@@ -34,10 +34,8 @@ public class LocalExecution implements Callable<Integer> {
@Override public Integer call() {
ObjectMapper mapper = new ObjectMapper();
CountDownLatch exn_synchronizer = new CountDownLatch(1);
ExnConnector connector = main.getActiveMQConnector();
Publisher publisher = null;
ExnConnector connector = Main.getActiveMQConnector();
if (connector != null) {
publisher = connector.getAmplMessagePublisher();
connector.start(exn_synchronizer);
}
JsonNode msg;
@@ -47,9 +45,9 @@ public class LocalExecution implements Callable<Integer> {
log.error("Could not read an input file: ", e);
return 1;
}
NebulousApp app = NebulousApp.newFromAppMessage(msg, publisher);
NebulousApp app = NebulousApp.newFromAppMessage(msg, connector);
if (connector != null) {
log.debug("Sending AMPL to channel {}", publisher);
log.debug("Sending AMPL to channel {}", connector.getAmplMessagePublisher());
app.sendAMPL();
app.deployUnmodifiedApplication();
}

View File

@@ -39,24 +39,6 @@ import static picocli.CommandLine.Option;
)
public class Main implements Callable<Integer> {
@Option(names = {"-s", "--sal-url"},
description = "The URL of the SAL server (including URL scheme http:// or https://). Can also be set via the @|bold SAL_URL|@ environment variable.",
paramLabel = "SAL_URL",
defaultValue = "${SAL_URL:-http://localhost:8880/}")
private java.net.URI sal_uri;
@Option(names = {"--sal-user"},
description = "The user name for the SAL server. Can also be set via the @|bold SAL_USER|@ environment variable.",
paramLabel = "SAL_USER",
defaultValue = "${SAL_USER}")
private String sal_user;
@Option(names = {"--sal-password"},
description = "The password for the SAL server. Can also be set via the @|bold SAL_PASSWORD|@ environment variable.",
paramLabel = "SAL_PASSWORD",
defaultValue = "${SAL_PASSWORD}")
private String sal_password;
@Option(names = {"--activemq-host"},
description = "The hostname of the ActiveMQ server. Can also be set via the @|bold ACTIVEMQ_HOST|@ environment variable.",
paramLabel = "ACTIVEMQ_HOST",
@@ -93,20 +75,13 @@ public class Main implements Callable<Integer> {
scope = ScopeType.INHERIT)
private boolean[] verbosity;
/**
* The connector to the SAL library.
*
* @return the SAL connector, or null if running offline.
*/
@Getter
private SalConnector salConnector = null;
/**
* The ActiveMQ connector.
*
* @return the ActiveMQ connector wrapper, or null if running offline.
*/
@Getter
private ExnConnector activeMQConnector = null;
private static ExnConnector activeMQConnector = null;
/**
* PicoCLI execution strategy that uses common initialization.
@@ -157,18 +132,6 @@ public class Main implements Callable<Integer> {
log.info("Logging all messages to directory {}", logDirectory);
}
}
// Start connection to SAL if possible.
if (sal_uri != null && sal_user != null && sal_password != null) {
salConnector = new SalConnector(sal_uri, sal_user, sal_password);
if (!salConnector.isConnected()) {
log.warn("Connection to SAL unsuccessful, continuing without SAL");
} else {
log.info("Established connection to SAL");
NebulousApp.setSalConnector(salConnector);
}
} else {
log.debug("SAL login information not specified, skipping");
}
// Start connection to ActiveMQ if possible.
if (activemq_user != null && activemq_password != null) {
log.info("Preparing ActiveMQ connection: host={} port={}",

View File

@@ -11,7 +11,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import eu.nebulouscloud.exn.core.Publisher;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -95,17 +94,6 @@ public class NebulousApp {
/** The original app message. */
@Getter private JsonNode originalAppMessage;
private ObjectNode original_kubevela;
/**
* The active SAL connector, or null if we operate offline.
*
* NOTE: this might only be used until we switch to the exn-sal
* middleware, or maybe we keep the SalConnector class and send to exn
* from there.
*
* @param salConnector the SAL connector.
*/
@Setter @Getter
private static SalConnector salConnector;
/**
* Map of component name to machine name(s) deployed for that component.
@@ -125,6 +113,14 @@
* already nodes running for us? */
private boolean deployed = false;
/**
* The EXN connector for this class. At the moment all apps share the
* same instance, but probably every app should have their own, out of
* thread-safety concerns.
*/
@Getter
private ExnConnector exnConnector;
/**
* Creates a NebulousApp object.
*
@@ -134,11 +130,12 @@
*/
// Note that example KubeVela and parameter files can be found at
// optimiser-controller/src/test/resources/
public NebulousApp(JsonNode app_message, ObjectNode kubevela, Publisher ampl_message_channel) {
public NebulousApp(JsonNode app_message, ObjectNode kubevela, ExnConnector exnConnector) {
this.UUID = app_message.at(uuid_path).textValue();
this.name = app_message.at(name_path).textValue();
this.originalAppMessage = app_message;
this.original_kubevela = kubevela;
this.exnConnector = exnConnector;
JsonNode parameters = app_message.at(variables_path);
if (parameters.isArray()) {
this.kubevelaVariables = (ArrayNode)app_message.at(variables_path);
@@ -146,7 +143,6 @@
log.error("Cannot read parameters from app message '{}', continuing without parameters", UUID);
this.kubevelaVariables = mapper.createArrayNode();
}
this.ampl_message_channel = ampl_message_channel;
for (final JsonNode p : kubevelaVariables) {
kubevela_variable_paths.put(p.get("key").asText(),
JsonPointer.compile(p.get("path").asText()));
@@ -212,11 +208,14 @@
/**
* Create a NebulousApp object given an app creation message parsed into JSON.
*
* @param app_message the app creation message, including valid KubeVela YAML et al
* @param ampl_message_channel conduit to broadcast the current AMPL file
* @return a NebulousApp object, or null if `app_message` could not be parsed
* @param app_message the app creation message, including valid KubeVela
* YAML et al
* @param exnConnector The EXN connector to use for sending messages to
* the solver etc.
* @return a NebulousApp object, or null if `app_message` could not be
* parsed
*/
public static NebulousApp newFromAppMessage(JsonNode app_message, Publisher ampl_message_channel) {
public static NebulousApp newFromAppMessage(JsonNode app_message, ExnConnector exnConnector) {
try {
String kubevela_string = app_message.at(kubevela_path).textValue();
JsonNode parameters = app_message.at(variables_path);
@@ -226,8 +225,7 @@
} else {
Main.logFile("incoming-kubevela-" + app_message.at(uuid_path).textValue() + ".yaml", kubevela_string);
return new NebulousApp(app_message,
(ObjectNode)readKubevelaString(kubevela_string),
ampl_message_channel);
(ObjectNode)readKubevelaString(kubevela_string), exnConnector);
}
} catch (Exception e) {
log.error("Could not read app creation message: ", e);
@@ -348,10 +346,6 @@
* Calculate AMPL file and send it off to the solver.
*/
public void sendAMPL() {
if (ampl_message_channel == null) {
log.warn("AMPL publisher not set, cannot send AMPL file");
return;
}
String ampl = AMPLGenerator.generateAMPL(this);
ObjectNode msg = mapper.createObjectNode();
msg.put("FileName", getUUID() + ".ampl");
@@ -381,7 +375,7 @@
constant.set("Value", value);
}
ampl_message_channel.send(mapper.convertValue(msg, Map.class), getUUID(), true);
exnConnector.getAmplMessagePublisher().send(mapper.convertValue(msg, Map.class), getUUID(), true);
Main.logFile("to-solver-" + getUUID() + ".json", msg.toString());
Main.logFile("to-solver-" + getUUID() + ".ampl", ampl);
}

View File

@@ -141,7 +141,7 @@ public class NebulousAppDeployer {
* requirements for that component. No requirements mean any node will
* suffice.
*/
public static Map<String, List<Requirement>> getSalRequirementsFromKubevela(JsonNode kubevela) {
public static Map<String, List<Requirement>> getWorkerRequirementsFromKubevela(JsonNode kubevela) {
Map<String, List<Requirement>> result = new HashMap<>();
ArrayNode components = kubevela.withArray("/spec/components");
for (final JsonNode c : components) {
@@ -194,7 +194,8 @@
}
}
for (final JsonNode t : c.withArray("/traits")) {
// TODO: Check for node affinity / geoLocation / country
// TODO: Check for node affinity / geoLocation / country /
// node type (edge or cloud)
}
// Finally, add requirements for this job to the map
result.put(componentName, reqs);
@@ -252,139 +253,103 @@
* Given a KubeVela file, extract node requirements, create the job, start
* its nodes and submit KubeVela.
*
* NOTE: this method is under reconstruction, pending the new endpoints.
*
* @param app the NebulOuS app object.
* @param kubevela the KubeVela file to deploy.
*/
public static void deployApplication(NebulousApp app, JsonNode kubevela) {
String appUUID = app.getUUID();
String appName = app.getName();
log.info("Starting initial deployment of {}", appUUID);
if (NebulousApp.getSalConnector() == null) {
log.warn("Tried to submit job, but do not have a connection to SAL");
return;
}
// The overall flow:
//
// 1. Extract node requirements and node counts from the KubeVela
// definition.
// 2. Create a SAL job, with the uuid and name of the NebulOuS app
// 3. Create a coordinator node with hardcoded requirements; this node
// will run the Kubernetes controller. This node is in addition to
// the nodes required by KubeVela.
// 4. Submit the job, thereby starting the coordinator node
// 5. Extract information (IP address, ...) from the coordinator node
// 6. Add the worker nodes to the job
// 7. Rewrite the KubeVela file to add node affinities to each
// component
// 8. Send the KubeVela file to the coordinator node
// 2. Find node candidates for all workers and the controller.
// 3. Select node candidates.
// 4. Create a SAL cluster.
// 5. Deploy the SAL cluster.
// ------------------------------------------------------------
// 1. Extract node requirements
Map<String, List<Requirement>> requirements = getSalRequirementsFromKubevela(kubevela);
Map<String, List<Requirement>> workerRequirements = getWorkerRequirementsFromKubevela(kubevela);
Map<String, Integer> nodeCounts = getNodeCountFromKubevela(kubevela);
List<Requirement> controllerRequirements = getControllerRequirements(appUUID);
Main.logFile("node-requirements-" + appUUID + ".txt", requirements);
Main.logFile("node-counts-" + appUUID + ".txt", nodeCounts);
Main.logFile("worker-requirements-" + appUUID + ".txt", workerRequirements);
Main.logFile("worker-counts-" + appUUID + ".txt", nodeCounts);
Main.logFile("contoller-requirements-" + appUUID + ".txt", controllerRequirements);
// ----------------------------------------
// 2. Find node candidates
// ------------------------------------------------------------
// 2. Create SAL job
log.debug("Creating job info for {}", appUUID);
JobInformation jobinfo = new JobInformation(appUUID, appName);
// TODO: figure out what ports to specify here
List<Communication> communications = List.of();
// This task is deployed on the controller node (the one not specified
// in the app KubeVela file)
TaskDefinition nebulous_controller_task = new TaskDefinition(
"nebulous-controller", controllerInstallation, List.of());
// This task is deployed on all worker nodes (the ones specified by
// the app KubeVela file and optimized by NebulOuS)
// TODO: find out if/how to modify `nebulous_worker_task` to pass in
// information about the controller
TaskDefinition nebulous_worker_task = new TaskDefinition(
"nebulous-worker", nodeInstallation, List.of());
List<TaskDefinition> tasks = List.of(nebulous_controller_task, nebulous_worker_task);
JobDefinition job = new JobDefinition(communications, jobinfo, tasks);
Boolean success = NebulousApp.getSalConnector().createJob(job);
if (!success) {
// This can happen if the job has already been submitted
log.error("Error trying to create the job; SAL createJob returned {}", success);
log.debug("Check if a job with id {} already exists, run stopJobs if yes", appUUID);
return;
// TODO: switch to asking the cloud broker for candidates when it's
// ready
List<NodeCandidate> controllerCandidates = SalConnector.findNodeCandidates(controllerRequirements, appUUID);
if (controllerCandidates.isEmpty()) {
log.error("Could not find node candidates for requirements: {}", controllerRequirements);
// Continue here while we don't really deploy
// return;
}
Map<String, List<NodeCandidate>> workerCandidates = new HashMap<>();
for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) {
String nodeName = e.getKey();
List<Requirement> requirements = e.getValue();
List<NodeCandidate> candidates = SalConnector.findNodeCandidates(requirements, appUUID);
if (candidates.isEmpty()) {
log.error("Could not find node candidates for requirements: {}", requirements);
// Continue here while we don't really deploy
// return;
}
workerCandidates.put(nodeName, candidates);
}
// ------------------------------------------------------------
// 3. Create coordinator node
log.debug("Creating app coordinator node for {}", appUUID);
List<NodeCandidate> controller_candidates
= NebulousApp.getSalConnector().findNodeCandidates(getControllerRequirements(appUUID));
if (controller_candidates.isEmpty()) {
log.error("Could not find node candidates for controller node; requirements: {}",
getControllerRequirements(appUUID));
return;
}
NodeCandidate controller_candidate = controller_candidates.get(0);
// 3. Select node candidates
IaasDefinition controller_def = new IaasDefinition(
"nebulous-controller-node", "nebulous-controller",
controller_candidate.getId(), controller_candidate.getCloud().getId());
success = NebulousApp.getSalConnector().addNodes(List.of(controller_def), appUUID);
if (!success) {
log.error("Failed to add controller node: {}", controller_candidate);
return;
}
// ------------------------------------------------------------
// 4. Submit job
log.debug("Starting job {}", appUUID);
String return_job_id = NebulousApp.getSalConnector().submitJob(appUUID);
if (return_job_id.equals("-1")) {
log.error("Failed to add start job {}, SAL returned {}",
appUUID, return_job_id);
return;
}
// ------------------------------------------------------------
// 5. Extract coordinator node information
// TODO
// ------------------------------------------------------------
// 6. Create worker nodes from requirements
log.debug("Starting worker nodes for {}", appUUID);
for (Map.Entry<String, List<Requirement>> e : requirements.entrySet()) {
log.debug("Collecting worker nodes for {}", appUUID);
Map<String, NodeCandidate> nodeNameToCandidate = new HashMap<>();
for (Map.Entry<String, List<Requirement>> e : workerRequirements.entrySet()) {
// Here we collect two things: the flat list (hostname ->
// candidate) to send to createCluster, and the per-component
// hostname sets that we remember in the app object.
String componentName = e.getKey();
int numberOfNodes = nodeCounts.get(e.getKey());
int numberOfNodes = nodeCounts.get(componentName);
Set<String> nodeNames = new HashSet<>();
for (int i = 1; i <= numberOfNodes; i++){
nodeNames.add(String.format("%s-%s", componentName, i));
for (int i = 1; i <= numberOfNodes; i++) {
String nodeName = String.format("%s-%s", componentName, i);
nodeNames.add(nodeName);
// TODO: Here we need to discriminate between edge and cloud
// node candidates: we can deploy an edge node only once, but
// cloud nodes arbitrarily often. So if the best node
// candidate is an edge node, we should select it and fill the
// rest of the nodes with second-best cloud nodes.
// TODO: make sure we only choose the same edge node once; it
// might be in all node candidate lists :)
if (!workerCandidates.get(componentName).isEmpty()) {
// should always be true, except currently we don't abort
// in Step 2 if we don't find candidates.
NodeCandidate candidate = workerCandidates.get(componentName).get(0);
nodeNameToCandidate.put(nodeName, candidate);
}
}
app.getComponentMachineNames().put(componentName, nodeNames);
if (numberOfNodes == 0) {
// Do not ask for node candidates if this component's replica
// count is 0. Note that we still set the app's machine names
// to an empty set.
continue;
}
List<NodeCandidate> candidates = NebulousApp.getSalConnector().findNodeCandidates(e.getValue());
if (candidates.isEmpty()) {
log.error("Could not find node candidates for requirements: {}", e.getValue());
return;
}
NodeCandidate candidate = candidates.get(0);
List<IaasDefinition> componentDefinitions = nodeNames.stream()
.map(name -> new IaasDefinition(name, "nebulous-worker", candidate.getId(), candidate.getCloud().getId()))
.collect(Collectors.toList());
log.debug("Asking for {} copies of {} for application {}", numberOfNodes, candidate, appUUID);
success = NebulousApp.getSalConnector().addNodes(componentDefinitions, appUUID);
if (!success) {
log.error("Failed to add nodes for component {}", componentName);
}
}
Main.logFile("nodenames-" + appUUID + ".txt", app.getComponentMachineNames());
// ------------------------------------------------------------
// 7. Rewrite KubeVela file, based on running node names
Main.logFile("worker-nodes-" + appUUID + ".txt", nodeNameToCandidate);
// ------------------------------------------------------------
// 4. Create cluster
// TODO: call defineCluster endpoint with nodename -> candidate
// mapping etc.
// ------------------------------------------------------------
// 5. Deploy cluster
// TODO: call deployCluster endpoint
// TODO
JsonNode rewritten = addNodeAffinities(kubevela, app.getComponentMachineNames());
String rewritten_kubevela = "---\n# Did not manage to create rewritten KubeVela";
try {
@@ -393,11 +358,7 @@
log.error("Failed to convert KubeVela to YAML; this should never happen", e);
}
Main.logFile("rewritten-kubevela-" + appUUID + ".yaml", rewritten_kubevela);
// ------------------------------------------------------------
// 8. Submit KubeVela file to coordinator node
// TODO
// TODO: call deployApplication endpoint
}
/**
@@ -405,7 +366,7 @@
*/
public static void redeployApplication(NebulousApp app, ObjectNode kubevela) {
// The overall flow:
//
//
// 1. Extract node requirements and node counts from the updated
// KubeVela definition.
// 2. Extract current nodes from running SAL job
@@ -413,10 +374,10 @@
// nodes
// 4. Find node candidates for new nodes (from Step 3) according to
// their requirements (from Step 1)
// 5. Create nodes, add them to SAL job
// 6. Rewrite KubeVela with updated node affinities
// 7. Send updated KubeVela to running cluster
// 8. Shut down superfluous nodes (from Step 3)
// 5. Rewrite KubeVela with updated node affinities
// 6. Call clusterScaleOut endpoint with list of added nodes
// 7. Call deployApplication with rewritten KubeVela
// 8. call clusterScaleIn endpoint with list of removed node names
}

View File

@@ -1,310 +1,50 @@
package eu.nebulouscloud.optimiser.controller;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.Requirement;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.logging.LogLevel;
import lombok.extern.slf4j.Slf4j;
import org.ow2.proactive.sal.model.IaasDefinition;
import org.ow2.proactive.sal.model.Job;
import org.ow2.proactive.sal.model.JobDefinition;
import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.PACloud;
import org.ow2.proactive.sal.model.Requirement;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufMono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* A class that wraps communication with SAL (the Scheduling Abstraction Layer
* of ProActive) over REST.
* of ProActive) over EXN.
*
* Documentation of the SAL REST API is here:
* https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1
* https://github.com/ow2-proactive/scheduling-abstraction-layer/tree/master/documentation
*/
@Slf4j
public class SalConnector {
private static final String connectStr = "sal/pagateway/connect";
private static final String getAllCloudsStr = "sal/clouds";
private static final String findNodeCandidatesStr = "sal/nodecandidates";
private static final String createJobStr = "sal/job";
private static final String getJobsStr = "sal/job"; // same, but different method/body
private static final String addNodesFormatStr = "sal/node/%s";
private static final String submitJobFormatStr = "sal/job/%s/submit";
private static final String stopJobsStr = "sal/job/stop";
private URI sal_uri;
private final HttpClient httpClient;
private String session_id = null;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Construct a SalConnector instance.
*
* @param sal_uri the URI of the SAL server. Should only contain schema,
* host, port but no path component, since relative paths will be
* resolved against this URI.
* @param login the login name for SAL.
* @param password the login password for SAL.
*/
public SalConnector(URI sal_uri, String login, String password) {
this.sal_uri = sal_uri;
// This initialization code copied from
// https://gitlab.ow2.org/melodic/melodic-integration/-/blob/morphemic-rc4.0/connectors/proactive_client/src/main/java/cloud/morphemic/connectors/ProactiveClientConnectorService.java
objectMapper.configOverride(List.class)
.setSetterInfo(JsonSetter.Value.forValueNulls(Nulls.AS_EMPTY))
.setSetterInfo(JsonSetter.Value.forContentNulls(Nulls.AS_EMPTY));
connect(login, password);
httpClient = HttpClient.create()
.baseUrl(sal_uri.toString())
.headers(headers -> headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE))
.responseTimeout(Duration.of(80, ChronoUnit.SECONDS))
.wiretap("reactor.netty.http.client.HttpClient", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL, StandardCharsets.UTF_8);
if (isConnected()) {
httpClient.headers(headers -> headers.add("sessionid", session_id));
}
httpClient.warmup().block();
}
/**
* Check if we are connected to a SAL endpoint, i.e., we successfully
* obtained a session id.
*
* @return true if we are connected, false if not
*/
public boolean isConnected() {
return session_id != null;
}
/**
* Establish a connection with the SAL server.
*
* This method needs to be called before any other method, since it
* obtains the session id.
*
* @param sal_username the user name to log in to SAL
* @param sal_password the password to log in to SAL
* @return true if the connection was successful, false if not
*/
private boolean connect(String sal_username, String sal_password) {
URI endpoint_uri = sal_uri.resolve(connectStr);
log.trace("Connecting to SAL as a service at uri {}", endpoint_uri);
private static final ObjectMapper mapper = new ObjectMapper();
public static List<NodeCandidate> findNodeCandidates(List<Requirement> requirements, String appID) {
Map<String, Object> msg = new HashMap<>();
Map<String, Object> metadata = new HashMap<>();
metadata.put("user", "admin");
msg.put("metaData", metadata);
try {
this.session_id = HttpClient.create()
.headers(headers -> headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED))
.post()
.uri(endpoint_uri)
.sendForm((req, form) -> form
.attr("username", sal_username)
.attr("password", sal_password))
.responseContent()
.aggregate()
.asString()
.retry(20)
.block();
} catch (Exception e) {
log.error("Error while connecting to SAL", e);
return false;
}
log.debug("Connected to SAL, sessionid {}...", session_id.substring(0, 10));
return true;
}
/**
* Get all cloud providers. See
* https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/2-cloud-endpoints.md#22--getallclouds-endpoint
*/
public List<PACloud> getAllClouds() {
return httpClient.get()
.uri(sal_uri.resolve(getAllCloudsStr))
.responseSingle((resp, bytes) -> {
if (!resp.status().equals(HttpResponseStatus.OK)) {
return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body)));
} else {
return bytes.asString().mapNotNull(s -> {
try {
return objectMapper.readValue(s, PACloud[].class);
} catch (IOException e) {
log.error(e.getMessage(), e);;
return null;
}
});
}
})
.doOnError(Throwable::printStackTrace)
.blockOptional()
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
}
/**
* Get node candidates. See
* https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/7-node-endpoints.md#71--findnodecandidates-endpoint
*/
public List<NodeCandidate> findNodeCandidates(List<Requirement> requirements) {
return httpClient.post()
.uri(sal_uri.resolve(findNodeCandidatesStr))
.send(bodyMonoPublisher(requirements))
.responseSingle((resp, bytes) -> {
if (!resp.status().equals(HttpResponseStatus.OK)) {
return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body)));
} else {
return bytes.asString().mapNotNull(s -> {
try {
log.trace("Received message: {}", s);
return objectMapper.readValue(s, NodeCandidate[].class);
} catch (IOException e) {
log.error(e.getMessage(), e);
return null;
}
});
}
})
.doOnError(Throwable::printStackTrace)
.blockOptional()
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
}
/**
* Create job. See
* https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#51--createjob-endpoint
*/
public Boolean createJob(JobDefinition job) {
return httpClient.post()
.uri(sal_uri.resolve(createJobStr))
.send(bodyMonoPublisher(job))
.responseSingle((resp, bytes) -> {
if (!resp.status().equals(HttpResponseStatus.OK)) {
return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body)));
} else {
return bytes.asString().map(Boolean::parseBoolean);
}
})
.doOnError(Throwable::printStackTrace)
.block();
}
/**
 * Get the list of jobs known to SAL. See
 * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#52--getjobs-endpoint
 *
 * @return all jobs, or an empty list if the call failed or the response
 *     could not be parsed.
 */
public List<Job> fetchJobs() {
    return httpClient.get()
        .uri(sal_uri.resolve(getJobsStr))
        .responseSingle((resp, bytes) -> {
            if (!resp.status().equals(HttpResponseStatus.OK)) {
                // Non-OK status: surface the response body as the error message.
                return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body)));
            } else {
                // mapNotNull: a parse failure is logged and yields an empty Mono,
                // which becomes an empty list below.
                return bytes.asString().mapNotNull(s -> {
                    try {
                        return objectMapper.readValue(s, Job[].class);
                    } catch (IOException e) {
                        log.error(e.getMessage(), e);
                        return null;
                    }
                });
            }
        })
        // Route failures through the configured logger instead of printStackTrace.
        .doOnError(t -> log.error("fetchJobs request failed", t))
        .blockOptional()
        .map(Arrays::asList)
        .orElseGet(Collections::emptyList);
}
/**
 * Stop SAL jobs. See
 * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#54--stopjobs-endpoint
 *
 * @param jobIds the ids of the jobs to stop; serialized as the JSON
 *     request body.
 * @return the endpoint's numeric result parsed from the response body;
 *     {@code null} if the reactive pipeline completed without a value.
 */
public Long stopJobs(List<String> jobIds) {
    return httpClient.put()
        .uri(sal_uri.resolve(stopJobsStr))
        .send(bodyMonoPublisher(jobIds))
        .responseSingle((resp, bytes) -> {
            if (!resp.status().equals(HttpResponseStatus.OK)) {
                // Non-OK status: surface the response body as the error message.
                return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body)));
            } else {
                return bytes.asString().map(Long::parseLong);
            }
        })
        // Route failures through the configured logger instead of printStackTrace.
        .doOnError(t -> log.error("stopJobs request failed", t))
        .block();
}
/**
 * Add nodes to an existing job. See
 * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/7-node-endpoints.md
 *
 * @param nodes the node definitions to add; serialized as the JSON
 *     request body.
 * @param jobId the id of the job the nodes are added to; interpolated
 *     into the endpoint path.
 * @return {@code true} if SAL added the nodes, {@code false} otherwise;
 *     {@code null} if the reactive pipeline completed without a value.
 */
public Boolean addNodes(List<IaasDefinition> nodes, String jobId) {
    return httpClient.post()
        .uri(sal_uri.resolve(addNodesFormatStr.formatted(jobId)))
        .send(bodyMonoPublisher(nodes))
        .responseSingle((resp, bytes) -> {
            if (!resp.status().equals(HttpResponseStatus.OK)) {
                // Non-OK status: surface the response body as the error message.
                return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body)));
            } else {
                // Morphemic used `new Boolean(s)`; parseBoolean is the
                // non-deprecated equivalent (same "true"/"false" semantics).
                return bytes.asString().map(Boolean::parseBoolean);
            }
        })
        // Route failures through the configured logger instead of printStackTrace.
        .doOnError(t -> log.error("addNodes request failed", t))
        .block();
}
/**
 * Submit a previously created job for execution. See
 * https://github.com/ow2-proactive/scheduling-abstraction-layer/blob/master/documentation/5-job-endpoints.md#55--submitjob-endpoint
 *
 * @param jobId the id of the job to submit; interpolated into the
 *     endpoint path.
 * @return the raw response body (the submitted job id, or "-1" on
 *     failure per the endpoint contract); {@code null} if the reactive
 *     pipeline completed without a value.
 */
public String submitJob(String jobId) {
    return httpClient.post()
        .uri(sal_uri.resolve(submitJobFormatStr.formatted(jobId)))
        .responseSingle((resp, bytes) -> {
            if (!resp.status().equals(HttpResponseStatus.OK)) {
                // Non-OK status: surface the response body as the error message.
                return bytes.asString().flatMap(body -> Mono.error(new RuntimeException(body)));
            } else {
                // Note: Morphemic parsed this as a long, but we don't,
                // since the end point specifies that it returns the
                // submitted job id or -1.
                return bytes.asString();
            }
        })
        // Route failures through the configured logger instead of printStackTrace.
        .doOnError(t -> log.error("submitJob request failed", t))
        .block();
}
/**
 * Serialize {@code body} to JSON with the shared ObjectMapper and wrap it
 * as a ByteBuf publisher suitable for {@code HttpClient.send(...)}.
 *
 * NOTE(review): if serialization throws JsonProcessingException, {@code json}
 * stays null and {@code Mono.just(null)} below will throw a
 * NullPointerException — presumably the caller never passes an
 * unserializable body; confirm and consider Mono.error instead.
 */
private Mono<ByteBuf> bodyMonoPublisher(Object body) {
// if ((body instanceof JSONArray) || (body instanceof JSONObject)) {
// return ByteBufMono.fromString(Mono.just(body.toString()));
// }
String json = null;
try {
json = objectMapper.writeValueAsString(body);
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);;
}
// Trace the outgoing payload; may be large, so trace level only.
log.trace("Sending body json: {}", json);
return ByteBufMono.fromString(Mono.just(json));
msg.put("body", mapper.writeValueAsString(requirements));
} catch (JsonProcessingException e) {
log.error("Could not convert requirements list to JSON string", e);
return null;
}
Map<String, Object> response = ExnConnector.findNodeCandidates.sendSync(msg, appID, null, false);
String body = response.get("body").toString(); // body is a string already
try {
return Arrays.asList(mapper.readValue(body, NodeCandidate[].class));
} catch (JsonProcessingException e) {
log.error("Error receiving findNodeCandidates result", e);
return null;
}
}
}

View File

@ -85,7 +85,7 @@ public class NebulousAppTests {
String kubevela_str = Files.readString(getResourcePath("vela-deployment-v2.yml"),
StandardCharsets.UTF_8);
JsonNode kubevela = yaml_mapper.readTree(kubevela_str);
Map<String, List<Requirement>> requirements = NebulousAppDeployer.getSalRequirementsFromKubevela(kubevela);
Map<String, List<Requirement>> requirements = NebulousAppDeployer.getWorkerRequirementsFromKubevela(kubevela);
// We could compare the requirements with what is contained in
// KubeVela, or compare keys with component names, but this would
// essentially duplicate the method code--so we just make sure the
@ -105,7 +105,7 @@ public class NebulousAppTests {
ObjectNode replacements = solutions.withObject("VariableValues");
ObjectNode kubevela1 = app.rewriteKubevelaWithSolution(replacements);
Map<String, List<Requirement>> requirements = NebulousAppDeployer.getSalRequirementsFromKubevela(kubevela1);
Map<String, List<Requirement>> requirements = NebulousAppDeployer.getWorkerRequirementsFromKubevela(kubevela1);
// We could compare the requirements with what is contained in
// KubeVela, or compare keys with component names, but this would
// essentially duplicate the method code--so we just make sure the