diff --git a/resource-discovery/pom.xml b/resource-discovery/pom.xml index ff87f62..b0258b9 100644 --- a/resource-discovery/pom.xml +++ b/resource-discovery/pom.xml @@ -6,13 +6,13 @@ org.springframework.boot spring-boot-starter-parent - 3.2.1 + 3.2.4 eu.nebulous.resource-management resource-discovery - 1.0.0-SNAPSHOT + 1.0.2-SNAPSHOT Resource discovery service Nebulous resource discovery service @@ -37,6 +37,12 @@ org.springframework.boot spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-logging + + @@ -120,17 +126,6 @@ - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-logging - - - - diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java index 69610be..735eb4a 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java @@ -6,7 +6,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -59,6 +58,11 @@ public class ResourceDiscoveryProperties { private String deviceLifeCycleRequestsTopic = "ems.client.installation.requests"; private String deviceLifeCycleResponsesTopic = "ems.client.installation.reports"; + // SAL registration settings + private boolean salRegistrationEnabled = true; + private long salRegistrationTimeout = 60*1000; + private String registration_topic_name = "eu.nebulouscloud.exn.sal.node.create"; + // Failed devices detection private boolean automaticFailedDetection = true; private long suspectDeviceThreshold = 5; // in minutes @@ -89,8 +93,8 @@ public class ResourceDiscoveryProperties { private String nebulous_broker_ip_address; private int nebulous_broker_port; private String nebulous_broker_username; - private String lost_device_topic; private String nebulous_broker_password; + private String lost_device_topic; @Data public static class UserData { diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java index dc41182..505c12b 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java @@ -50,6 +50,8 @@ public class Device { private Instant suspectTimestamp; private int retries; + private boolean registeredToSAL; + public void incrementRetries() { retries++; } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceMetricsMonitorService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceMetricsMonitorService.java index 0b355df..d042696 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceMetricsMonitorService.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceMetricsMonitorService.java @@ -6,7 +6,6 @@ import eu.nebulous.resource.discovery.ResourceDiscoveryProperties; import eu.nebulous.resource.discovery.common.BrokerUtil; import eu.nebulous.resource.discovery.monitor.model.Device; import eu.nebulous.resource.discovery.monitor.model.DeviceMetrics; -import eu.nebulous.resource.discovery.monitor.model.DeviceStatus; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -32,6 +31,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService { { super("DeviceMetricsMonitorService", monitorProperties, taskScheduler, objectMapper, brokerUtil); this.deviceManagementService = deviceManagementService; + log.trace("DeviceMetricsMonitorService.: {}", monitorProperties); } @Override @@ -41,6 +41,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService { protected void processPayload(@NonNull Map dataMap) { Object obj = dataMap.get("message"); + log.trace("DeviceMetricsMonitorService: dataMap={}, message={}, message-class={}", dataMap, obj, obj!=null ? obj.getClass() : null); if (obj==null) { log.debug("DeviceMetricsMonitorService: Message does not contain device metrics (message field is null): {}", dataMap); return; @@ -63,6 +64,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService { String clientId = stringValue(metricsMap.get("clientId")); String ipAddress = stringValue(metricsMap.get("ipAddress")); String timestampStr = stringValue(metricsMap.get("receivedAtServer")); + log.debug("DeviceMetricsMonitorService: client={}, ip={}, ts={}", clientId, ipAddress, timestampStr); if (clientId.isEmpty() || ipAddress.isEmpty() || timestampStr.isEmpty()) { log.warn("DeviceMetricsMonitorService: Device metrics received do not contain clientId or ipAddress or receivedAtServer. Ignoring them: {}", metricsMap); return; @@ -72,9 +74,18 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService { // Get registered device using IP address Optional result = deviceManagementService.getByIpAddress(ipAddress); + log.debug("DeviceMetricsMonitorService: device-by-ip: {}", result); if (result.isEmpty()) { log.debug("DeviceMetricsMonitorService: Device metrics IP address does not match any registered device: {}", infoMap); - return; + + result = deviceManagementService.getAll().stream() + .filter(d->d.getStatusUpdate()!=null) + .filter(d->StringUtils.isNotBlank(d.getStatusUpdate().getClientId())) + .filter(d->StringUtils.equalsIgnoreCase(d.getStatusUpdate().getClientId(), clientId)) + .findAny(); + log.debug("DeviceMetricsMonitorService: device-by-clientId: {}", result); + if (result.isEmpty()) + return; } Device device = result.get(); @@ -115,6 +126,7 @@ public class DeviceMetricsMonitorService extends AbstractMonitorService { deviceManagementService.update(device); log.debug("DeviceMetricsMonitorService: Device metrics updated for device: id={}, ip-address={}, update={}", device.getId(), device.getIpAddress(), metrics); + log.debug("DeviceMetricsMonitorService: Device statistics updated: {}", device); } catch (Exception e) { log.warn("DeviceMetricsMonitorService: EXCEPTION while processing device metrics map: {}\n", infoMap, e); } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java index ce40a47..90052d9 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java @@ -370,6 +370,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess device.setRequestId(registrationRequest.getId()); device.setNodeReference(registrationRequest.getNodeReference()); deviceManagementService.save(device); - salRegistrationService.register(device); + if (processorProperties.isSalRegistrationEnabled()) + salRegistrationService.queueForRegistration(device); } } diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java index 1e0d08d..443a8ac 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java @@ -3,28 +3,36 @@ package eu.nebulous.resource.discovery.registration.service; import eu.nebulous.resource.discovery.ResourceDiscoveryProperties; import eu.nebulous.resource.discovery.broker_communication.SynchronousBrokerPublisher; import eu.nebulous.resource.discovery.monitor.model.Device; +import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.json.simple.JSONObject; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Service; import java.time.Clock; -import java.util.ArrayList; import java.util.Collections; import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; -import static eu.nebulous.resource.discovery.broker_communication.SALCommunicator.*; +import static eu.nebulous.resource.discovery.broker_communication.SALCommunicator.get_device_registration_json; @Slf4j @Service +@RequiredArgsConstructor public class SALRegistrationService implements InitializingBean { - @Autowired + private final DeviceManagementService deviceManagementService; private final ResourceDiscoveryProperties processorProperties; + private final TaskExecutor taskExecutor; + private final LinkedBlockingDeque queue = new LinkedBlockingDeque<>(); + private Thread processQueueThread; + private long lastRegistrationStartTimestamp = -1L; - public SALRegistrationService(ResourceDiscoveryProperties processorProperties) { - this.processorProperties = processorProperties; + public void queueForRegistration(@NonNull Device device) { + if (processorProperties.isSalRegistrationEnabled()) + queue.add(device); } public void register(Device device) { @@ -50,9 +58,9 @@ public class SALRegistrationService implements InitializingBean { */ String device_name = device.getName(); - Integer cores = Integer.parseInt(device_info.get("CPU_PROCESSORS")); - Integer ram_gb = Integer.parseInt(device_info.get("RAM_TOTAL_KB"))/1000000; - Integer disk_gb = Integer.parseInt(device_info.get("DISK_TOTAL_KB"))/1000000; + int cores = Integer.parseInt(device_info.get("CPU_PROCESSORS")); + int ram_gb = Integer.parseInt(device_info.get("RAM_TOTAL_KB"))/1000000; + int disk_gb = Integer.parseInt(device_info.get("DISK_TOTAL_KB"))/1000000; String external_ip_address = device.getIpAddress(); String device_username = device.getUsername(); String device_password = new String(device.getPassword()); @@ -68,11 +76,11 @@ public class SALRegistrationService implements InitializingBean { //register_device_message.put("timestamp",(int)(clock.millis()/1000)); String register_device_message_string = get_device_registration_json("10.100.100",external_ip_address,cores,ram_gb,disk_gb,device_name,"test_provider","Athens","Greece", device_username, device_password); - log.error("topic is "+get_registration_topic_name(application_name)); - log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address()); - log.error("broker port is "+processorProperties.getNebulous_broker_port()); - log.error("username is "+processorProperties.getNebulous_broker_username()); - log.error("password is "+processorProperties.getNebulous_broker_password()); + log.info("topic is {}", get_registration_topic_name(application_name)); + log.info("broker ip is {}", processorProperties.getNebulous_broker_ip_address()); + log.info("broker port is {}", processorProperties.getNebulous_broker_port()); + log.info("username is {}", processorProperties.getNebulous_broker_username()); + log.info("password is {}", StringUtils.isNotBlank(processorProperties.getNebulous_broker_password()) ? "" : ""); //String sal_running_applications_reply = request_running_applications_AMQP(); //ArrayList applications = get_running_applications(sal_running_applications_reply); //for (String application_name:applications) { @@ -96,22 +104,73 @@ public class SALRegistrationService implements InitializingBean { } private String get_registration_topic_name(String application_name) { - return "eu.nebulouscloud.exn.sal.node.create"; + return processorProperties.getRegistration_topic_name(); //return ("eu.nebulouscloud.exn.sal.edge." + application_name); } @Override - public void afterPropertiesSet() throws Exception { - if ( processorProperties.getNebulous_broker_password()!=null && - processorProperties.getNebulous_broker_username()!=null && - processorProperties.getNebulous_broker_ip_address()!=null - ){ + public void afterPropertiesSet() { + if (! processorProperties.isSalRegistrationEnabled()) { + log.info("SAL registration is disabled due to configuration"); + return; + } + + if ( StringUtils.isNotBlank(processorProperties.getNebulous_broker_ip_address()) && + StringUtils.isNotBlank(processorProperties.getNebulous_broker_username()) && + StringUtils.isNotBlank(processorProperties.getNebulous_broker_password()) ) + { log.info("Successful setting of properties for communication with SAL"); - }else{ - log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address()); - log.error("username is "+processorProperties.getNebulous_broker_username()); - log.error("password is "+processorProperties.getNebulous_broker_password()); - throw new Exception("Required data is null - broker ip is "+processorProperties.getNebulous_broker_ip_address()+" username is "+processorProperties.getNebulous_broker_username()+" password is "+processorProperties.getNebulous_broker_password()); + taskExecutor.execute(this::processQueue); + taskExecutor.execute(this::checkProcessQueue); + } else { + String message = String.format("Nebulous broker configuration is missing: ip-address=%s, username=%s, password=%s", + processorProperties.getNebulous_broker_ip_address(), + processorProperties.getNebulous_broker_username(), + StringUtils.isNotBlank(processorProperties.getNebulous_broker_password()) ? "" : ""); + log.error(message); + throw new RuntimeException(message); + } + } + + public void processQueue() { + processQueueThread = Thread.currentThread(); + while (true) { + Device device = null; + try { + device = queue.take(); + log.warn("SALRegistrationService: processQueue(): Will register device: {}", device); + lastRegistrationStartTimestamp = System.currentTimeMillis(); + register(device); + lastRegistrationStartTimestamp = -1L; + device.setRegisteredToSAL(true); + deviceManagementService.update(device); + log.warn("SALRegistrationService: processQueue(): Device registered to SAL: {}", device); + } catch (InterruptedException e) { + log.warn("SALRegistrationService: processQueue(): Interrupted. Will not register device to SAL: {}", device); + lastRegistrationStartTimestamp = -1L; +// break; + } catch (Exception e) { + log.warn("SALRegistrationService: processQueue(): EXCEPTION caught. Will not register device to SAL: {}", device, e); + lastRegistrationStartTimestamp = -1L; + } + } + } + + public void checkProcessQueue() { + while (true) { + try { + Thread.sleep(1000); + if (processQueueThread!=null && lastRegistrationStartTimestamp > 0) { + long runningTime = System.currentTimeMillis() - lastRegistrationStartTimestamp; + if (runningTime > processorProperties.getSalRegistrationTimeout()) { + log.warn("SALRegistrationService: checkProcessQueue(): Method 'processQueue' is running for too log. Will attempt to interrupt it"); + processQueueThread.interrupt(); + lastRegistrationStartTimestamp = -1L; + } + } + } catch (Exception e) { + log.warn("SALRegistrationService: checkProcessQueue(): EXCEPTION caught: ", e); + } } } } diff --git a/resource-discovery/src/main/resources/application.yml b/resource-discovery/src/main/resources/application.yml index 74ae20f..2d707bf 100644 --- a/resource-discovery/src/main/resources/application.yml +++ b/resource-discovery/src/main/resources/application.yml @@ -27,9 +27,9 @@ discovery: sal_host: "localhost" sal_port: 8080 lost_device_topic: "eu.nebulouscloud.monitoring.device_lost" - trustStoreFile: tests/config/broker-truststore.p12 - trustStorePassword: melodic - trustStoreType: PKCS12 +# trustStoreFile: tests/config/broker-truststore.p12 +# trustStorePassword: melodic +# trustStoreType: PKCS12 allowedDeviceInfoKeys: - '*' # NOTE: diff --git a/resource-discovery/src/main/resources/banner.txt b/resource-discovery/src/main/resources/banner.txt index 1e12e7a..c8085cb 100644 --- a/resource-discovery/src/main/resources/banner.txt +++ b/resource-discovery/src/main/resources/banner.txt @@ -9,3 +9,4 @@ ${AnsiColor.012} ╚═╝ ╚═╝╚══════╝╚═════ ${AnsiColor.046} :: App version :: ${AnsiColor.87} (${application.version}) ${AnsiColor.046} :: Spring Boot :: ${AnsiColor.87} ${spring-boot.formatted-version} ${AnsiColor.046} :: Java (TM) :: ${AnsiColor.87} (${java.version}) +${AnsiColor.DEFAULT}${AnsiStyle.NORMAL} \ No newline at end of file