small fixes

Change-Id: I02d4ff737ffdf0de194530d087bd0cb9c2815126
This commit is contained in:
Marta 2024-04-15 13:22:02 +02:00
parent 59db6770f8
commit 122a1d0f9c
11 changed files with 60 additions and 317 deletions

View File

@ -4,7 +4,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/* This is the equvalent of the main class which starts the Utility Evaluator */
/* This is the main class which starts the Utility Evaluator */
@SpringBootApplication
public class UtilityEvaluatorApplication {

View File

@ -3,9 +3,6 @@ package eu.nebulous.utilityevaluator;
import org.ow2.proactive.sal.model.NodeCandidate;
import java.util.List;
import org.springframework.stereotype.Component;
import eu.nebulous.utilityevaluator.communication.exnconnector.ExnConnector;
import eu.nebulous.utilityevaluator.communication.exnconnector.PerformanceIndicatorSendingService;
import eu.nebulous.utilityevaluator.communication.sal.NodeCandidatesFetchingService;
import eu.nebulous.utilityevaluator.converter.NodeCandidateConverter;
@ -15,10 +12,9 @@ import eu.nebulous.utilityevaluator.model.VariableDTO;
import eu.nebulous.utilityevaluator.regression.SimpleCostRegression;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.exn.core.SyncedPublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/* The main controlling component. It coordinates the work TODO*/
/* The main controlling component. It coordinates the process of creating the initial performance indicators*/
@Slf4j
//@Component
public class UtilityEvaluatorController {
@ -39,14 +35,14 @@ public class UtilityEvaluatorController {
* for types variables that are there, create a list of arguments to the regression
* create regression object
* save it back in the application
* send the parameters via ActiveMQ (maybe in the handler?)
* send the parameters via ActiveMQ
*/
for (String component : application.getVariables().keySet()){
List<NodeCandidate> nodeCandidates = nodeCandidatesService.getNodeCandidatesViaMiddleware(application, component);
log.info("Number of Node Candidates: {}", nodeCandidates.size());
if (nodeCandidates.isEmpty()){
log.error("SAL returned empty list, it is not possible to create cost performance indicator");
log.error("SAL returned empty list, it is not possible to create cost performance indicator for component {}", component);
continue;
}
List<NodeCandidateDTO> convertedNodeCandidates = NodeCandidateConverter.convertToDtoList(nodeCandidates);

View File

@ -1,84 +0,0 @@
package eu.nebulous.utilityevaluator.communication.activemq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
public class Config {
@Value("${spring.activemq.broker-url}")
String BROKER_URL;
@Value("${spring.activemq.user}")
String BROKER_USERNAME;
@Value("${spring.activemq.password}")
String BROKER_PASSWORD;
@Bean
public ActiveMQConnectionFactory connectionFactory(){
System.out.println("Connection factory being createdb for url, username and password:" + BROKER_URL + " " + BROKER_USERNAME + ", " + BROKER_PASSWORD);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setTrustAllPackages(true);
connectionFactory.setBrokerURL(BROKER_URL);
connectionFactory.setPassword(BROKER_USERNAME);
connectionFactory.setUserName(BROKER_PASSWORD);
connectionFactory.setClientID("optimizer-utilityevaluator");
return connectionFactory;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setObjectMapper(objectMapper());
return converter;
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return mapper;
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setMessageConverter(messageConverter());
template.setPubSubDomain(true);
template.setDestinationResolver(destinationResolver());
template.setDeliveryPersistent(true);
return template;
}
@Bean
DynamicDestinationResolver destinationResolver() {
return new DynamicDestinationResolver() {
@Override
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
if(destinationName.endsWith("Topic")) {
pubSubDomain = true;
}
else {
pubSubDomain = false;
}
return super.resolveDestinationName(session,destinationName,pubSubDomain);
}
};
}
}

View File

@ -0,0 +1,32 @@
package eu.nebulous.utilityevaluator.communication.exnconnector;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@PropertySource("./application.properties")
@Component
@Slf4j
@Getter
public class ConnectionProperties {
@Value("${spring.exn.broker-url}")
String brokerUrl;
@Value("${spring.exn.broker-port}")
Integer brokerPort;
@Value("${spring.exn.user}")
String brokerUsername;
@Value("${spring.exn.password}")
String brokerPassword;
public ConnectionProperties(@Value("${spring.exn.broker-url}") String url, @Value("${spring.exn.broker-port}") Integer port,
@Value("${spring.exn.user}") String user, @Value("${spring.exn.password}") String password) {
this.brokerUrl = url;
this.brokerPort = port;
this.brokerUsername= user;
this.brokerPassword = password;
log.info("Got connection properties: BROKER_URL: {}, BROKER_PORT: {} BROKER_USERNAME: {}", url, port, user );
}
}

View File

@ -47,7 +47,7 @@ public class DslGenericMessageHandler extends Handler {
app = controller.createInitialCostPerformanceIndicators(app);
//todo: send back this cost performance indicators;
}

View File

@ -2,7 +2,7 @@ package eu.nebulous.utilityevaluator.communication.exnconnector;
import java.util.List;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import eu.nebulouscloud.exn.Connector;
@ -21,32 +21,24 @@ import org.slf4j.LoggerFactory;
@Component
public class ExnConnector {
@Value("${spring.activemq.broker-url}")
String BROKER_URL;
@Value("${spring.activemq.broker-port}")
Integer BROKER_PORT;
@Value("${spring.activemq.user}")
String BROKER_USERNAME;
@Value("${spring.activemq.password}")
String BROKER_PASSWORD;
private static final String GENERAL_APP_CREATION_MESSAGE_TOPIC = "eu.nebulouscloud.ui.dsl.generic.>";
private final DslGenericMessageHandler genericDSLHandler;
private static final String PERFOMANCE_INDICATORS_TOPIC = "eu.nebulouscloud.optimiser.utilityevaluator.performanceindicators";
private static final String GET_NODE_CANDIDATES_TOPIC= "eu.nebulouscloud.exn.sal.nodecandidate.get";
private ConnectionProperties properties;
private final DslGenericMessageHandler genericDSLHandler;
@Getter
private final Publisher performanceIndicatorPublisher;
private static final String GET_NODE_CANDIDATES_TOPIC= "eu.nebulouscloud.exn.sal.nodecandidate.get";
@Getter
private final SyncedPublisher nodeCandidatesGetter;
public ExnConnector() {
public ExnConnector(ConnectionProperties properties) {
super();
this.performanceIndicatorPublisher = new Publisher("costPerformanceIndicators", PERFOMANCE_INDICATORS_TOPIC, true, true);
this.nodeCandidatesGetter = new SyncedPublisher("getNodeCandidates", GET_NODE_CANDIDATES_TOPIC, true, true);
this.genericDSLHandler = new DslGenericMessageHandler(nodeCandidatesGetter, performanceIndicatorPublisher);
this.properties = properties;
init();
}
@ -60,10 +52,10 @@ public class ExnConnector {
false,
false,
new StaticExnConfig(
"localhost",
5672,
BROKER_USERNAME,
BROKER_PASSWORD
properties.getBrokerUrl(),
properties.getBrokerPort(),
properties.getBrokerUsername(),
properties.getBrokerPassword()
)
);
c.start();

View File

@ -1,20 +1,17 @@
package eu.nebulous.utilityevaluator.communication.sal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.ow2.proactive.sal.model.NodeCandidate;
import org.ow2.proactive.sal.model.Requirement;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.nebulous.utilityevaluator.communication.exnconnector.ExnConnector;
import eu.nebulous.utilityevaluator.communication.sal.error.ProactiveClientException;
import eu.nebulous.utilityevaluator.external.KubevelaAnalyzer;
import eu.nebulous.utilityevaluator.model.Application;
@ -36,18 +33,12 @@ public class NodeCandidatesFetchingService {
private SyncedPublisher nodeCandidatesConnector;
private static final ObjectMapper mapper = new ObjectMapper();
//private final ProactiveConnector proactiveClientConnectorService;
/*public NodeCandidatesFetchingService(ProactiveClientProperties properties){
ProactiveConnector connector = new ProactiveConnector(properties);
this.proactiveClientConnectorService = connector;
}*/
//https://gitlab.ow2.org/melodic/melodic-upperware/-/tree/morphemic-rc4.0/cp_generator/src/main/java/eu/paasage/upperware/profiler/generator/communication/impl
public List<NodeCandidate> getNodeCandidatesViaMiddleware(Application app, String componentId){
/*generate requirements (based on kubevela), and providers,
* call SAL via EXN Middleware
* get node candidates
* */
//generate requirements (based on kubevela), and providers, call SAL via EXN Middleware get node candidates
Map<String, List<Requirement>> requirements = KubevelaAnalyzer.getRequirements(app.getKubevela());
/*Map<String, Object> message = new HashMap();
try {
@ -86,20 +77,10 @@ public class NodeCandidatesFetchingService {
}
return salResponse;
}
//old method used for SAL
private List<NodeCandidate> getNodeCandidates(Map<String,String> cloudProviders){
List<Requirement> providerRequirements = convertProviderRequirements(cloudProviders);
return findNodeCandidates(providerRequirements);
}
//to be deleted
private List<Requirement> convertProviderRequirements(Map<String,String> cloudProviders){
//todo: filter based on the chosen cloud providers
return List.of();
}
//old method used to connect to SAL directly
private List<NodeCandidate> findNodeCandidates(List<Requirement> requirements) {
/*private List<NodeCandidate> findNodeCandidates(List<Requirement> requirements) {
List<NodeCandidate> nodeCandidates = new LinkedList<>();
boolean isAnyAsyncNodeCandidatesProcessesInProgress = true;
int requestNo = 0;
@ -120,7 +101,7 @@ public class NodeCandidatesFetchingService {
log.error("Error message body: {}", e2.getMessage());
}
return nodeCandidates;
}
}*/

View File

@ -1,27 +0,0 @@
package eu.nebulous.utilityevaluator.communication.sal;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Getter
public class ProactiveClientProperties {
@Value("${spring.pa-config.rest-url}")
public String url;
@Value("${spring.pa-config.login}")
public String login;
@Value("${spring.pa-config.password}")
public String password;
public ProactiveClientProperties(@Value("${spring.pa-config.rest-url}") String url,
@Value("${spring.pa-config.login}") String login,
@Value("${spring.pa-config.password}") String password) {
this.url = url;
this.login = login;
this.password = password;
}
}

View File

@ -1,142 +0,0 @@
package eu.nebulous.utilityevaluator.communication.sal;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.nebulous.utilityevaluator.communication.sal.error.ProactiveClientException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.logging.LogLevel;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONArray;
import org.json.JSONObject;
import org.ow2.proactive.sal.model.*;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufMono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
@Slf4j
//@Service
public class ProactiveConnector {
private static final String SESSION_HEADER = "sessionid";
private static final int RETRIES_NUMBER = 20;
private static final String PA_GATEWAY = "/pagateway";
private static final String PA_GATEWAY_CONNECT = PA_GATEWAY + "/connect";
private static final String NODECANDIDATES = "/nodecandidates";
public static final String CLOUD = "/cloud";
public static final String CLOUD_NODE_CANDIDATES_FETCH_CHECK = CLOUD + "/async";
private final HttpClient httpClient;
private String sessionId;
private final ObjectMapper objectMapper;
public ProactiveConnector(ProactiveClientProperties properties) {
log.info("Properties: login: {}, pass: {}, url: {}", properties.getLogin(), properties.getPassword(), properties.getUrl());
this.connect(properties.getLogin(), properties.getPassword(), properties.getUrl());
//sessionId = "blablabla";
this.httpClient = HttpClient.create()
.baseUrl(properties.getUrl())
.headers(headers -> headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE))
.headers(headers -> headers.add(SESSION_HEADER, sessionId))
.responseTimeout(Duration.of(80, ChronoUnit.SECONDS))
.wiretap("reactor.netty.http.client.HttpClient", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL, StandardCharsets.UTF_8);
this.httpClient.warmup().block();
this.objectMapper = new ObjectMapper();
this.objectMapper
.configOverride(List.class)
.setSetterInfo(JsonSetter.Value.forValueNulls(Nulls.AS_EMPTY))
.setSetterInfo(JsonSetter.Value.forContentNulls(Nulls.AS_EMPTY));
}
public void connect(String login, String password, String schedulerUrl) {
log.info("Connecting to SAL as a service");
this.sessionId = HttpClient.create()
.headers(headers -> headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED))
.post()
.uri(schedulerUrl + PA_GATEWAY_CONNECT)
.sendForm((req, form) -> form
//.attr("username", login)
.attr("name", login)
.attr("password", password))
.responseContent()
.aggregate()
.asString()
.retry(RETRIES_NUMBER)
.block();
log.info("Connected with sessionId: {}...", sessionId.substring(0,10));
}
//nodeCandidates
public List<NodeCandidate> fetchNodeCandidates(List<Requirement> requirements) {
return httpClient.post()
.uri(NODECANDIDATES)
.send(bodyMonoPublisher(requirements))
.responseSingle((resp, bytes) -> {
if (!resp.status().equals(HttpResponseStatus.OK)) {
return bytes.asString().flatMap(body -> Mono.error(new ProactiveClientException(body)));
} else {
return bytes.asString().mapNotNull(s -> {
try {
log.info("Received message: {}", s);
return objectMapper.readValue(s, NodeCandidate[].class);
} catch (IOException e) {
log.error(e.getMessage(), e);;
return null;
}
});
}
})
.doOnError(Throwable::printStackTrace)
.blockOptional()
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
}
private Mono<ByteBuf> bodyMonoPublisher(Object body) {
if ((body instanceof JSONArray) || (body instanceof JSONObject)) {
return ByteBufMono.fromString(Mono.just(body.toString()));
}
String json = null;
try {
json = objectMapper.writeValueAsString(body);
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);;
}
log.info("Sending body json: {}", json);
return ByteBufMono.fromString(Mono.just(json));
}
public Boolean isAnyAsyncNodeCandidatesProcessesInProgress() {
return httpClient.get()
.uri(CLOUD_NODE_CANDIDATES_FETCH_CHECK)
.responseSingle((resp, bytes) -> {
if (!resp.status().equals(HttpResponseStatus.OK)) {
return bytes.asString().flatMap(body -> Mono.error(new ProactiveClientException(body)));
} else {
return bytes.asString().map(Boolean::new);
}
})
.doOnError(Throwable::printStackTrace)
.block();
}
}

View File

@ -65,7 +65,6 @@ public class NodeCandidateConverter {
nodeCandidate.getPrice());
}
//TODO: this method should also encode other fields of NodeCandidates: GPU, providertype, location but we need to have them as variables.
//only for variables that are used
public static double[][] convertListToDoubleArray(List<NodeCandidateDTO> nodeList, List<VariableDTO> variables) {
int size = nodeList.size();
@ -83,7 +82,7 @@ public class NodeCandidateConverter {
usedNodeParameters.add(Long.valueOf(node.getRam()).intValue());
break;
default:
log.info("Variable type {} is not usable in cost performance indicators", variable.getType());
log.debug("Variable type {} is not usable in cost performance indicators", variable.getType());
break;

View File

@ -1,8 +1,4 @@
spring.pa-config.rest-url=http://localhost:8088/sal
spring.pa-config.url=http://localhost:8080/
spring.pa-config.login=admin
spring.pa-config.password=admin
spring.activemq.broker-url=localhost
spring.activemq.broker-port=5672
spring.activemq.user=admin
spring.activemq.password=admin
spring.exn.broker-url=localhost
spring.exn.broker-port=5672
spring.exn.user=admin
spring.exn.password=admin