Copied new changes from main repository

Change-Id: I0fca4c35bfd7c26edf8dbc6d62b27223f2eb527f
This commit is contained in:
ipatini 2024-04-03 23:56:34 +03:00
parent a39b3ff4df
commit 8593e5a874
41 changed files with 996 additions and 395 deletions

View File

@ -9,9 +9,6 @@
package gr.iccs.imu.ems.baguette.client.install.plugin;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import gr.iccs.imu.ems.baguette.client.install.ClientInstallationTask;
import gr.iccs.imu.ems.baguette.client.install.InstallationContextProcessorPlugin;
import gr.iccs.imu.ems.translate.model.Monitor;
@ -23,7 +20,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Installation context processor plugin for generating 'allowed-topics' setting
@ -41,8 +40,6 @@ public class AllowedTopicsProcessorPlugin implements InstallationContextProcesso
StringBuilder sbAllowedTopics = new StringBuilder();
Set<String> addedTopicsSet = new HashSet<>();
Map<String, List<Object>> collectorConfigs = new LinkedHashMap<>();
boolean first = true;
for (Monitor monitor : task.getTranslationContext().getMON()) {
try {
@ -77,16 +74,7 @@ public class AllowedTopicsProcessorPlugin implements InstallationContextProcesso
}
}
}
if (monitor.getSensor().isPullSensor()) {
if (sensorConfig.get("type") instanceof String type && StringUtils.isNotBlank(type)) {
collectorConfigs
.computeIfAbsent(type, key->new LinkedList<>())
.add(monitor.getSensor());
}
}
}
log.trace("AllowedTopicsProcessorPlugin: Task #{}: MONITOR: metric={}, allowed-topics={}",
taskCounter, metricName, sbAllowedTopics);
@ -97,30 +85,8 @@ public class AllowedTopicsProcessorPlugin implements InstallationContextProcesso
}
String allowedTopics = sbAllowedTopics.toString();
log.debug("AllowedTopicsProcessorPlugin: Task #{}: Allowed-Topics configuration for collectors: \n{}", taskCounter, allowedTopics);
String collectorConfigsStr = null;
try {
if (! collectorConfigs.isEmpty()) {
log.debug("AllowedTopicsProcessorPlugin: Task #{}: Pull-Sensor collector configurations: \n{}", taskCounter, collectorConfigs);
ObjectMapper mapper = new ObjectMapper();
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
/*mapper.setFilterProvider(new SimpleFilterProvider().addFilter("customerFilter",
SimpleBeanPropertyFilter.serializeAllExcept("@objectClass")));*/
collectorConfigsStr = mapper
.writerWithDefaultPrettyPrinter()
.writeValueAsString(collectorConfigs);
}
} catch (JsonProcessingException e) {
log.error("AllowedTopicsProcessorPlugin: Task #{}: EXCEPTION while processing sensor configs. Skipping them.\n",
taskCounter, e);
}
if (StringUtils.isBlank(collectorConfigsStr))
collectorConfigsStr = "{ }";
log.debug("AllowedTopicsProcessorPlugin: Task #{}: Pull-Sensor collector configurations String: \n{}", taskCounter, collectorConfigsStr);
task.getNodeRegistryEntry().getPreregistration().put(EmsConstant.COLLECTOR_ALLOWED_TOPICS_VAR, allowedTopics);
task.getNodeRegistryEntry().getPreregistration().put(EmsConstant.COLLECTOR_CONFIGURATIONS_VAR, collectorConfigsStr);
log.debug("AllowedTopicsProcessorPlugin: Task #{}: Allowed-Topics configuration for collectors: \n{}", taskCounter, allowedTopics);
// Store collector configurations in config service
try {
@ -128,10 +94,10 @@ public class AllowedTopicsProcessorPlugin implements InstallationContextProcesso
.getOrCreateConfigFile(
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE,
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FORMAT)
.put(EmsConstant.COLLECTOR_CONFIGURATIONS_VAR, collectorConfigsStr);
.put(EmsConstant.COLLECTOR_ALLOWED_TOPICS_VAR, allowedTopics);
} catch (Exception e) {
log.error("BaguetteServer.startServer(): Failed to store connection info in ems-client-config-map: {}, Exception: ",
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE, e);
log.error("AllowedTopicsProcessorPlugin: Task #{}: Failed to store Allowed Topics in config. file: {}, Exception: ",
taskCounter, EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE, e);
}
log.debug("AllowedTopicsProcessorPlugin: Task #{}: processBeforeInstallation: END", taskCounter);

View File

@ -0,0 +1,111 @@
/*
* Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package gr.iccs.imu.ems.baguette.client.install.plugin;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import gr.iccs.imu.ems.baguette.client.install.ClientInstallationTask;
import gr.iccs.imu.ems.baguette.client.install.InstallationContextProcessorPlugin;
import gr.iccs.imu.ems.util.ConfigWriteService;
import gr.iccs.imu.ems.util.EmsConstant;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Installation context processor plugin for generating 'collector-configurations' setting
* used in baguette-client[.yml/.properties] configuration file.
* It sets the 'COLLECTOR_CONFIGURATIONS' variable in pre-registration context.
*/
@Slf4j
@Data
@Service
public class CollectorConfigurationsProcessorPlugin implements InstallationContextProcessorPlugin {
@Override
public void processBeforeInstallation(ClientInstallationTask task, long taskCounter) {
log.debug("CollectorConfigurationsProcessorPlugin: Task #{}: processBeforeInstallation: BEGIN", taskCounter);
log.trace("CollectorConfigurationsProcessorPlugin: Task #{}: processBeforeInstallation: BEGIN: task={}", taskCounter, task);
Map<String, List<Object>> collectorConfigs = new LinkedHashMap<>();
String collectorConfigsStr = null;
task.getTranslationContext().getMON().forEach(monitor -> {
log.trace("CollectorConfigurationsProcessorPlugin: Task #{}: Processing monitor: {}", taskCounter, monitor);
// Get sensor configuration
Map<String,Object> sensorConfig = monitor.getSensor().getConfiguration();
// Process Destination aliases, if specified in configuration
if (sensorConfig!=null) {
if (monitor.getSensor().isPullSensor()) {
if (sensorConfig.get("type") instanceof String type && StringUtils.isNotBlank(type)) {
collectorConfigs
.computeIfAbsent(type, key->new LinkedList<>())
.add(monitor.getSensor());
}
}
}
});
try {
log.debug("CollectorConfigurationsProcessorPlugin: Task #{}: Pull-Sensor collector configurations: \n{}", taskCounter, collectorConfigs);
ObjectMapper mapper = new ObjectMapper();
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
collectorConfigsStr = mapper
.writeValueAsString(collectorConfigs);
} catch (JsonProcessingException e) {
log.error("CollectorConfigurationsProcessorPlugin: Task #{}: EXCEPTION while processing sensor configs. Skipping them.\n",
taskCounter, e);
}
if (StringUtils.isBlank(collectorConfigsStr))
collectorConfigsStr = "{ }";
log.debug("CollectorConfigurationsProcessorPlugin: Task #{}: Pull-Sensor collector configurations string: \n{}", taskCounter, collectorConfigsStr);
task.getNodeRegistryEntry().getPreregistration().put(EmsConstant.COLLECTOR_CONFIGURATIONS_VAR, collectorConfigsStr);
log.debug("CollectorConfigurationsProcessorPlugin: Task #{}: Collector configurations: \n{}", taskCounter, collectorConfigsStr);
// Store collector configurations in config service
try {
ConfigWriteService.getInstance()
.getOrCreateConfigFile(
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE,
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FORMAT)
.put(EmsConstant.COLLECTOR_CONFIGURATIONS_VAR, collectorConfigsStr);
} catch (Exception e) {
log.error("CollectorConfigurationsProcessorPlugin: Task #{}: Failed to store Collector Configurations in config. file: {}, Exception: ",
taskCounter, EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE, e);
}
log.debug("CollectorConfigurationsProcessorPlugin: Task #{}: processBeforeInstallation: END", taskCounter);
}
@Override
public void processAfterInstallation(ClientInstallationTask task, long taskCounter, boolean success) {
log.debug("CollectorConfigurationsProcessorPlugin: Task #{}: processAfterInstallation: success={}", taskCounter, success);
log.trace("CollectorConfigurationsProcessorPlugin: Task #{}: processAfterInstallation: success={}, task={}", taskCounter, success, task);
}
@Override
public void start() {
log.debug("CollectorConfigurationsProcessorPlugin: Started");
}
@Override
public void stop() {
log.debug("CollectorConfigurationsProcessorPlugin: Stopped");
}
}

View File

@ -1,188 +0,0 @@
/*
* Copyright (C) 2017-2025 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package gr.iccs.imu.ems.baguette.client.install.watch;
import gr.iccs.imu.ems.baguette.server.NodeRegistry;
import gr.iccs.imu.ems.baguette.server.NodeRegistryEntry;
import gr.iccs.imu.ems.util.PasswordUtil;
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.Resource;
import lombok.Data;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
/**
* Kubernetes cluster pods watcher service
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class K8sPodWatcher implements InitializingBean {
private static final String K8S_SERVICE_ACCOUNT_SECRETS_PATH_DEFAULT = "/var/run/secrets/kubernetes.io/serviceaccount";
private final TaskScheduler taskScheduler;
private final PasswordUtil passwordUtil;
private final NodeRegistry nodeRegistry;
@Override
public void afterPropertiesSet() throws Exception {
if (Boolean.parseBoolean(getConfig("K8S_WATCHER_ENABLED", "true"))) {
taskScheduler.scheduleWithFixedDelay(this::doWatch, Instant.now().plusSeconds(20), Duration.ofSeconds(10));
} else {
log.warn("K8sPodWatcher: Disabled (set K8S_WATCHER_ENABLED=true to enable)");
}
}
private String getConfig(@NonNull String key, String defaultValue) {
String value = System.getenv(key);
return value==null ? defaultValue : value;
}
private void doWatch() {
Map<String, NodeEntry> addressToNodeMap;
Map<String, Set<PodEntry>> addressToPodMap;
try {
log.debug("K8sPodWatcher: BEGIN: doWatch");
String serviceAccountPath = getConfig("K8S_SERVICE_ACCOUNT_SECRETS_PATH", K8S_SERVICE_ACCOUNT_SECRETS_PATH_DEFAULT);
String masterUrl = getConfig("KUBERNETES_SERVICE_HOST", null);
String caCert = Files.readString(Paths.get(serviceAccountPath, "ca.crt"));
String token = Files.readString(Paths.get(serviceAccountPath, "token"));
String namespace = Files.readString(Paths.get(serviceAccountPath, "namespace"));
log.trace("""
K8sPodWatcher:
Master URL: {}
CA cert.:
{}
Token: {}
Namespace: {}""",
masterUrl, caCert.trim(), passwordUtil.encodePassword(token), namespace);
// Configure and start Kubernetes API client
Config config = new ConfigBuilder()
.withMasterUrl(masterUrl)
.withCaCertData(caCert)
.withOauthToken(token)
.build();
try (KubernetesClient client = new KubernetesClientBuilder().withConfig(config).build()) {
log.debug("K8sPodWatcher: Retrieving active Kubernetes cluster nodes and pods");
// Get Kubernetes cluster nodes (Hosts)
addressToNodeMap = new HashMap<>();
Map<String, NodeEntry> uidToNodeMap = new HashMap<>();
client.nodes()
.resources()
.map(Resource::item)
.forEach(node -> {
NodeEntry entry = uidToNodeMap.computeIfAbsent(
node.getMetadata().getUid(), s -> new NodeEntry(node));
node.getStatus().getAddresses().stream()
.filter(address -> ! "Hostname".equalsIgnoreCase(address.getType()))
.forEach(address -> addressToNodeMap.putIfAbsent(address.getAddress(), entry));
});
log.trace("K8sPodWatcher: Address-to-Nodes: {}", addressToNodeMap);
// Get Kubernetes cluster pods
addressToPodMap = new HashMap<>();
Map<String, PodEntry> uidToPodMap = new HashMap<>();
client.pods()
.inAnyNamespace()
// .withLabel("nebulous.application")
.resources()
.map(Resource::item)
.filter(pod-> "Running".equalsIgnoreCase(pod.getStatus().getPhase()))
.forEach(pod -> {
PodEntry entry = uidToPodMap.computeIfAbsent(
pod.getMetadata().getUid(), s -> new PodEntry(pod));
pod.getStatus().getPodIPs()
.forEach(address ->
addressToPodMap.computeIfAbsent(address.getIp(), s -> new HashSet<>()).add(entry)
);
});
log.trace("K8sPodWatcher: Address-to-Pods: {}", addressToPodMap);
} // End of try-with-resources
// Update Node Registry
log.debug("K8sPodWatcher: Updating Node Registry");
Map<String, NodeRegistryEntry> addressToNodeEntryMap = nodeRegistry.getNodes().stream()
.collect(Collectors.toMap(NodeRegistryEntry::getIpAddress, entry -> entry));
// New Pods
HashMap<String, Set<PodEntry>> newPods = new HashMap<>(addressToPodMap);
newPods.keySet().removeAll(addressToNodeEntryMap.keySet());
if (! newPods.isEmpty()) {
log.trace("K8sPodWatcher: New Pods found: {}", newPods);
/*newPods.forEach((address, podSet) -> {
if (podSet.size()==1)
nodeRegistry.addNode(null, podSet.iterator().next().getPodUid());
});*/
} else {
log.trace("K8sPodWatcher: No new Pods");
}
// Node Entries to be removed
HashMap<String, NodeRegistryEntry> oldEntries = new HashMap<>(addressToNodeEntryMap);
oldEntries.keySet().removeAll(addressToPodMap.keySet());
if (! oldEntries.isEmpty()) {
log.trace("K8sPodWatcher: Node entries to be removed: {}", oldEntries);
} else {
log.trace("K8sPodWatcher: No node entries to remove");
}
} catch (Exception e) {
log.warn("K8sPodWatcher: Error while running doWatch: ", e);
}
}
@Data
private static class NodeEntry {
private final String nodeUid;
private final String nodeName;
public NodeEntry(Node node) {
nodeUid = node.getMetadata().getUid();
nodeName = node.getMetadata().getName();
}
}
@Data
private static class PodEntry {
private final String podUid;
private final String podIP;
private final String podName;
private final String hostIP;
private final Map<String, String> labels;
public PodEntry(Pod pod) {
podUid = pod.getMetadata().getUid();
podIP = pod.getStatus().getPodIP();
podName = pod.getMetadata().getName();
hostIP = pod.getStatus().getHostIP();
labels = Collections.unmodifiableMap(pod.getMetadata().getLabels());
}
}
}

View File

@ -14,9 +14,9 @@ ARG RUN_IMAGE_TAG=21.0.1_12-jre
FROM $RUN_IMAGE:$RUN_IMAGE_TAG
# Install required and optional packages
RUN apt-get update \
&& apt-get install -y vim iputils-ping \
&& rm -rf /var/lib/apt/lists/*
#RUN apt-get update \
# && apt-get install -y vim iputils-ping \
# && rm -rf /var/lib/apt/lists/*
# Add an EMS user
ARG EMS_USER=emsuser

View File

@ -83,8 +83,6 @@ server-password: ${BAGUETTE_SERVER_PASSWORD}
#collector-classes: gr.iccs.imu.ems.baguette.client.collector.netdata.NetdataCollector
collector-configurations: ${COLLECTOR_CONFIGURATIONS}
collector:
netdata:
enable: true
@ -114,6 +112,8 @@ collector:
#addTagsInEventPayload: true
#throwExceptionWhenExcessiveCharsOccur: true
#collector-configurations: ${COLLECTOR_CONFIGURATIONS}
# -----------------------------------------------------------------------------
# Cluster settings
# -----------------------------------------------------------------------------

View File

@ -11,7 +11,7 @@ package gr.iccs.imu.ems.baguette.client;
import gr.iccs.imu.ems.baguette.client.cluster.ClusterManagerProperties;
import gr.iccs.imu.ems.baguette.client.collector.netdata.NetdataCollector;
//import prometheus.collector.gr.iccs.imu.ems.baguette.client.PrometheusCollector;
import gr.iccs.imu.ems.baguette.client.collector.prometheus.PrometheusCollector2;
import gr.iccs.imu.ems.baguette.client.plugin.recovery.SelfHealingPlugin;
import gr.iccs.imu.ems.util.EventBus;
import lombok.Getter;
@ -48,7 +48,7 @@ public class BaguetteClient implements ApplicationRunner {
private final ConfigurableApplicationContext applicationContext;
private final List<Class<? extends Collector>> DEFAULT_COLLECTORS_LIST = List.of(
NetdataCollector.class//, PrometheusCollector.class
NetdataCollector.class, PrometheusCollector2.class
);
@Getter

View File

@ -165,7 +165,7 @@ public class CommandExecutor {
String[] s = line.split(" ", 2);
log.info("Cluster key from Server: {} {}", s[0], s.length>1 ? passwordUtil.encodePassword(s[1]) : "");
} else
log.info("Server input: {}", line);
log.debug("Server input: {}", line);
try {
log.trace("communicateWithServer(): Calling execCmd: {}", line);
@ -737,6 +737,7 @@ public class CommandExecutor {
protected synchronized void setClientConfiguration(String configStr) {
try {
// Update Baguette client configuration
log.debug("Received serialization of client configuration: {}", configStr);
ClientConfiguration config = (ClientConfiguration) SerializationUtil.deserializeFromString(configStr);
ClientConfiguration oldConfig = clientConfiguration;
@ -744,14 +745,26 @@ public class CommandExecutor {
log.debug("Old client config.: {}", oldConfig);
}
synchronized (groupings) {
if (oldConfig!=null && (config.getCollectorConfigurations()==null || config.getCollectorConfigurations().isEmpty())) {
config.setCollectorConfigurations(oldConfig.getCollectorConfigurations());
log.trace("Copied collector-configs from old client config.: \n{}", oldConfig.getCollectorConfigurations());
}
clientConfiguration = config;
}
log.info("New client config.: {}", config);
log.debug("New client config.: {}", config);
HashMap<String,ClientConfiguration> payload = new HashMap<>();
payload.put("new", clientConfiguration);
payload.put("old", oldConfig);
eventBus.send(EventConstant.EVENT_CLIENT_CONFIG_UPDATED, payload, this);
// Update collectors' configurations
Map<String, List<Map<String, Serializable>>> collectorConfigs = clientConfiguration.getCollectorConfigurations();
applicationContext.getBean(BaguetteClient.class).getCollectorsList().forEach(collector -> {
List<Map<String, Serializable>> cc = collectorConfigs.get(collector.getName());
if (cc!=null)
collector.setConfiguration(cc);
});
} catch (Exception ex) {
log.error("Exception while deserializing received Client configuration: ", ex);
}

View File

@ -50,7 +50,7 @@ public class K8sNetdataCollector implements Collector, InitializingBean {
private final RestClient restClient = RestClient.create();
private final List<ScheduledFuture<?>> scheduledFuturesList = new LinkedList<>();
private boolean started;
private List<Map<String, Object>> configuration;
private List<Map<String, Object>> configurations;
@Override
public void afterPropertiesSet() throws Exception {
@ -66,11 +66,11 @@ public class K8sNetdataCollector implements Collector, InitializingBean {
@Override
public void setConfiguration(Object config) {
if (config instanceof List sensorConfigList) {
configuration = sensorConfigList.stream()
configurations = sensorConfigList.stream()
.filter(o -> o instanceof Map)
.filter(map -> ((Map)map).keySet().stream().allMatch(k->k instanceof String))
.toList();
log.debug("K8sNetdataCollector: setConfiguration: {}", configuration);
log.debug("K8sNetdataCollector: setConfiguration: {}", configurations);
// If configuration changes while collector running we need to restart it
if (started) {
@ -86,7 +86,7 @@ public class K8sNetdataCollector implements Collector, InitializingBean {
@Override
public void start() {
if (started) return;
if (configuration!=null)
if (configurations!=null)
doStart();
started = true;
log.debug("K8sNetdataCollector: Started");
@ -101,7 +101,7 @@ public class K8sNetdataCollector implements Collector, InitializingBean {
}
private synchronized void doStart() {
log.debug("K8sNetdataCollector: doStart(): BEGIN: configuration={}", configuration);
log.debug("K8sNetdataCollector: doStart(): BEGIN: configuration={}", configurations);
log.trace("K8sNetdataCollector: doStart(): BEGIN: scheduledFuturesList={}", scheduledFuturesList);
// Get Netdata agent address and port from env. vars
@ -119,7 +119,7 @@ public class K8sNetdataCollector implements Collector, InitializingBean {
// Process each sensor configuration
AtomicInteger sensorNum = new AtomicInteger(0);
configuration.forEach(map -> {
configurations.forEach(map -> {
log.debug("K8sNetdataCollector: doStart(): Sensor-{}: map={}", sensorNum.incrementAndGet(), map);
// Check if it is a Pull sensor. (Push sensors are ignored)

View File

@ -31,7 +31,7 @@ import java.util.Map;
@Slf4j
@Component
public class NetdataCollector extends gr.iccs.imu.ems.common.collector.netdata.NetdataCollector implements Collector {
private List<Map<String,Object>> configuration;
private List<Map<String,Object>> configurations;
public NetdataCollector(@NonNull NetdataCollectorProperties properties,
@NonNull CollectorContext collectorContext,
@ -51,11 +51,11 @@ public class NetdataCollector extends gr.iccs.imu.ems.common.collector.netdata.N
@Override
public void setConfiguration(Object config) {
if (config instanceof List sensorConfigList) {
configuration = sensorConfigList.stream()
configurations = sensorConfigList.stream()
.filter(o -> o instanceof Map)
.filter(map -> ((Map)map).keySet().stream().allMatch(k->k instanceof String))
.toList();
log.info("Collectors::Netdata: setConfiguration: {}", configuration);
log.info("Collectors::Netdata: setConfiguration: {}", configurations);
}
}

View File

@ -31,7 +31,7 @@ import java.util.Map;
@Slf4j
@Component
public class PrometheusCollector extends gr.iccs.imu.ems.common.collector.prometheus.PrometheusCollector implements Collector {
private List<Map<String,Object>> configuration;
private List<Map<String,Object>> configurations;
public PrometheusCollector(@NonNull PrometheusCollectorProperties properties,
@NonNull CollectorContext collectorContext,
@ -51,11 +51,11 @@ public class PrometheusCollector extends gr.iccs.imu.ems.common.collector.promet
@Override
public void setConfiguration(Object config) {
if (config instanceof List sensorConfigList) {
configuration = sensorConfigList.stream()
configurations = sensorConfigList.stream()
.filter(o -> o instanceof Map)
.filter(map -> ((Map)map).keySet().stream().allMatch(k->k instanceof String))
.toList();
log.info("Collectors::Prometheus: setConfiguration: {}", configuration);
log.info("Collectors::Prometheus: setConfiguration: {}", configurations);
}
}

View File

@ -0,0 +1,287 @@
/*
* Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package gr.iccs.imu.ems.baguette.client.collector.prometheus;
import gr.iccs.imu.ems.baguette.client.Collector;
import gr.iccs.imu.ems.brokercep.event.EventMap;
import gr.iccs.imu.ems.common.collector.AbstractEndpointCollector;
import gr.iccs.imu.ems.common.collector.CollectorContext;
import gr.iccs.imu.ems.common.collector.prometheus.OpenMetricsParser;
import gr.iccs.imu.ems.common.collector.prometheus.PrometheusCollectorProperties;
import gr.iccs.imu.ems.util.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClient;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
/**
* Collects measurements from a Prometheus exporter endpoint
*/
@Slf4j
@Component
public class PrometheusCollector2 extends AbstractEndpointCollector<String> implements Collector {
private final PrometheusCollectorProperties properties;
private List<Map<String, Serializable>> configurations = List.of();
private final List<ScheduledFuture<?>> scrapingTasks = new LinkedList<>();
private RestClient restClient;
private OpenMetricsParser openMetricsParser;
private final LinkedBlockingQueue<EventMap> eventsQueue = new LinkedBlockingQueue<>();
@SuppressWarnings("unchecked")
public PrometheusCollector2(PrometheusCollectorProperties properties, CollectorContext collectorContext, TaskScheduler taskScheduler, EventBus<String,Object,Object> eventBus) {
super("PrometheusCollector2", properties, collectorContext, taskScheduler, eventBus);
this.properties = properties;
this.autoStartRunner = false; // Don't start the default runner
}
@Override
public void afterPropertiesSet() {
log.debug("Collectors::Prometheus2: properties: {}", properties);
super.afterPropertiesSet();
initRestClientAndParser();
startEventPublishTask();
}
protected ResponseEntity<String> getData(String url) {
return null;
}
protected void processData(String data, String nodeAddress, ProcessingStats stats) {
}
@Override
public String getName() {
return "prometheus";
}
@Override
public void setConfiguration(Object config) {
if (config instanceof List sensorConfigList) {
configurations = sensorConfigList.stream()
.filter(o -> o instanceof Map)
.toList();
applyNewConfigurations();
}
}
public synchronized void activeGroupingChanged(String oldGrouping, String newGrouping) {
log.info("Collectors::Prometheus2: activeGroupingChanged: New Allowed Topics for active grouping: {} -- !! Not used !!", newGrouping);
}
private void applyNewConfigurations() {
log.debug("Collectors::Prometheus2: applyNewConfigurations: {}", configurations);
if (configurations==null) return;
// Cancel previous tasks
if (! scrapingTasks.isEmpty()) {
log.trace("Collectors::Prometheus2: applyNewConfigurations: Cancelling previous scraping tasks: {}", scrapingTasks);
List<ScheduledFuture<?>> list = new ArrayList<>(scrapingTasks);
scrapingTasks.clear();
list.forEach(task -> task.cancel(true));
log.trace("Collectors::Prometheus2: applyNewConfigurations: Cancelled previous scraping tasks: {}", scrapingTasks);
}
// Create new scraping tasks
log.trace("Collectors::Prometheus2: applyNewConfigurations: Starting new scraping tasks: configurations: {}", configurations);
Instant startInstant = Instant.now();
configurations.forEach(config -> {
if (checkConfig(config)) {
String destination = config.getOrDefault("name", "").toString();
String prometheusMetric = getPrometheusMetric(config);
String url = getUrlPattern(config);
Duration delay = getDelay(config);
Duration period = getInterval(config);
Instant startsAt = startInstant.plus(delay);
scrapingTasks.add(taskScheduler.scheduleAtFixedRate(
() -> scrapeEndpoint(url, prometheusMetric, destination), startsAt, period));
log.info("Collectors::Prometheus2: Added monitoring task: prometheus-metric={}, destination={}, url={}, starts-at={}, period={}",
prometheusMetric, destination, url, startsAt, period);
} else
log.warn("Collectors::Prometheus2: applyNewConfigurations: Skipped sensor: {}", config);
});
log.debug("Collectors::Prometheus2: applyNewConfigurations: Started new scraping tasks: {}", scrapingTasks);
}
private boolean checkConfig(Map<String, Serializable> config) {
List<String> errors = new ArrayList<>();
String push = config.getOrDefault("push", "").toString();
if (! "false".equalsIgnoreCase(push)) errors.add(String.format("Not a Pull sensor. Expected '%s' but found '%s'", false, push));
String destination = config.getOrDefault("name", "").toString();
if (StringUtils.isBlank(destination)) errors.add("No destination (name) provided");
if (config.get("configuration") instanceof Map configMap) {
String type = configMap.getOrDefault("type", "").toString();
if (! getName().equalsIgnoreCase(type)) errors.add(String.format("Type mismatch. Expected '%s' but found '%s'", getName(), type));
int port = Integer.parseInt( configMap.getOrDefault("port", "0").toString() );
if (port<=0) errors.add("No or invalid port provided: "+port);
String prometheusMetric = configMap.getOrDefault("metric", "").toString();
if (StringUtils.isBlank(prometheusMetric)) errors.add("No prometheus metric provided");
} else
errors.add("No 'configuration' sub-map found in sensor spec: "+config);
// If no errors found return true
if (errors.isEmpty()) return true;
// Print errors and return false
log.warn("Collectors::Prometheus2: checkConfig: Sensor specification has errors: spec={}, errors={}", config, errors);
return false;
}
private String getPrometheusMetric(Map<String, Serializable> config) {
if (config.get("configuration") instanceof Map configMap) {
String prometheusMetric = configMap.getOrDefault("metric", "").toString();
if (StringUtils.isNotBlank(prometheusMetric))
return prometheusMetric;
}
return null;
}
private String getUrlPattern(Map<String, Serializable> config) {
if (config.get("configuration") instanceof Map configMap) {
int port = Integer.parseInt( configMap.getOrDefault("port", "0").toString() );
String endpoint = configMap.getOrDefault("endpoint", "/").toString();
return "http://%s:"+port+endpoint;
}
return null;
}
private Duration getDelay(Map<String, Serializable> config) {
if (config.get("configuration") instanceof Map configMap) {
long delay = Long.parseLong(configMap.getOrDefault("delay", "0").toString());
if (delay>=0)
return Duration.ofSeconds(delay);
}
return Duration.ofSeconds(0);
}
private Duration getInterval(Map<String, Serializable> config) {
if (config.get("interval") instanceof Map intervalMap) {
long period = Long.parseLong(intervalMap.getOrDefault("period", "60").toString());
ChronoUnit unit = ChronoUnit.valueOf(intervalMap.getOrDefault("unit", "SECONDS").toString().toUpperCase());
if (period>0)
return Duration.of(period, unit);
}
return Duration.ofSeconds(60);
}
private void initRestClientAndParser() {
// Initialize the REST client
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
if (properties.getConnectTimeout()>=0)
factory.setConnectTimeout(properties.getConnectTimeout());
if (properties.getReadTimeout()>=0)
factory.setReadTimeout(properties.getReadTimeout());
this.restClient = RestClient.builder()
.requestFactory(factory)
.build();
// Initialize the OpenMetrics parser
this.openMetricsParser = new OpenMetricsParser();
}
private void scrapeEndpoint(String urlPattern, String prometheusMetric, String destination) {
log.debug("Collectors::Prometheus2: scrapeEndpoint: BEGIN: Scraping Prometheus endpoints for sensor: url-pattern={}, prometheusMetric={}, destination={}",
urlPattern, prometheusMetric, destination);
// Get nodes/pods to scrape
Set<Serializable> nodes = collectorContext.getNodesWithoutClient();
log.trace("Collectors::Prometheus2: scrapeEndpoint: Nodes to scrape: {}", nodes);
if (nodes==null || nodes.isEmpty()) {
log.debug("Collectors::Prometheus2: scrapeEndpoint: END: No nodes to scrape: url-pattern={}, prometheusMetric={}, destination={}",
urlPattern, prometheusMetric, destination);
return;
}
// Scrape nodes and process responses
nodes.forEach(node -> {
String url = urlPattern.formatted(node);
log.trace("Collectors::Prometheus2: scrapeEndpoint: Scraping node: {} -- Endpoint: {}", node, url);
// Scrape endpoint
String payload = restClient
.get().uri(url)
.retrieve()
.body(String.class);
log.debug("Collectors::Prometheus2: scrapeEndpoint: Scrapped node: {} -- Endpoint: {} -- Payload:\n{}", node, url, payload);
// Parser response
List<OpenMetricsParser.MetricInstance> results = null;
if (StringUtils.isNotBlank(payload)) {
results = openMetricsParser.processInput(payload.split("\n"));
log.trace("Collectors::Prometheus2: scrapeEndpoint: Parsed payload: {} -- Metrics:\n{}", node, results);
}
// Get values for the requested metric
if (results!=null) {
List<OpenMetricsParser.MetricInstance> matches = results.stream()
.filter(m -> m.getMetricName().equalsIgnoreCase(prometheusMetric)).toList();
log.trace("Collectors::Prometheus2: scrapeEndpoint: Found metric: {} -- Metric(s):\n{}", node, matches);
List<Double> values = matches.stream().map(OpenMetricsParser.MetricInstance::getMetricValue).toList();
log.trace("Collectors::Prometheus2: scrapeEndpoint: Metric value(s): {} -- Value(s):\n{}", node, values);
// Publish extracted values
queueForPublish(prometheusMetric, destination, values, node, url);
}
log.trace("Collectors::Prometheus2: scrapeEndpoint: Done scraping node: {} -- Endpoint: {}", node, url);
});
log.debug("Collectors::Prometheus2: scrapeEndpoint: END");
}
private void queueForPublish(String prometheusMetric, String destination, List<Double> values, Serializable node, String endpoint) {
log.debug("Collectors::Prometheus2: queueForPublish: metric={}, destination={}, values={}, node={}, endpoint={}",
node, prometheusMetric, destination, values, endpoint);
values.forEach(v -> {
EventMap event = new EventMap(v);
event.setEventProperty("metric", prometheusMetric);
event.setEventProperty("source-node", node);
event.setEventProperty("source-endpoint", endpoint);
event.setEventProperty("destination-topic", destination);
eventsQueue.add(event);
});
}
private void startEventPublishTask() {
Thread thread = new Thread(() -> {
while (true) {
try {
EventMap event = eventsQueue.take();
String destination = event.getEventProperty("destination-topic").toString();
CollectorContext.PUBLISH_RESULT result = collectorContext
.sendEvent(null, destination, event, properties.isCreateTopic());
log.debug("Collectors::Prometheus2: Event Publishing: Published event: {} -- Result: {}", event, result);
} catch (InterruptedException e) {
log.warn("Collectors::Prometheus2: Event Publishing: Interrupted. Exiting event publish loop");
break;
}
}
});
thread.setName("PrometheusCollector2-event-publish-thread");
thread.setDaemon(true);
thread.start();
}
}

View File

@ -9,6 +9,9 @@
package gr.iccs.imu.ems.baguette.server;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import gr.iccs.imu.ems.baguette.server.properties.BaguetteServerProperties;
import gr.iccs.imu.ems.brokercep.BrokerCepService;
import gr.iccs.imu.ems.common.recovery.RecoveryConstant;
@ -28,6 +31,7 @@ import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Instant;
@ -50,6 +54,7 @@ public class BaguetteServer implements InitializingBean, EventBus.EventConsumer<
private final SelfHealingManager<NodeRegistryEntry> selfHealingManager;
private final TaskScheduler taskScheduler;
private final ConfigWriteService configWriteService;
private final ObjectMapper objectMapper;
private Sshd server;
@ -410,6 +415,24 @@ public class BaguetteServer implements InitializingBean, EventBus.EventConsumer<
return entry;
}
public ClientConfiguration getClientConfiguration(ClientShellCommand c) throws JsonProcessingException {
String collectorConfigsStr = ConfigWriteService.getInstance()
.getOrCreateConfigFile(
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE,
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FORMAT)
.get(EmsConstant.COLLECTOR_CONFIGURATIONS_VAR);
log.trace("getClientConfiguration: collectorConfigsStr: {}", collectorConfigsStr);
Map<String, List<Map<String, Serializable>>> collectorConfigs = Map.of();
if (StringUtils.isNotBlank(collectorConfigsStr)) {
collectorConfigs = objectMapper.readValue(collectorConfigsStr, new TypeReference<Map<String, List<Map<String, Serializable>>>>() {});
}
log.debug("getClientConfiguration: collectorConfigsStr: {}", collectorConfigs);
return ClientConfiguration.builder()
.nodesWithoutClient(Set.of())
.collectorConfigurations(collectorConfigs)
.build();
}
public List<String> getNodesWithoutClient() {
return createClientList(new HashSet<>(Collections.singletonList(NodeRegistryEntry.STATE.NOT_INSTALLED)));
}

View File

@ -112,6 +112,8 @@ public class ClientShellCommand implements Command, Runnable, ServerSessionAware
@Getter
private Map<String, Object> clientStatistics;
@Getter
private ClientConfiguration clientConfiguration;
public ClientShellCommand(ServerCoordinator coordinator, boolean allowClientOverrideItsAddress, EventBus<String,Object,Object> eventBus, NodeRegistry registry) {
synchronized (LOCK) {
@ -266,6 +268,10 @@ public class ClientShellCommand implements Command, Runnable, ServerSessionAware
helloReceived = true;
getClientInfoFromGreeting(line.substring("-HELLO FROM CLIENT:".length()));
// Send client configuration
clientConfiguration = nodeRegistryEntry.getBaguetteServer().getClientConfiguration(this);
sendClientConfiguration(clientConfiguration);
// Register CSC to Coordinator
coordinator.register(this);
eventBus.send("BAGUETTE_SERVER_CLIENT_REGISTERED", this);
@ -680,6 +686,10 @@ public class ClientShellCommand implements Command, Runnable, ServerSessionAware
}
}
public void sendClientConfiguration() {
sendClientConfiguration( getClientConfiguration() );
}
public void sendClientConfiguration(ClientConfiguration cc) {
log.debug("sendClientConfiguration: id={}, client-config={}", id, cc);
try {

View File

@ -70,6 +70,12 @@ if [[ -f "${JARS_DIR}/control-service.jar" ]]; then
ESPER_PATH="${JARS_DIR}/esper-7.1.0.jar,"
fi
# Download metric model
if [[ "${METRIC_MODEL_URL}" != "" ]]; then
echo "Downloading metric model from URL: ${METRIC_MODEL_URL}"
curl -o "${BASEDIR}/models/$(basename ${METRIC_MODEL_URL%\?*})" -m 60 ${METRIC_MODEL_URL}
fi
java -version
echo "LANG=$LANG"
#locale

View File

@ -237,10 +237,11 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App
boolean logBrokerMessagesFull = properties.isLogBrokerMessagesFull();
if (!logBrokerMessages) return;
long timestamp = System.currentTimeMillis();
try {
// Check if message passed is null
if (message==null) {
log.warn("\n==========| **NULL** MESSAGE RECEIVED");
log.warn("\n==========| **NULL** MESSAGE RECEIVED (timestamp={})", timestamp);
return;
}
@ -255,16 +256,16 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App
// Log message data
if (logBrokerMessagesFull)
log.info("\n==========| RECEIVED A MESSAGE: metricValue={}, dest={}, id={}\n{}", metricValue, jmsDest, jmsMesgId, message);
log.info("\n==========| RECEIVED A MESSAGE: metricValue={}, dest={}, timestamp={}, id={}\n{}", metricValue, jmsDest, timestamp, jmsMesgId, message);
else
log.info("\n==========| RECEIVED A MESSAGE: metricValue={}, dest={}, id={}", metricValue, jmsDest, jmsMesgId);
log.info("\n==========| RECEIVED A MESSAGE: metricValue={}, dest={}, timestamp={}, id={}", metricValue, jmsDest, timestamp, jmsMesgId);
} catch (Exception e) {
// Log error
if (logBrokerMessagesFull)
log.warn("\n==========| RECEIVED A MESSAGE: FAILED TO PARSE. SEE NEXT FOR STACKTRACE\n{}\n\nSTACKTRACE:\n", message, e);
log.warn("\n==========| RECEIVED A MESSAGE: FAILED TO PARSE. SEE NEXT FOR STACKTRACE (timestamp={})\n{}\n\nSTACKTRACE:\n", timestamp, message, e);
else
log.warn("\n==========| RECEIVED A MESSAGE: FAILED TO PARSE. SEE NEXT FOR STACKTRACE\n\nSTACKTRACE:\n", e);
log.warn("\n==========| RECEIVED A MESSAGE: FAILED TO PARSE. SEE NEXT FOR STACKTRACE (timestamp={})\n\nSTACKTRACE:\n", timestamp, e);
}
}

View File

@ -60,6 +60,7 @@ public abstract class AbstractEndpointCollector<T> implements InitializingBean,
protected final Map<Class<? extends AbstractEndpointCollector<T>>, Map<String, String>> nodeToNodeEventsMap = new HashMap<>();
protected boolean started;
protected boolean autoStartRunner = true;
protected ScheduledFuture<?> runner;
protected Set<String> allowedTopics;
protected Map<String, Set<String>> topicMap;
@ -119,7 +120,8 @@ public abstract class AbstractEndpointCollector<T> implements InitializingBean,
// Schedule collection execution
errorsMap.clear();
ignoredNodes.clear();
runner = taskScheduler.scheduleWithFixedDelay(this, Duration.ofMillis(properties.getDelay()));
if (autoStartRunner)
runner = taskScheduler.scheduleWithFixedDelay(this, Duration.ofMillis(properties.getDelay()));
started = true;
log.info("Collectors::{}: Started", collectorId);
@ -138,8 +140,10 @@ public abstract class AbstractEndpointCollector<T> implements InitializingBean,
// Cancel collection execution
started = false;
runner.cancel(true);
runner = null;
if (runner!=null && ! runner.isDone()) {
runner.cancel(true);
runner = null;
}
ignoredNodes.values().stream().filter(Objects::nonNull).forEach(task -> task.cancel(true));
log.info("Collectors::{}: Stopped", collectorId);
}
@ -191,7 +195,7 @@ public abstract class AbstractEndpointCollector<T> implements InitializingBean,
log.trace("Collectors::{}: Nodes without clients in Zone: {}", collectorId, collectorContext.getNodesWithoutClient());
log.trace("Collectors::{}: Is Aggregator: {}", collectorId, collectorContext.isAggregator());
if (collectorContext.isAggregator()) {
if (! collectorContext.getNodesWithoutClient().isEmpty()) {
if (collectorContext.getNodesWithoutClient()!=null && ! collectorContext.getNodesWithoutClient().isEmpty()) {
log.debug("Collectors::{}: Collecting metrics from remote nodes (without EMS client): {}", collectorId,
collectorContext.getNodesWithoutClient());
for (Object nodeAddress : collectorContext.getNodesWithoutClient()) {
@ -330,6 +334,9 @@ public abstract class AbstractEndpointCollector<T> implements InitializingBean,
long callEndTm = System.currentTimeMillis();
log.trace("Collectors::{}: ...response: {}", collectorId, response);
if (response==null) {
log.warn("Collectors::{}: Collecting data...No response: {}", collectorId, null);
} else
if (response.getStatusCode()==HttpStatus.OK) {
T data = response.getBody();
ProcessingStats stats = new ProcessingStats();

View File

@ -86,7 +86,7 @@ public class PrometheusCollector extends AbstractEndpointCollector<String> {
// Add tags into event properties and/or payload
Map<String, String> tags = instance.getTags();
if (tags != null) {
if (allowedTags != null && allowedTags.size() > 0) {
if (allowedTags != null && ! allowedTags.isEmpty()) {
tags.keySet().retainAll(allowedTags);
}
@ -107,7 +107,7 @@ public class PrometheusCollector extends AbstractEndpointCollector<String> {
log.debug("Collectors::{}: Publishing event to destination: {}", collectorId, destination);
updateStats(publishMetricEvent(destination, event, nodeAddress), stats);
} else
if (allowTagsInDestinationName && tags!=null && tags.size()>0) {
if (allowTagsInDestinationName && tags!=null && ! tags.isEmpty()) {
tags.forEach((name,value) -> {
String d = destination.replace("${"+name+"}", value);
log.debug("Collectors::{}: Publishing event to tagged destination: {}", collectorId, d);

View File

@ -32,6 +32,9 @@ public class PrometheusCollectorProperties extends AbstractEndpointCollectorProp
private boolean addTagsInEventPayload;
private boolean throwExceptionWhenExcessiveCharsOccur;
private int connectTimeout = 1000;
private int readTimeout = 1000;
public PrometheusCollectorProperties() {
setUrl("http://127.0.0.1:9090/metrics");
setUrlOfNodesWithoutClient("http://%s:9090/metrics");

View File

@ -10,10 +10,7 @@
package gr.iccs.imu.ems.common.k8s;
import gr.iccs.imu.ems.util.PasswordUtil;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.api.model.apps.DaemonSet;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
@ -25,7 +22,6 @@ import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StreamUtils;
import java.io.ByteArrayInputStream;
@ -37,8 +33,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* EMS Kubernetes (K8S) client
@ -179,6 +174,62 @@ public class K8sClient implements Closeable {
);
}
public List<Node> getNodesInfo() {
log.debug("K8sClient.getNodesInfo: BEGIN");
List<Node> nodesList = null;
try {
nodesList = client.nodes()
.resources()
.map(Resource::item)
.toList();
} catch (Exception e) {
log.warn("K8sClient.getNodesInfo: EXCEPTION while retrieving nodes: ", e);
}
log.debug("K8sClient.getNodesInfo: END: {}", nodesList);
return nodesList;
}
public List<Pod> getRunningPodsInfo(String namespace) throws IOException {
log.debug("K8sClient.getRunningPodsInfo: BEGIN");
if (StringUtils.isBlank(namespace))
namespace = getNamespace();
List<Pod> podsList = null;
try {
podsList = client.pods()
.inNamespace(namespace)
.resources()
.map(Resource::item)
.filter(pod -> "Running".equalsIgnoreCase(pod.getStatus().getPhase()))
.toList();
} catch (Exception e) {
log.warn("K8sClient.getRunningPodsInfo: EXCEPTION while retrieving pods: ", e);
}
log.debug("K8sClient.getRunningPodsInfo: END: {}", podsList);
return podsList;
}
public List<Pod> getRunningPodsInfo() {
log.debug("K8sClient.getRunningPodsInfo: BEGIN");
List<Pod> podsList = null;
try {
podsList = client.pods()
.inAnyNamespace()
.resources()
.map(Resource::item)
.filter(pod -> "Running".equalsIgnoreCase(pod.getStatus().getPhase()))
.toList();
} catch (Exception e) {
log.warn("K8sClient.getRunningPodsInfo: EXCEPTION while retrieving pods: ", e);
}
log.debug("K8sClient.getRunningPodsInfo: END: {}", podsList);
return podsList;
}
private String getNamespace() throws IOException {
String serviceAccountPath = getConfig("K8S_SERVICE_ACCOUNT_SECRETS_PATH", K8S_SERVICE_ACCOUNT_SECRETS_PATH_DEFAULT);
return Files.readString(Paths.get(serviceAccountPath, "namespace"));
}
private <T>List<T> emptyIfNull(List<T> list) {
return list!=null ? list : List.of();
}
@ -187,4 +238,34 @@ public class K8sClient implements Closeable {
public void close() throws IOException {
client.close();
}
// ------------------------------------------------------------------------
@Data
public static class NodeEntry {
private final String nodeUid;
private final String nodeName;
public NodeEntry(Node node) {
nodeUid = node.getMetadata().getUid();
nodeName = node.getMetadata().getName();
}
}
@Data
public static class PodEntry {
private final String podUid;
private final String podIP;
private final String podName;
private final String hostIP;
private final Map<String, String> labels;
public PodEntry(Pod pod) {
podUid = pod.getMetadata().getUid();
podIP = pod.getStatus().getPodIP();
podName = pod.getMetadata().getName();
hostIP = pod.getStatus().getHostIP();
labels = Collections.unmodifiableMap(pod.getMetadata().getLabels());
}
}
}

View File

@ -83,8 +83,6 @@ server-password: ${BAGUETTE_SERVER_PASSWORD}
#collector-classes: gr.iccs.imu.ems.baguette.client.collector.netdata.NetdataCollector
collector-configurations: ${COLLECTOR_CONFIGURATIONS}
collector:
netdata:
enable: true
@ -114,6 +112,8 @@ collector:
#addTagsInEventPayload: true
#throwExceptionWhenExcessiveCharsOccur: true
#collector-configurations: ${COLLECTOR_CONFIGURATIONS}
# -----------------------------------------------------------------------------
# Cluster settings
# -----------------------------------------------------------------------------
@ -164,6 +164,7 @@ cluster:
BROKER_URL_PROPERTIES: transport.daemon=true&transport.trace=false&transport.useKeepAlive=true&transport.useInactivityMonitor=false&transport.needClientAuth=${CLIENT_AUTH_REQUIRED}&transport.verifyHostName=true&transport.connectionTimeout=0&transport.keepAlive=true
CLIENT_AUTH_REQUIRED: false
CLIENT_URL_PROPERTIES: daemon=true&trace=false&useInactivityMonitor=false&connectionTimeout=0&keepAlive=true
#BROKER_URL_ADDRESS_INSECURE: 0.0.0.0
brokercep:
# Broker ports and protocol

View File

@ -549,8 +549,9 @@ baguette.client.install.sessionRecordingDir = ${LOGS_DIR:${EMS_CONFIG_DIR}/../lo
#baguette.client.install.clientInstallErrorIfVarIsMissing=true
baguette.client.install.installationContextProcessorPlugins=\
plugin.install.gr.iccs.imu.ems.baguette.client.AllowedTopicsProcessorPlugin, \
plugin.install.gr.iccs.imu.ems.baguette.client.PrometheusProcessorPlugin
gr.iccs.imu.ems.baguette.client.install.plugin.AllowedTopicsProcessorPlugin, \
gr.iccs.imu.ems.baguette.client.install.plugin.CollectorConfigurationsProcessorPlugin, \
gr.iccs.imu.ems.baguette.client.install.plugin.PrometheusProcessorPlugin
baguette.client.install.clientInstallationReportNodeInfoPatterns[0] = ^CPU_.+($|[\s:=])
baguette.client.install.clientInstallationReportNodeInfoPatterns[1] = ^RAM_($|[\s:=])

View File

@ -539,6 +539,12 @@ baguette.client.install:
### -----------------------------------------
### Instruction Set file processing settings
# installerType: JS_INSTALLER
# instructions:
# LINUX:
# # JS installation scripts
# - file:${EMS_CONFIG_DIR}/baguette-client-install/sample.js
instructions:
LINUX: &install_on_linux
# Instructions set files - JSON version
@ -601,6 +607,7 @@ baguette.client.install:
installationContextProcessorPlugins:
- gr.iccs.imu.ems.baguette.client.install.plugin.AllowedTopicsProcessorPlugin
- gr.iccs.imu.ems.baguette.client.install.plugin.CollectorConfigurationsProcessorPlugin
- gr.iccs.imu.ems.baguette.client.install.plugin.PrometheusProcessorPlugin
clientInstallationReportNodeInfoPatterns:

View File

@ -98,6 +98,14 @@ public class DAG {
}
}
public DAGNode getNodeByElement(NamedElement elem) {
return _namedElementToNodesMapping.get(elem);
}
public DAGNode getNodeByElementName(String elemName) {
return _nameToNodesMapping.get(elemName);
}
// ====================================================================================================================================================
// Add node methods

View File

@ -12,6 +12,8 @@ package gr.iccs.imu.ems.util;
import lombok.*;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -23,4 +25,5 @@ import java.util.Set;
@AllArgsConstructor
public class ClientConfiguration implements Serializable {
@NonNull private Set<Serializable> nodesWithoutClient;
private Map<String, List<Map<String,Serializable>>> collectorConfigurations; // Collector Type - List of sensors' configuration Maps
}

View File

@ -70,7 +70,7 @@ public class ConfigWriteService {
public boolean removeConfigFile(@NonNull String fileName, boolean alsoRemoveFile) {
Configuration c = configurations.remove(fileName);
if (c!=null) {
if (alsoRemoveFile && c!=null) {
if (! c.getConfigPath().toFile().delete()) {
log.warn("removeConfigFile: Failed to remove config. file from the disk: {}", c.getConfigPath());
}
@ -88,6 +88,14 @@ public class ConfigWriteService {
private final Format format;
private final Map<String,String> contentMap = new LinkedHashMap<>();
public String get(@NonNull String key) {
return contentMap.get(key);
}
public String getOrDefault(@NonNull String key, String defaultValue) {
return contentMap.getOrDefault(key, defaultValue);
}
public Configuration put(@NonNull String key, String value) throws IOException {
contentMap.put(key, value);
write();

View File

@ -0,0 +1,41 @@
/*
* Copyright (C) 2023-2025 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.nebulous.ems;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Data
@Configuration
@ConfigurationProperties
public class EmsNebulousProperties implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
log.info("{}", this);
}
private String applicationId;
private String appId;
private String appUid;
private String appUuid;
private String emsServerPodUid;
private String emsServerPodNamespace;
private String appPodLabel = "app";
public String getApplicationId() {
return StringUtils.firstNonBlank(applicationId, appId, appUid, appUuid);
}
}

View File

@ -127,12 +127,12 @@ public class IndexService {
public String getAppMetricModel(@NonNull String appId) throws IOException {
String fileName = getAppData(appId).get(ModelsService.MODEL_FILE_KEY);
return applicationContext.getBean(ModelsService.class).readModel(fileName);
return applicationContext.getBean(ModelsService.class).readFromFile(fileName);
}
public Map<String,String> getAppBindings(@NonNull String appId) throws IOException {
String fileName = getAppData(appId).get(ModelsService.BINDINGS_FILE_KEY);
String bindingsStr = applicationContext.getBean(ModelsService.class).readModel(fileName);
String bindingsStr = applicationContext.getBean(ModelsService.class).readFromFile(fileName);
return objectMapper.readValue(bindingsStr, Map.class);
}

View File

@ -85,7 +85,7 @@ public class ModelsService implements InitializingBean {
// Store bindings in models store
String bindingsFile = getFileName("bindings", appId, "json");
if (bindingsMap==null) bindingsMap = Map.of();
storeModel(bindingsFile, objectMapper.writeValueAsString(bindingsMap));
storeToFile(bindingsFile, objectMapper.writeValueAsString(bindingsMap));
log.info("Stored bindings in file: app-id={}, file={}", appId, bindingsFile);
// Add appId-modelFile entry in the stored Index
@ -135,7 +135,7 @@ public class ModelsService implements InitializingBean {
if (StringUtils.isNotBlank(modelStr)) {
// Store metric model in a new file
modelFile = StringUtils.isBlank(modelFile) ? getFileName("model", appId, "yml") : modelFile;
storeModel(modelFile, modelStr);
storeToFile(modelFile, modelStr);
log.info("Stored metric model in file: app-id={}, file={}", appId, modelFile);
// Add appId-modelFile entry in the stored Index
@ -158,16 +158,16 @@ public class ModelsService implements InitializingBean {
: String.format("%s--%d.%s", type, System.currentTimeMillis(), suffix);
}
String readModel(String fileName) throws IOException {
String readFromFile(String fileName) throws IOException {
Path path = Paths.get(properties.getModelsDir(), fileName);
String modelStr = Files.readString(path);
log.info("Loaded metric model from file: {}", path);
log.debug("Read from file: {}", path);
return modelStr;
}
private void storeModel(String fileName, String modelStr) throws IOException {
private void storeToFile(String fileName, String modelStr) throws IOException {
Path path = Paths.get(properties.getModelsDir(), fileName);
Files.writeString(path, modelStr);
log.info("Stored metric model in file: {}", path);
log.debug("Wrote to file: {}", path);
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright (C) 2023-2025 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.nebulous.ems.k8s;
import eu.nebulous.ems.EmsNebulousProperties;
import gr.iccs.imu.ems.control.controller.NodeRegistrationCoordinator;
import gr.iccs.imu.ems.control.plugin.PostTranslationPlugin;
import gr.iccs.imu.ems.control.util.TopicBeacon;
import gr.iccs.imu.ems.translate.TranslationContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Service
@RequiredArgsConstructor
public class K8sEmsClientDeploymentPostTranslationPlugin implements PostTranslationPlugin, InitializingBean {
private final K8sServiceProperties properties;
private final ApplicationContext applicationContext;
private final EmsNebulousProperties emsNebulousProperties;
private String applicationId;
@Override
public void afterPropertiesSet() throws Exception {
applicationId = emsNebulousProperties.getApplicationId();
log.info("K8sEmsClientDeploymentPostTranslationPlugin: Application Id (from Env.): {}", applicationId);
}
@Override
public void processTranslationResults(TranslationContext translationContext, TopicBeacon topicBeacon) {
String oldAppId = this.applicationId;
this.applicationId = translationContext.getAppId();
log.info("K8sEmsClientDeploymentPostTranslationPlugin: Set applicationId to: {} -- was: {}", applicationId, oldAppId);
// Call control-service to deploy EMS clients
if (properties.isDeployEmsClientsOnKubernetesEnabled()) {
try {
log.info("K8sEmsClientDeploymentPostTranslationPlugin: Start deploying EMS clients...");
String id = "dummy-" + System.currentTimeMillis();
Map<String, Object> nodeInfo = new HashMap<>(Map.of(
"id", id,
"name", id,
"type", "K8S",
"provider", "Kubernetes",
"zone-id", ""
));
applicationContext.getBean(NodeRegistrationCoordinator.class)
.registerNode("", nodeInfo, translationContext);
log.debug("K8sEmsClientDeploymentPostTranslationPlugin: EMS clients deployment started");
} catch (Exception e) {
log.warn("K8sEmsClientDeploymentPostTranslationPlugin: EXCEPTION while starting EMS client deployment: ", e);
}
} else
log.info("K8sEmsClientDeploymentPostTranslationPlugin: EMS clients deployment is disabled");
}
}

View File

@ -0,0 +1,137 @@
/*
* Copyright (C) 2017-2025 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.nebulous.ems.k8s;
import eu.nebulous.ems.EmsNebulousProperties;
import gr.iccs.imu.ems.baguette.server.ClientShellCommand;
import gr.iccs.imu.ems.common.k8s.K8sClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
/**
* Kubernetes cluster pods watcher service
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class K8sPodWatcher implements InitializingBean {
private final K8sServiceProperties properties;
private final TaskScheduler taskScheduler;
private final EmsNebulousProperties emsNebulousProperties;
private String EMS_SERVER_POD_UID;
private String APP_POD_LABEL;
@Override
public void afterPropertiesSet() throws Exception {
boolean envVarEnable = Boolean.parseBoolean(K8sClient.getConfig("K8S_WATCHER_ENABLED", "false"));
if (properties.isEnabled() || envVarEnable) {
Instant initDelay = Instant.now().plus(properties.getInitDelay());
Duration period = properties.getPeriod();
taskScheduler.scheduleAtFixedRate(this::doWatch, initDelay, period);
log.info("K8sPodWatcher: Enabled (running every {}sec, init-delay={})", period, properties.getInitDelay());
} else {
log.info("K8sPodWatcher: Disabled (to enable set 'k8s-watcher.enable' property or K8S_WATCHER_ENABLED env. var. to true)");
}
EMS_SERVER_POD_UID = StringUtils.defaultIfBlank(
emsNebulousProperties.getEmsServerPodUid(), "");
APP_POD_LABEL = emsNebulousProperties.getAppPodLabel();
}
private void doWatch() {
try {
// Get running pods and apply exclusions
log.debug("K8sPodWatcher: BEGIN: Retrieving active Kubernetes cluster pods");
HashMap<String, K8sClient.PodEntry> uuidToPodsMap = new HashMap<>();
HashMap<String, Set<K8sClient.PodEntry>> podsPerHost = new HashMap<>();
try (K8sClient client = K8sClient.create()) {
client.getRunningPodsInfo().forEach(pod -> {
String ns = pod.getMetadata().getNamespace();
String appLabelValue = pod.getMetadata().getLabels().get(APP_POD_LABEL);
log.trace("K8sPodWatcher: Got pod: uid={}, name={}, address={}, namespace={}, app-label={}",
pod.getMetadata().getUid(), pod.getMetadata().getName(), pod.getStatus().getPodIP(),
ns, appLabelValue);
if (properties.getIgnorePodsInNamespaces().contains(ns))
return;
if (StringUtils.isNotBlank(appLabelValue) && properties.getIgnorePodsWithAppLabel().contains(appLabelValue))
return;
K8sClient.PodEntry entry = new K8sClient.PodEntry(pod);
uuidToPodsMap.put(pod.getMetadata().getUid(), entry);
String podHostIp = pod.getStatus().getHostIP();
podsPerHost.computeIfAbsent(podHostIp, s -> new HashSet<>()).add(entry);
log.trace("K8sPodWatcher: Keeping pod: uid={}, name={}, address={}",
pod.getMetadata().getUid(), pod.getMetadata().getName(), pod.getStatus().getPodIP());
});
} // End of try-with-resources
log.debug("K8sPodWatcher: Active Kubernetes cluster pods: uuidToPodsMap: {}", uuidToPodsMap);
log.debug("K8sPodWatcher: Active Kubernetes cluster pods: podsPerHost: {}", podsPerHost);
// Group running pods per host IP
log.debug("K8sPodWatcher: Processing active pods per active EMS client: ems-clients: {}", ClientShellCommand.getActive());
Map<ClientShellCommand,List<K8sClient.PodEntry>> emsClientPodLists = new HashMap<>();
ClientShellCommand.getActive().forEach(csc -> {
//String id = csc.getId();
String emsClientPodUuid = csc.getClientId();
String address = csc.getClientIpAddress();
log.trace("K8sPodWatcher: EMS client: pod-uid={}, address={}", emsClientPodUuid, address);
K8sClient.PodEntry emsClientPod = uuidToPodsMap.get(emsClientPodUuid);
log.trace("K8sPodWatcher: EMS client: pod-entry: {}", emsClientPod);
String emsClientPodHostIp = emsClientPod.hostIP();
Set<K8sClient.PodEntry> podsInHost = podsPerHost.get(emsClientPodHostIp);
log.trace("K8sPodWatcher: EMS client: pod-host-address={}, pods-in-host: {}", emsClientPodHostIp, podsInHost);
List<K8sClient.PodEntry> podsInHostWithoutEmsClient = podsInHost.stream()
.filter(pod -> ! pod.podUid().equalsIgnoreCase(EMS_SERVER_POD_UID))
.filter(pod -> ! pod.podUid().equalsIgnoreCase(emsClientPodUuid))
.toList();
log.trace("K8sPodWatcher: EMS client: pod-host-address={}, Filtered-pods-in-host: {}", emsClientPodHostIp, podsInHostWithoutEmsClient);
LinkedList<K8sClient.PodEntry> list = new LinkedList<>();
list.add(emsClientPod);
list.addAll(podsInHostWithoutEmsClient);
emsClientPodLists.put(csc, list);
});
log.debug("K8sPodWatcher: Active Kubernetes cluster pods per EMS client: {}", emsClientPodLists);
// Update EMS client configurations
log.debug("K8sPodWatcher: Updating EMS client configurations with active pods: {}", emsClientPodLists);
emsClientPodLists.forEach((csc, podList) -> {
K8sClient.PodEntry emsClientPod = podList.get(0);
List<K8sClient.PodEntry> otherPods = podList.subList(1, podList.size());
String clientId = csc.getClientId();
String hostIp = emsClientPod.hostIP();
Set<Serializable> oldPodSet = csc.getClientConfiguration().getNodesWithoutClient();
Set<Serializable> newPodSet = otherPods.stream().map(K8sClient.PodEntry::podIP).collect(Collectors.toSet());
log.trace("K8sPodWatcher: EMS client: {} @{} -- Old pod set: {} -- New pod set: {}", clientId, hostIp, oldPodSet, newPodSet);
csc.getClientConfiguration().setNodesWithoutClient(newPodSet);
log.trace("K8sPodWatcher: EMS client: {} @{} -- Sending configuration to EMS client", clientId, hostIp);
csc.sendClientConfiguration();
});
log.debug("K8sPodWatcher: Updated EMS client configurations with active pods");
log.debug("K8sPodWatcher: END");
} catch (Exception e) {
log.warn("K8sPodWatcher: ERROR while running doWatch: ", e);
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright (C) 2023-2025 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.nebulous.ems.k8s;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;
import java.time.Duration;
import java.util.List;
import static gr.iccs.imu.ems.util.EmsConstant.EMS_PROPERTIES_PREFIX;
@Slf4j
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = EMS_PROPERTIES_PREFIX + "k8s")
public class K8sServiceProperties implements InitializingBean {
private boolean enabled = true;
private Duration initDelay = Duration.ofSeconds(30);
private Duration period = Duration.ofSeconds(60);
private boolean deployEmsClientsOnKubernetesEnabled = true;
// Pod filters
private List<String> ignorePodsInNamespaces = List.of(
"kube-node-lease", "kube-public", "kube-system", "local-path-storage");
private List<String> ignorePodsWithAppLabel = List.of(
"netdata");
@Override
public void afterPropertiesSet() {
log.debug("K8sServiceProperties: {}", this);
}
}

View File

@ -9,28 +9,17 @@
package eu.nebulous.ems.plugins;
import gr.iccs.imu.ems.baguette.client.install.ClientInstallationTask;
import gr.iccs.imu.ems.baguette.client.install.plugin.AllowedTopicsProcessorPlugin;
import gr.iccs.imu.ems.baguette.server.NodeRegistryEntry;
import gr.iccs.imu.ems.control.controller.ControlServiceRequestInfo;
import gr.iccs.imu.ems.control.plugin.AppModelPlugin;
import gr.iccs.imu.ems.translate.TranslationContext;
import gr.iccs.imu.ems.util.ConfigWriteService;
import gr.iccs.imu.ems.util.EmsConstant;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Slf4j
@Service
@RequiredArgsConstructor
public class NebulousAppModelPlugin implements AppModelPlugin {
private final AllowedTopicsProcessorPlugin allowedTopicsProcessorPlugin;
private final ConfigWriteService configWriteService;
@Override
public void preProcessingNewAppModel(String appModelId, ControlServiceRequestInfo requestInfo) {
log.debug("NebulousAppModelPlugin: Nothing to do. Args: appModelId={}, requestInfo={}", appModelId, requestInfo);
@ -38,34 +27,6 @@ public class NebulousAppModelPlugin implements AppModelPlugin {
@Override
public void postProcessingNewAppModel(String appModelId, ControlServiceRequestInfo requestInfo, TranslationContext translationContext) {
log.debug("NebulousAppModelPlugin: BEGIN: appModelId={}, requestInfo={}", appModelId, requestInfo);
// Get collector allowed topics
NodeRegistryEntry entry = new NodeRegistryEntry(null, null, null);
ClientInstallationTask task = ClientInstallationTask.builder()
.nodeRegistryEntry(entry)
.translationContext(translationContext)
.build();
allowedTopicsProcessorPlugin.processBeforeInstallation(task, -1);
String allowedTopics = task.getNodeRegistryEntry().getPreregistration().get(EmsConstant.COLLECTOR_ALLOWED_TOPICS_VAR);
log.debug("NebulousAppModelPlugin: collector-allowed-topics: {}", allowedTopics);
if (StringUtils.isBlank(allowedTopics)) {
log.debug("NebulousAppModelPlugin: END: No value for 'collector-allowed-topics' setting: appModelId={}, requestInfo={}", appModelId, requestInfo);
return;
}
// Append collector-allowed-topics in ems-client-configmap file
try {
configWriteService
.getOrCreateConfigFile(
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE,
EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FORMAT)
.put(EmsConstant.COLLECTOR_ALLOWED_TOPICS_VAR, allowedTopics);
log.debug("NebulousAppModelPlugin: END: Updated ems-client-configmap file: {}", EmsConstant.EMS_CLIENT_K8S_CONFIG_MAP_FILE);
} catch (IOException e) {
log.error("NebulousAppModelPlugin: EXCEPTION while updating ems-client-configmap file, during post-processing of new App Model: appModelId={}, requestInfo={}\nException: ",
appModelId, requestInfo, e);
}
log.debug("NebulousAppModelPlugin: Nothing to do. Args: appModelId={}, requestInfo={}", appModelId, requestInfo);
}
}

View File

@ -15,13 +15,12 @@ import gr.iccs.imu.ems.control.util.TopicBeacon;
import gr.iccs.imu.ems.translate.TranslationContext;
import gr.iccs.imu.ems.translate.dag.DAGNode;
import gr.iccs.imu.ems.translate.model.*;
import jakarta.annotation.PostConstruct;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;
@ -142,10 +141,14 @@ public class PredictionsPostTranslationPlugin implements PostTranslationPlugin {
{
String elementName = constraintNode.getName();
String elementClassName = constraintNode.getClass().getName();
if (constraintNode instanceof MetricConstraint) {
MetricConstraint mc = mcMap.get(elementName);
if (constraintNode instanceof MetricConstraint mc) {
//MetricConstraint mc = mcMap.get(elementName);
String metricName = mc.getMetricContext().getName();
String metricTopic = NebulousEmsTranslator.nameNormalization.apply(metricName);
return Map.of(
"name", NebulousEmsTranslator.nameNormalization.apply(mc.getName()),
"metric", metricTopic,
"operator", mc.getComparisonOperator().getOperator(),
"threshold", mc.getThreshold());
} else

View File

@ -8,6 +8,7 @@
package eu.nebulous.ems.service;
import eu.nebulous.ems.EmsNebulousProperties;
import eu.nebulous.ems.translate.NebulousEmsTranslatorProperties;
import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
@ -29,7 +30,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
@ -42,13 +42,14 @@ public class EmsBootInitializer extends AbstractExternalBrokerService implements
private final ApplicationContext applicationContext;
private final EmsBootInitializerProperties bootInitializerProperties;
private final NebulousEmsTranslatorProperties translatorProperties;
private final String appId = System.getenv("APPLICATION_ID");
private final String applicationId;
private final AtomicBoolean processingResponse = new AtomicBoolean(false);
private ScheduledFuture<?> bootFuture;
private Consumer consumer;
private Publisher publisher;
public EmsBootInitializer(ApplicationContext applicationContext,
EmsNebulousProperties emsNebulousProperties,
EmsBootInitializerProperties bootInitializerProperties,
NebulousEmsTranslatorProperties translatorProperties,
ExternalBrokerServiceProperties properties,
@ -58,11 +59,12 @@ public class EmsBootInitializer extends AbstractExternalBrokerService implements
this.applicationContext = applicationContext;
this.bootInitializerProperties = bootInitializerProperties;
this.translatorProperties = translatorProperties;
this.applicationId = emsNebulousProperties.getApplicationId();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
if (StringUtils.isBlank(appId)) {
if (StringUtils.isBlank(applicationId)) {
log.warn("===================> EMS is ready -- Application Id is blank. EMS Boot disabled");
return;
}
@ -74,7 +76,7 @@ public class EmsBootInitializer extends AbstractExternalBrokerService implements
log.warn("===================> EMS is ready -- EMS Boot disabled because External broker service is disabled");
return;
}
log.info("===================> EMS is ready -- Scheduling EMS Boot message -- App Id: {}", appId);
log.info("===================> EMS is ready -- Scheduling EMS Boot message -- App Id: {}", applicationId);
// Start connector used for EMS Booting
startConnector();
@ -99,14 +101,18 @@ public class EmsBootInitializer extends AbstractExternalBrokerService implements
}
protected void sendEmsBootReadyEvent() {
if (publisher==null) {
log.warn("ExternalBrokerPublisherService: EMS Boot message not sent because External broker publisher is null");
return;
}
Map<String, String> message = Map.of(
"application", appId,
"application", applicationId,
// "internal-address", NetUtil.getDefaultIpAddress(),
// "public-address", NetUtil.getPublicIpAddress(),
"address", NetUtil.getIpAddress()
);
log.debug("ExternalBrokerPublisherService: Sending message to EMS Boot: {}", message);
publisher.send(message, null,true);
publisher.send(message, null, true);
log.debug("ExternalBrokerPublisherService: Sent message to EMS Boot");
}

View File

@ -8,52 +8,44 @@
package eu.nebulous.ems.service;
import eu.nebulous.ems.EmsNebulousProperties;
import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.exn.core.Publisher;
import gr.iccs.imu.ems.control.controller.NodeRegistrationCoordinator;
import gr.iccs.imu.ems.control.plugin.PostTranslationPlugin;
import gr.iccs.imu.ems.control.util.TopicBeacon;
import gr.iccs.imu.ems.translate.TranslationContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
@Slf4j
@Service
public class ExternalBrokerListenerService extends AbstractExternalBrokerService
implements PostTranslationPlugin, InitializingBean
{
private final ApplicationContext applicationContext;
public class ExternalBrokerListenerService extends AbstractExternalBrokerService implements InitializingBean {
private final ArrayBlockingQueue<Command> commandQueue = new ArrayBlockingQueue<>(100);
private final EmsNebulousProperties emsNebulousProperties;
private final MvvService mvvService;
private List<Consumer> consumers;
private Publisher commandsResponsePublisher;
private String applicationId = System.getenv("APPLICATION_ID");
private String applicationId;
record Command(String key, String address, Map body, Message message, Context context) {
}
public ExternalBrokerListenerService(ApplicationContext applicationContext,
ExternalBrokerServiceProperties properties,
public ExternalBrokerListenerService(ExternalBrokerServiceProperties properties,
EmsNebulousProperties emsNebulousProperties,
TaskScheduler taskScheduler,
MvvService mvvService)
{
super(properties, taskScheduler);
this.applicationContext = applicationContext;
this.emsNebulousProperties = emsNebulousProperties;
this.mvvService = mvvService;
}
@ -64,10 +56,11 @@ public class ExternalBrokerListenerService extends AbstractExternalBrokerService
return;
}
log.info("ExternalBrokerListenerService: Application Id (from Env.): {}", applicationId);
applicationId = emsNebulousProperties.getApplicationId();
log.info("ExternalBrokerListenerService: Application Id: {}", applicationId);
if (StringUtils.isBlank(applicationId))
log.warn("ExternalBrokerListenerService: APPLICATION_ID env. var. is missing");
//throw new IllegalArgumentException("APPLICATION_ID not provided as an env. var");
//throw new IllegalArgumentException("APPLICATION_UID not provided as an env. var");
if (checkProperties()) {
initializeConsumers();
@ -80,44 +73,7 @@ public class ExternalBrokerListenerService extends AbstractExternalBrokerService
}
}
@Override
public void processTranslationResults(TranslationContext translationContext, TopicBeacon topicBeacon) {
if (!properties.isEnabled()) {
log.info("ExternalBrokerListenerService: Disabled due to configuration");
return;
}
this.applicationId = translationContext.getAppId();
log.info("ExternalBrokerListenerService: Set applicationId to: {}", applicationId);
// Call control-service to deploy EMS clients
if (properties.isDeployEmsClientsOnKubernetesEnabled()) {
try {
log.info("ExternalBrokerListenerService: Start deploying EMS clients...");
String id = "dummy-" + System.currentTimeMillis();
Map<String, Object> nodeInfo = new HashMap<>(Map.of(
"id", id,
"name", id,
"type", "K8S",
"provider", "Kubernetes",
"zone-id", ""
));
applicationContext.getBean(NodeRegistrationCoordinator.class)
.registerNode("", nodeInfo, translationContext);
log.debug("ExternalBrokerListenerService: EMS clients deployment started");
} catch (Exception e) {
log.warn("ExternalBrokerListenerService: EXCEPTION while starting EMS client deployment: ", e);
}
} else
log.info("ExternalBrokerListenerService: EMS clients deployment is disabled");
}
private void initializeConsumers() {
/*if (StringUtils.isBlank(applicationId)) {
log.warn("ExternalBrokerListenerService: Call to initializeConsumers with blank applicationId. Will not change anything.");
return;
}*/
// Create message handler
Handler messageHandler = new Handler() {
@Override
@ -164,17 +120,17 @@ public class ExternalBrokerListenerService extends AbstractExternalBrokerService
}, Instant.now());
}
private void processMessage(Command command) throws ClientException, IOException {
private void processMessage(Command command) throws ClientException {
log.debug("ExternalBrokerListenerService: Command: {}", command);
log.debug("ExternalBrokerListenerService: Command: message: {}", command.message);
log.debug("ExternalBrokerListenerService: Command: body: {}", command.message.body());
command.message.forEachProperty((s, o) ->
log.debug("ExternalBrokerListenerService: Command: --- property: {} = {}", s, o));
if (properties.getCommandsTopic().equals(command.address)) {
/*if (properties.getCommandsTopic().equals(command.address)) {
// Process command
log.info("ExternalBrokerListenerService: Received a command from external broker: {}", command.body);
processCommandMessage(command);
} else
} else*/
if (properties.getSolutionsTopic().equals(command.address)) {
// Process new solution message
log.info("ExternalBrokerListenerService: Received a new Solution message from external broker: {}", command.body);
@ -189,7 +145,7 @@ public class ExternalBrokerListenerService extends AbstractExternalBrokerService
}
}
private void processCommandMessage(Command command) throws ClientException {
/*private void processCommandMessage(Command command) throws ClientException {
// Get application id
String appId = getAppId(command, commandsResponsePublisher);
if (appId == null) return;
@ -230,5 +186,5 @@ public class ExternalBrokerListenerService extends AbstractExternalBrokerService
publisher.send(Map.of(
"response", response
), appId);
}
}*/
}

View File

@ -55,8 +55,6 @@ public class ExternalBrokerServiceProperties implements InitializingBean {
private String emsBootTopic = NEBULOUS_TOPIC_PREFIX + "ems.boot";
private String emsBootResponseTopic = NEBULOUS_TOPIC_PREFIX + "ems.boot.reply";
private boolean deployEmsClientsOnKubernetesEnabled = true;
@Override
public void afterPropertiesSet() {
log.debug("ExternalBrokerServiceProperties: {}", this);

View File

@ -109,6 +109,7 @@ class ConstraintsHelper extends AbstractHelper {
MetricConstraint metricConstraint = MetricConstraint.builder()
.name(constraintNamesKey.name())
.object(constraintSpec)
//.metricContext(....) // See next
.comparisonOperator(ComparisonOperatorType.byOperator(comparisonOperator))
.threshold(threshold)
.build();

View File

@ -143,7 +143,7 @@ class SensorsHelper extends AbstractHelper implements InitializingBean {
false,
String.format(" createPullSensor(): Invalid interval period in configuration: sensor=%s, configuration=%s\n",
sensorName, cfgMapWithStr));
Interval.UnitType periodUnit = StrUtil.strToEnum(intervalUnitStr,
Interval.UnitType periodUnit = StrUtil.strToEnum(intervalUnitStr.toUpperCase(),
Interval.UnitType.class,
schedule!=null ? Interval.UnitType.valueOf(schedule.getTimeUnit().toUpperCase()) : Interval.UnitType.SECONDS,
false,

View File

@ -19,7 +19,7 @@ import java.util.Map;
@Slf4j
@Service
public class NetdataPostProcessorPlugin implements SensorPostProcessorPlugin {
public class NetdataSensorPostProcessorPlugin implements SensorPostProcessorPlugin {
public final static String NETDATA_TYPE = "netdata";
@Override

View File

@ -0,0 +1,38 @@
/*
* Copyright (C) 2023-2025 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.nebulous.ems.translate.plugins;
import gr.iccs.imu.ems.translate.model.PullSensor;
import gr.iccs.imu.ems.translate.model.Sensor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class PrometheusSensorPostProcessorPlugin implements SensorPostProcessorPlugin {
public final static List<String> PROMETHEUS_TYPES = List.of("prometheus");
@Override
public List<String> getSupportedTypes() {
return PROMETHEUS_TYPES;
}
@Override
public void postProcessSensor(Sensor sensor, String sensorType, Map<String, Object> sensorSpec) {
if (sensorType!=null && PROMETHEUS_TYPES.contains(sensorType.toLowerCase())) {
log.trace("PrometheusSensorPostProcessorPlugin: SPEC: {}", sensorSpec);
log.trace("PrometheusSensorPostProcessorPlugin: SENSOR: {}", sensor);
log.trace("PrometheusSensorPostProcessorPlugin: CONFIG: {}", sensor.getConfiguration());
log.trace("PrometheusSensorPostProcessorPlugin: INTERVAL: {}", ((PullSensor)sensor).getInterval());
}
}
}

View File

@ -17,7 +17,7 @@ image:
# pullPolicy: IfNotPresent
pullPolicy: Always
# Overrides the image tag whose default is the chart appVersion.
tag: &image_tag "2024-mar-nebulous"
tag: &image_tag "2024-apr-nebulous"
imagePullSecrets: []
nameOverride: ""