diff --git a/README.md b/README.md index 3ef2148..f9b492f 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,8 @@ Alarms have three possible states: `UNDETERMINED`, `OK` and `ALARM`. Alarms are avg(cpu{service=nova}, 120) > 90 or avg(load{service=nova}, 120) > 15 ``` +For more details on Alarm Definitions versus Alarms refer to the Monasca API documentation at https://github.com/stackforge/monasca-api/blob/master/docs/monasca-api-spec.md. + If the expression evaluates to true, the Alarm state transitions to `ALARM`, if it evaluates to false, the state transitions to `OK` and if there aren't any metrics for the two times the measuring period, the Alarm state transitions to `UNDETERMINED`. Each part of the expression is represented by a Sub Alarm, so for the above example, there are two Sub Alarms. The Threshold Engine is designed as a series of Storm Spouts and Bolts. For an overview of Storm, look at [the tutorial][storm-tutorial]. Spouts feed external data into the system as messages while bolts process incoming messages and optionally produce output messages for a downstream bolt. diff --git a/thresh/pom.xml b/thresh/pom.xml index 9189839..127bf71 100644 --- a/thresh/pom.xml +++ b/thresh/pom.xml @@ -18,7 +18,7 @@ 1.0.0-SNAPSHOT 0.9.1-incubating - true + false UTF-8 UTF-8 ${project.artifactId}-${project.version}-${timestamp}-${buildNumber} diff --git a/thresh/src/main/java/monasca/thresh/TopologyModule.java b/thresh/src/main/java/monasca/thresh/TopologyModule.java index 004e2cb..5c98fa7 100644 --- a/thresh/src/main/java/monasca/thresh/TopologyModule.java +++ b/thresh/src/main/java/monasca/thresh/TopologyModule.java @@ -121,6 +121,7 @@ public class TopologyModule extends AbstractModule { MetricFilteringBolt.NEW_METRIC_FOR_ALARM_DEFINITION_STREAM, new Fields(AlarmCreationBolt.ALARM_CREATION_FIELDS[3])) .allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID) + .allGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID) .allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID) .setNumTasks(1); // This has to be a single bolt right now because there is no // database protection for adding metrics and dimensions diff --git a/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java b/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java index e70b8ea..08168dd 100644 --- a/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java +++ b/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java @@ -33,8 +33,7 @@ import java.util.UUID; /** * An alarm comprised of sub-alarms. - */ -/** + * * @author craigbr * */ @@ -207,6 +206,16 @@ public class Alarm extends AbstractEntity { } } + public boolean updateSubAlarm(final SubExpression subExpression) { + for (final SubAlarm subAlarm : this.subAlarms.values()) { + if (subAlarm.getAlarmSubExpressionId().equals(subExpression.getId())) { + subAlarm.setExpression(subExpression.getAlarmSubExpression()); + return true; + } + } + return false; + } + @Override public String toString() { final StringBuilder alarmedMetricsString = new StringBuilder(); diff --git a/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java b/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java index 54602f7..8133615 100644 --- a/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java +++ b/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.UUID; /** - * An alarm comprised of sub-alarms. + * Defines the "policy" for creating alarms */ public class AlarmDefinition extends AbstractEntity { private String tenantId; @@ -216,4 +216,14 @@ public class AlarmDefinition extends AbstractEntity { public void setSubExpressions(List subExpressions) { this.subExpressions = subExpressions; } + + public boolean updateSubExpression(final String id, final AlarmSubExpression alarmSubExpression) { + for (final SubExpression subExpression : this.subExpressions) { + if (subExpression.getId().equals(id)) { + subExpression.setAlarmSubExpression(alarmSubExpression); + return true; + } + } + return false; + } } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java b/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java index 5f67494..6ad36f5 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java @@ -20,7 +20,6 @@ package monasca.thresh.infrastructure.persistence; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.MetricDefinition; -import monasca.common.persistence.BeanMapper; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubAlarm; @@ -31,19 +30,15 @@ import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.digest.DigestUtils; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.StatementContext; -import org.skife.jdbi.v2.tweak.ResultSetMapper; +import org.skife.jdbi.v2.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.ResultSet; -import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import javax.inject.Inject; @@ -65,38 +60,98 @@ public class AlarmDAOImpl implements AlarmDAO { @Override public List findForAlarmDefinitionId(String alarmDefinitionId) { - Handle h = db.open(); - try { - List alarms = - h.createQuery("select * from alarm where alarm_definition_id = :id") - .bind("id", alarmDefinitionId).map(new BeanMapper(Alarm.class)).list(); - - for (final Alarm alarm : alarms) { - alarm.setSubAlarms(getSubAlarms(h, alarm.getId())); - - alarm.setAlarmedMetrics(findAlarmedMetrics(h, alarm.getId())); - } - return alarms; - } finally { - h.close(); - } + return findAlarms("a.alarm_definition_id = :alarmDefinitionId ", "alarmDefinitionId", + alarmDefinitionId); } @Override public List listAll() { - Handle h = db.open(); - try { - List alarms = - h.createQuery("select * from alarm").map(new BeanMapper(Alarm.class)).list(); + return findAlarms("1=1"); // This is basically "true" and gets optimized out + } - for (final Alarm alarm : alarms) { - alarm.setSubAlarms(getSubAlarms(h, alarm.getId())); + private List findAlarms(final String additionalWhereClause, String ... params) { + try (final Handle h = db.open()) { - alarm.setAlarmedMetrics(findAlarmedMetrics(h, alarm.getId())); + final String ALARMS_SQL = + "select a.id, a.alarm_definition_id, a.state, sa.id as sub_alarm_id, sa.expression, sa.sub_expression_id, ad.tenant_id from alarm a " + + "inner join sub_alarm sa on sa.alarm_id = a.id " + + "inner join alarm_definition ad on a.alarm_definition_id = ad.id " + + "where ad.deleted_at is null and %s " + + "order by a.id"; + final String sql = String.format(ALARMS_SQL, additionalWhereClause); + final Query> query = h.createQuery(sql); + addQueryParameters(query, params); + final List> rows = query.list(); + + final List alarms = new ArrayList<>(rows.size()); + List subAlarms = new ArrayList(); + String prevAlarmId = null; + Alarm alarm = null; + final Map alarmMap = new HashMap<>(); + final Map tenantIdMap = new HashMap<>(); + for (final Map row : rows) { + final String alarmId = getString(row, "id"); + if (!alarmId.equals(prevAlarmId)) { + if (alarm != null) { + alarm.setSubAlarms(subAlarms); + } + alarm = new Alarm(); + alarm.setId(alarmId); + alarm.setAlarmDefinitionId(getString(row, "alarm_definition_id")); + alarm.setState(AlarmState.valueOf(getString(row, "state"))); + subAlarms = new ArrayList(); + alarms.add(alarm); + alarmMap.put(alarmId, alarm); + tenantIdMap.put(alarmId, getString(row, "tenant_id")); + } + final SubExpression subExpression = + new SubExpression(getString(row, "sub_expression_id"), AlarmSubExpression.of(getString( + row, "expression"))); + final SubAlarm subAlarm = + new SubAlarm(getString(row, "sub_alarm_id"), alarmId, subExpression); + subAlarms.add(subAlarm); + prevAlarmId = alarmId; + } + if (alarm != null) { + alarm.setSubAlarms(subAlarms); + } + if (!alarms.isEmpty()) { + getAlarmedMetrics(h, alarmMap, tenantIdMap, additionalWhereClause, params); } return alarms; - } finally { - h.close(); + } + } + + private void addQueryParameters(final Query> query, String... params) { + for (int i = 0; i < params.length;) { + query.bind(params[i], params[i+1]); + i += 2; + } + } + + private void getAlarmedMetrics(Handle h, final Map alarmMap, + final Map tenantIdMap, final String additionalWhereClause, String ... params) { + final String baseSql = "select a.id, md.name, mdg.dimensions from metric_definition as md " + + "inner join metric_definition_dimensions as mdd on md.id = mdd.metric_definition_id " + + "inner join alarm_metric as am on mdd.id = am.metric_definition_dimensions_id " + + "inner join alarm as a on am.alarm_id = a.id " + + "left join (select dimension_set_id, name, value, group_concat(name, '=', value) as dimensions " + + " from metric_dimension group by dimension_set_id) as mdg on mdg.dimension_set_id = mdd.metric_dimension_set_id where %s"; + final String sql = String.format(baseSql, additionalWhereClause); + final Query> query = h.createQuery(sql); + addQueryParameters(query, params); + final List> metricRows = query.list(); + for (final Map row : metricRows) { + final String alarmId = getString(row, "id"); + final Alarm alarm = alarmMap.get(alarmId); + // This shouldn't happen but it is possible an Alarm gets created after the AlarmDefinition is + // marked deleted and any existing alarms are deleted but before the Threshold Engine gets the + // AlarmDefinitionDeleted message + if (alarm == null) { + continue; + } + final MetricDefinition md = createMetricDefinitionFromRow(row); + alarm.addAlarmedMetric(new MetricDefinitionAndTenantId(md, tenantIdMap.get(alarmId))); } } @@ -208,105 +263,54 @@ public class AlarmDAOImpl implements AlarmDAO { @Override public Alarm findById(String id) { - Handle h = db.open(); - - try { - Alarm alarm = - h.createQuery("select * from alarm where id = :id").bind("id", id) - .map(new BeanMapper(Alarm.class)).first(); - if (alarm == null) { - return null; - } - - alarm.setSubAlarms(getSubAlarms(h, alarm.getId())); - - alarm.setAlarmedMetrics(findAlarmedMetrics(h, id)); - return alarm; - } finally { - h.close(); + final List alarms = findAlarms("a.id = :alarm_id ", "alarm_id", id); + if (alarms.isEmpty()) { + return null; } - } - - private static class SubAlarmMapper implements ResultSetMapper { - public SubAlarm map(int rowIndex, ResultSet rs, StatementContext ctxt) throws SQLException { - SubExpression subExpression = new SubExpression( - rs.getString("sub_expression_id"), AlarmSubExpression.of(rs.getString("expression"))); - return new SubAlarm(rs.getString("id"), rs.getString("alarm_id"), subExpression); - } - } - - private List getSubAlarms(Handle h, String alarmId) { - return h.createQuery("select * from sub_alarm where alarm_id = :alarmId") - .bind("alarmId", alarmId).map(new SubAlarmMapper()).list(); - } - - private Set findAlarmedMetrics(Handle h, String alarmId) { - final List> result = - h.createQuery( - "select md.name as metric_name, md.tenant_id, md.region, mdi.name, mdi.value, mdd.id, mdd.metric_dimension_set_id " + - "from metric_definition_dimensions as mdd left join metric_definition as md on md.id = mdd.metric_definition_id " + - "left join metric_dimension as mdi on mdi.dimension_set_id = mdd.metric_dimension_set_id where mdd.id in " + - "(select metric_definition_dimensions_id from alarm_metric where alarm_id=:alarm_id) order by mdd.id") - .bind("alarm_id", alarmId).list(); - if ((result == null) || result.isEmpty()) { - return new HashSet<>(0); + else { + return alarms.get(0); } - - final Set alarmedMetrics = new HashSet<>(result.size()); - Sha1HashId previous = null; - MetricDefinitionAndTenantId mdtid = null; - for (Map row : result) { - final Sha1HashId next = new Sha1HashId((byte[]) row.get("id")); - // The order by clause in the SQL guarantees this order - if (!next.equals(previous)) { - if (mdtid != null) { - alarmedMetrics.add(mdtid); - } - final String name = (String) row.get("metric_name"); - final String tenantId = (String) row.get("tenant_id"); - mdtid = new MetricDefinitionAndTenantId(new MetricDefinition(name, new HashMap()), tenantId); - previous = next; - } - final String name = (String) row.get("name"); - final String value = (String) row.get("value"); - if ((name != null) && !name.isEmpty()) { - mdtid.metricDefinition.dimensions.put(name, value); - } - } - if (mdtid != null) { - alarmedMetrics.add(mdtid); - } - return alarmedMetrics; } @Override public void updateState(String id, AlarmState state) { - Handle h = db.open(); - try { + try (final Handle h = db.open()) { h.createStatement("update alarm set state = :state, updated_at = NOW() where id = :id") .bind("id", id).bind("state", state.toString()).execute(); - } finally { - h.close(); } } @Override public int updateSubAlarmExpressions(String alarmSubExpressionId, AlarmSubExpression alarmSubExpression) { - Handle h = db.open(); - try { + try (final Handle h = db.open()) { return h .createStatement( "update sub_alarm set expression=:expression where sub_expression_id=:alarmSubExpressionId") .bind("expression", alarmSubExpression.getExpression()) .bind("alarmSubExpressionId", alarmSubExpressionId).execute(); - } finally { - h.close(); } } + private MetricDefinition createMetricDefinitionFromRow(final Map row) { + final Map dimensionMap = new HashMap<>(); + final String dimensions = getString(row, "dimensions"); + if (dimensions != null) { + for (String dimension : dimensions.split(",")) { + final String[] parsed_dimension = dimension.split("="); + dimensionMap.put(parsed_dimension[0], parsed_dimension[1]); + } + } + final MetricDefinition md = new MetricDefinition(getString(row, "name"), dimensionMap); + return md; + } + + private String getString(final Map row, String fieldName) { + return (String) row.get(fieldName); + } + private String trunc(String s, int l) { if (s == null) { diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java index 4b5d245..0bf1816 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java @@ -28,6 +28,8 @@ import backtype.storm.tuple.Values; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.event.AlarmDefinitionDeletedEvent; +import monasca.common.model.event.AlarmDefinitionUpdatedEvent; +import monasca.common.model.event.AlarmDeletedEvent; import monasca.common.model.metric.MetricDefinition; import monasca.common.streaming.storm.Logging; import monasca.common.util.Injector; @@ -35,6 +37,7 @@ import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubAlarm; +import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; @@ -51,8 +54,6 @@ import java.util.Map; /** * Handles creation of Alarms and Alarmed Metrics. - * - * MUST be only one of these bolts in the storm application */ public class AlarmCreationBolt extends BaseRichBolt { private static final long serialVersionUID = 1096706128973976599L; @@ -67,6 +68,8 @@ public class AlarmCreationBolt extends BaseRichBolt { private transient AlarmDAO alarmDAO; private OutputCollector collector; private final Map> waitingAlarms = new HashMap<>(); + private final Map> alarmCache = new HashMap<>(); + private final Map alarmDefinitionCache = new HashMap<>(); private static final List EMPTY_LIST = Collections.emptyList(); public AlarmCreationBolt(DataSourceFactory dbConfig) { @@ -86,7 +89,7 @@ public class AlarmCreationBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { - logger.info("tuple: {}", tuple); + logger.debug("tuple: {}", tuple); try { if (MetricFilteringBolt.NEW_METRIC_FOR_ALARM_DEFINITION_STREAM.equals(tuple.getSourceStreamId())) { final MetricDefinitionAndTenantId metricDefinitionAndTenantId = @@ -96,10 +99,9 @@ public class AlarmCreationBolt extends BaseRichBolt { .getSourceStreamId())) { final String eventType = tuple.getString(0); if (EventProcessingBolt.UPDATED.equals(eventType)) { - // We could try to update the subalarms, but it is easier just to delete - // the waiting alarms and wait for them to be recreated. The AlarmDefinition - // itself is not cached so we don't have to do anything with it - removeWaitingAlarmsForAlarmDefinition(tuple.getString(2)); + final SubExpression subExpression = (SubExpression) tuple.getValue(1); + final String alarmDefinitionId = tuple.getString(2); + updateSubAlarms(subExpression, alarmDefinitionId); } } else if (EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) { final String eventType = tuple.getString(0); @@ -108,14 +110,22 @@ public class AlarmCreationBolt extends BaseRichBolt { if (EventProcessingBolt.DELETED.equals(eventType)) { final AlarmDefinitionDeletedEvent event = (AlarmDefinitionDeletedEvent) tuple.getValue(1); - removeWaitingAlarmsForAlarmDefinition(event.alarmDefinitionId); + deleteAlarmDefinition(event.alarmDefinitionId); } + else if (EventProcessingBolt.UPDATED.equals(eventType)) { + updateAlarmDefinition((AlarmDefinitionUpdatedEvent) tuple.getValue(1)); + } + } + } else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) { + final String eventType = tuple.getString(0); + if (EventProcessingBolt.DELETED.equals(eventType)) { + removeAlarm((AlarmDeletedEvent) tuple.getValue(2)); } } else { - logger.error("Receieved tuple on unknown stream {}", tuple); + logger.error("Received tuple on unknown stream {}", tuple); } - + } catch (Exception e) { logger.error("Error processing tuple {}", tuple, e); } finally { @@ -123,16 +133,76 @@ public class AlarmCreationBolt extends BaseRichBolt { } } - private void removeWaitingAlarmsForAlarmDefinition(String alarmDefinitionId) { + private void removeAlarm(AlarmDeletedEvent event) { + logger.debug("Deleting alarm {} for Alarm Definition {}", event.alarmId, event.alarmDefinitionId); + final List alarms = alarmCache.get(event.alarmDefinitionId); + if (alarms != null) { + for (final Alarm alarm : alarms) { + if (alarm.getId().equals(event.alarmId)) { + logger.debug("Deleted alarm {} for Alarm Definition {}", event.alarmId, event.alarmDefinitionId); + alarms.remove(alarm); + break; + } + } + } + } + + private void updateSubAlarms(final SubExpression subExpression, final String alarmDefinitionId) { + logger.debug("Updating SubAlarms for AlarmDefinition Id {} SubExpression {}", + alarmDefinitionId, subExpression); + int count = 0; + if (alarmDefinitionCache.containsKey(alarmDefinitionId)) { + final List waiting = waitingAlarms.get(alarmDefinitionId); + if (waiting != null && !waiting.isEmpty()) { + for (final Alarm alarm : waiting) { + if (!alarm.updateSubAlarm(subExpression)) { + logger.error("Did not find SubAlarms for AlarmDefinition Id {} SubExpression {} Alarm {}", + alarmDefinitionId, subExpression, alarm); + } + count++; + } + } + } + logger.debug("Updated {} SubAlarms for AlarmDefinition Id {}", count, alarmDefinitionId); + } + + private void updateAlarmDefinition(final AlarmDefinitionUpdatedEvent event) { + final AlarmDefinition alarmDefinition = alarmDefinitionCache.get(event.alarmDefinitionId); + if (alarmDefinition != null) { + logger.debug("Updating AlarmDefinition {}", event.alarmDefinitionId); + alarmDefinition.setName(event.alarmName); + alarmDefinition.setDescription(event.alarmDescription); + alarmDefinition.setActionsEnabled(event.alarmActionsEnabled); + alarmDefinition.setExpression(event.alarmExpression); + alarmDefinition.setSeverity(event.severity); + if (!alarmDefinition.getMatchBy().equals(event.matchBy)) { + logger.error("AlarmDefinition {}: match-by changed, was {} now {}", + event.alarmDefinitionId, alarmDefinition.getMatchBy(), event.matchBy); + } + alarmDefinition.setMatchBy(event.matchBy); // Should never change + for (Map.Entry entry : event.changedSubExpressions.entrySet()) { + if (!alarmDefinition.updateSubExpression(entry.getKey(), entry.getValue())) { + logger.error("AlarmDefinition {}: Did not finding matching SubAlarmExpression id={} SubAlarmExpression{}", + event.alarmDefinitionId, entry.getKey(), entry.getValue()); + } + } + } + } + + private void deleteAlarmDefinition(String alarmDefinitionId) { + logger.debug("Deleting AlarmDefinition {}", alarmDefinitionId); final List waiting = waitingAlarms.remove(alarmDefinitionId); if (waiting != null && !waiting.isEmpty()) { - logger.info("{} waiting alarms removed for Alarm Definition Id {}", waiting != null + logger.debug("{} waiting alarms removed for Alarm Definition Id {}", waiting != null && !waiting.isEmpty() ? waiting.size() : "No", alarmDefinitionId); } + alarmCache.remove(alarmDefinitionId); + alarmDefinitionCache.remove(alarmDefinitionId); } protected void handleNewMetricDefinition( final MetricDefinitionAndTenantId metricDefinitionAndTenantId, final String alarmDefinitionId) { + final long start = System.currentTimeMillis(); final AlarmDefinition alarmDefinition = lookUpAlarmDefinition(alarmDefinitionId); if (alarmDefinition == null) { return; @@ -142,7 +212,7 @@ public class AlarmCreationBolt extends BaseRichBolt { return; } - final List existingAlarms = alarmDAO.findForAlarmDefinitionId(alarmDefinitionId); + final List existingAlarms = getExistingAlarms(alarmDefinitionId); if (alreadyCreated(existingAlarms, metricDefinitionAndTenantId)) { logger.warn("MetricDefinition {} is already in existing Alarm", metricDefinitionAndTenantId); return; @@ -154,49 +224,66 @@ public class AlarmCreationBolt extends BaseRichBolt { return; } - final Alarm existingAlarm = + final List matchingAlarms = fitsInExistingAlarm(metricDefinitionAndTenantId, alarmDefinition, existingAlarms); - if (existingAlarm != null) { - logger.info("Metric {} fits into existing alarm {}", metricDefinitionAndTenantId, - existingAlarm); - addToExistingAlarm(existingAlarm, metricDefinitionAndTenantId); - sendNewMetricDefinition(existingAlarm, metricDefinitionAndTenantId); + if (!matchingAlarms.isEmpty()) { + for (final Alarm matchingAlarm : matchingAlarms) { + logger.info("Metric {} fits into existing alarm {}", metricDefinitionAndTenantId, + matchingAlarm.getId()); + addToExistingAlarm(matchingAlarm, metricDefinitionAndTenantId); + sendNewMetricDefinition(matchingAlarm, metricDefinitionAndTenantId); + } } else { final List newAlarms = finishesAlarm(alarmDefinition, metricDefinitionAndTenantId, existingAlarms); for (final Alarm newAlarm : newAlarms) { logger.info("Metric {} finishes waiting alarm {}", metricDefinitionAndTenantId, newAlarm); + existingAlarms.add(newAlarm); for (final MetricDefinitionAndTenantId md : newAlarm.getAlarmedMetrics()) { sendNewMetricDefinition(newAlarm, md); } } } + logger.debug("Total processing took {} milliseconds", System.currentTimeMillis() - start); } - private Alarm fitsInExistingAlarm(final MetricDefinitionAndTenantId metricDefinitionAndTenantId, + private List getExistingAlarms(final String alarmDefinitionId) { + List alarms = alarmCache.get(alarmDefinitionId); + if (alarms != null) { + return alarms; + } + final long start = System.currentTimeMillis(); + alarms = alarmDAO.findForAlarmDefinitionId(alarmDefinitionId); + logger.info("Loading {} Alarms took {} milliseconds", alarms.size(), System.currentTimeMillis() - start); + alarmCache.put(alarmDefinitionId, alarms); + return alarms; + } + + private List fitsInExistingAlarm(final MetricDefinitionAndTenantId metricDefinitionAndTenantId, final AlarmDefinition alarmDefinition, final List existingAlarms) { - Alarm existingAlarm = null; + final List result = new LinkedList<>(); if (alarmDefinition.getMatchBy().isEmpty()) { if (!existingAlarms.isEmpty()) { - existingAlarm = existingAlarms.get(0); + result.add(existingAlarms.get(0)); } } else { for (final Alarm alarm : existingAlarms) { if (metricFitsInAlarm(alarm, alarmDefinition, metricDefinitionAndTenantId)) { - existingAlarm = alarm; - break; + result.add(alarm); } } } - return existingAlarm; + return result; } private void addToExistingAlarm(Alarm existingAlarm, MetricDefinitionAndTenantId metricDefinitionAndTenantId) { existingAlarm.addAlarmedMetric(metricDefinitionAndTenantId); + final long start = System.currentTimeMillis(); alarmDAO.addAlarmedMetric(existingAlarm.getId(), metricDefinitionAndTenantId); + logger.debug("Add Alarm Metric took {} milliseconds", System.currentTimeMillis() - start); } private void sendNewMetricDefinition(Alarm existingAlarm, @@ -264,19 +351,14 @@ public class AlarmCreationBolt extends BaseRichBolt { if (waitingAlarms.isEmpty()) { final Alarm newAlarm = new Alarm(alarmDefinition, AlarmState.UNDETERMINED); newAlarm.addAlarmedMetric(metricDefinitionAndTenantId); + reuseExistingMetric(newAlarm, alarmDefinition, existingAlarms); if (alarmIsComplete(newAlarm)) { logger.debug("New alarm is complete. Saving"); saveAlarm(newAlarm); result.add(newAlarm); } else { - if (reuseExistingMetric(newAlarm, alarmDefinition, existingAlarms)) { - logger.debug("New alarm is complete reusing existing metric. Saving"); - saveAlarm(newAlarm); - result.add(newAlarm); } - else { - logger.debug("Adding new alarm to the waiting list"); - addToWaitingAlarms(newAlarm, alarmDefinition); - } + logger.debug("Adding new alarm to the waiting list"); + addToWaitingAlarms(newAlarm, alarmDefinition); } } else { for (final Alarm waiting : waitingAlarms) { @@ -291,25 +373,21 @@ public class AlarmCreationBolt extends BaseRichBolt { return result; } - private boolean reuseExistingMetric(Alarm newAlarm, final AlarmDefinition alarmDefinition, + private void reuseExistingMetric(Alarm newAlarm, final AlarmDefinition alarmDefinition, List existingAlarms) { - boolean addedOne = false; for (final Alarm existingAlarm : existingAlarms) { for (final MetricDefinitionAndTenantId mtid : existingAlarm.getAlarmedMetrics()) { if (metricFitsInAlarm(newAlarm, alarmDefinition, mtid)) { newAlarm.addAlarmedMetric(mtid); - addedOne = true; } } } - if (!addedOne) { - return false; - } - return alarmIsComplete(newAlarm); } private void saveAlarm(Alarm newAlarm) { + final long start = System.currentTimeMillis(); alarmDAO.createAlarm(newAlarm); + logger.debug("Add Alarm took {} milliseconds", System.currentTimeMillis() - start); } private List findMatchingWaitingAlarms(List waiting, AlarmDefinition alarmDefinition, @@ -418,12 +496,17 @@ public class AlarmCreationBolt extends BaseRichBolt { } private AlarmDefinition lookUpAlarmDefinition(String alarmDefinitionId) { - final AlarmDefinition found = alarmDefDAO.findById(alarmDefinitionId); + AlarmDefinition found = alarmDefinitionCache.get(alarmDefinitionId); + if (found != null) { + return found; + } + found = alarmDefDAO.findById(alarmDefinitionId); if (found == null) { logger.warn("Did not find AlarmDefinition for ID {}", alarmDefinitionId); return null; } + alarmDefinitionCache.put(found.getId(), found); return found; } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java index 9eb0dd4..5a7cd65 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java @@ -22,6 +22,7 @@ import monasca.common.model.event.AlarmDefinitionUpdatedEvent; import monasca.common.model.event.AlarmStateTransitionedEvent; import monasca.common.model.event.AlarmUpdatedEvent; import monasca.common.model.alarm.AlarmState; +import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.MetricDefinition; import monasca.common.streaming.storm.Logging; import monasca.common.streaming.storm.Streams; @@ -34,7 +35,6 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; -import monasca.thresh.ThresholdingConfiguration; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; @@ -153,7 +153,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt { final AlarmDefinition alarmDefinition = alarmDefinitions.get(event.alarmDefinitionId); if (alarmDefinition == null) { // This is OK. No Alarms are using this AlarmDefinition - logger.info("Update of AlarmDefinition {} skipped. Not in use by this bolt", + logger.debug("Update of AlarmDefinition {} skipped. Not in use by this bolt", event.alarmDefinitionId); return; } @@ -163,6 +163,12 @@ public class AlarmThresholdingBolt extends BaseRichBolt { alarmDefinition.setSeverity(event.severity); alarmDefinition.setActionsEnabled(event.alarmActionsEnabled); alarmDefinition.setExpression(event.alarmExpression); + for (Map.Entry entry : event.changedSubExpressions.entrySet()) { + if (!alarmDefinition.updateSubExpression(entry.getKey(), entry.getValue())) { + logger.error("AlarmDefinition {}: Did not finding matching SubAlarmExpression id={} SubAlarmExpression{}", + event.alarmDefinitionId, entry.getKey(), entry.getValue()); + } + } } @Override @@ -251,57 +257,6 @@ public class AlarmThresholdingBolt extends BaseRichBolt { } - void handleAlarmDefinitionUpdated(String alarmDefId, AlarmDefinitionUpdatedEvent event) { - final AlarmDefinition oldAlarmDef = alarmDefinitions.get(alarmDefId); - if (oldAlarmDef == null) { - logger.debug("Updated Alarm Definition {} not loaded, ignoring", alarmDefId); - return; - } - - oldAlarmDef.setName(event.alarmName); - oldAlarmDef.setDescription(event.alarmDescription); - oldAlarmDef.setExpression(event.alarmExpression); - oldAlarmDef.setActionsEnabled(event.alarmActionsEnabled); - - /* Have to figure out how to handle this - // Now handle the SubAlarms - // First remove the deleted SubAlarms so we don't have to consider them later - for (Map.Entry entry : event.oldAlarmSubExpressions - .entrySet()) { - logger.debug("Removing deleted SubAlarm {}", entry.getValue()); - if (!oldAlarmDef.removeSubAlarmById(entry.getKey())) { - logger.error("Did not find removed SubAlarm {}", entry.getValue()); - } - } - - // Reuse what we can from the changed SubAlarms - for (Map.Entry entry : event.changedSubExpressions - .entrySet()) { - final SubAlarm oldSubAlarm = oldAlarmDef.getSubAlarm(entry.getKey()); - if (oldSubAlarm == null) { - logger.error("Did not find changed SubAlarm {}", entry.getValue()); - continue; - } - final SubAlarm newSubAlarm = new SubAlarm(entry.getKey(), oldAlarmDef.getId(), entry.getValue()); - newSubAlarm.setState(oldSubAlarm.getState()); - if (!oldSubAlarm.isCompatible(newSubAlarm)) { - newSubAlarm.setNoState(true); - } - logger.debug("Changing SubAlarm from {} to {}", oldSubAlarm, newSubAlarm); - oldAlarmDef.updateSubAlarm(newSubAlarm); - } - - // Add the new SubAlarms - for (Map.Entry entry : event.newAlarmSubExpressions - .entrySet()) { - final SubAlarm newSubAlarm = new SubAlarm(entry.getKey(), oldAlarmDef.getId(), entry.getValue()); - newSubAlarm.setNoState(true); - logger.debug("Adding SubAlarm {}", newSubAlarm); - oldAlarmDef.updateSubAlarm(newSubAlarm); - } - */ - } - String buildStateChangeReason() { return null; } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java index cf895d9..d1cea0f 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java @@ -165,6 +165,7 @@ public class EventProcessingBolt extends BaseRichBolt { } void handle(AlarmDeletedEvent event) { + logger.debug("Alarm {} deleted", event.alarmId); processSubAlarms(DELETED, event.tenantId, event.alarmDefinitionId, event.alarmMetrics, event.subAlarms); diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java index 1123891..0e13697 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java @@ -296,6 +296,8 @@ public class MetricFilteringBolt extends BaseRichBolt { synchronized (SENTINAL) { alreadyFound.remove(metricDefinitionAndTenantId, alarmDefinitionId); } + logger.debug("Removed {} for Alarm Definition {}", metricDefinitionAndTenantId, + alarmDefinitionId); } } diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java index 2c9b707..d236949 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java @@ -19,8 +19,7 @@ package monasca.thresh.infrastructure.persistence; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; - -import com.google.common.io.Resources; +import static org.testng.Assert.assertNull; import monasca.common.model.alarm.AggregateFunction; import monasca.common.model.alarm.AlarmExpression; @@ -40,14 +39,19 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -@Test +/** + * These tests won't work without the real mysql database so use mini-mon. + * Warning, this will truncate the alarms part of your mini-mon database + * @author craigbr + * + */ +@Test(groups = "database") public class AlarmDAOImplTest { private static final String TENANT_ID = "bob"; private static String ALARM_NAME = "90% CPU"; @@ -63,10 +67,9 @@ public class AlarmDAOImplTest { @BeforeClass protected void setupClass() throws Exception { - db = new DBI("jdbc:h2:mem:test;MODE=MySQL"); + // See class comment + db = new DBI("jdbc:mysql://192.168.10.4/mon", "monapi", "password"); handle = db.open(); - handle - .execute(Resources.toString(getClass().getResource("alarm.sql"), Charset.defaultCharset())); dao = new AlarmDAOImpl(db); } @@ -77,6 +80,8 @@ public class AlarmDAOImplTest { @BeforeMethod protected void beforeMethod() { + handle.execute("SET foreign_key_checks = 0;"); + handle.execute("truncate table alarm_definition"); handle.execute("truncate table alarm"); handle.execute("truncate table sub_alarm"); handle.execute("truncate table sub_alarm_definition"); @@ -90,6 +95,7 @@ public class AlarmDAOImplTest { alarmDef = new AlarmDefinition(TENANT_ID, ALARM_NAME, ALARM_DESCR, new AlarmExpression( expr), "LOW", ALARM_ENABLED, new ArrayList()); + AlarmDefinitionDAOImplTest.insertAlarmDefinition(handle, alarmDef); final Map dimensions = new HashMap(); dimensions.put("first", "first_value"); @@ -107,13 +113,20 @@ public class AlarmDAOImplTest { dao.createAlarm(firstAlarm); final Alarm secondAlarm = new Alarm(alarmDef, AlarmState.OK); + secondAlarm.addAlarmedMetric(newMetric); dao.createAlarm(secondAlarm); final AlarmDefinition secondAlarmDef = new AlarmDefinition(TENANT_ID, "Second", null, new AlarmExpression( "avg(cpu{disk=vda, instance_id=123}) > 10"), "LOW", true, Arrays.asList("dev")); + AlarmDefinitionDAOImplTest.insertAlarmDefinition(handle, secondAlarmDef); final Alarm thirdAlarm = new Alarm(secondAlarmDef, AlarmState.OK); + final Map dims = new HashMap<>(); + dims.put("disk", "vda"); + dims.put("instance_id", "123"); + thirdAlarm.addAlarmedMetric(new MetricDefinitionAndTenantId(new MetricDefinition("cpu", dims), + secondAlarmDef.getTenantId())); dao.createAlarm(thirdAlarm); verifyAlarmList(dao.findForAlarmDefinitionId(alarmDef.getId()), firstAlarm, secondAlarm); @@ -124,7 +137,7 @@ public class AlarmDAOImplTest { } private void verifyAlarmList(final List found, Alarm... expected) { - assertEquals(expected.length, found.size()); + assertEquals(found.size(), expected.length); for (final Alarm alarm : expected) { assertTrue(found.contains(alarm)); } @@ -132,6 +145,8 @@ public class AlarmDAOImplTest { public void shouldFindById() { final Alarm newAlarm = new Alarm(alarmDef, AlarmState.OK); + assertNull(dao.findById(newAlarm.getId())); + dao.createAlarm(newAlarm); assertEquals(dao.findById(newAlarm.getId()), newAlarm); diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java index b767d68..e966df1 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java @@ -106,20 +106,20 @@ public class AlarmDefinitionDAOImplTest { final AlarmDefinition alarmDefinition = new AlarmDefinition(TENANT_ID, ALARM_NAME, ALARM_DESCR, expression, "LOW", false, Arrays.asList("fred", "barney")); - insert(alarmDefinition); + insertAlarmDefinition(handle, alarmDefinition); verifyListAllMatches(alarmDefinition); final AlarmExpression expression2 = new AlarmExpression("max(cpu{service=swift}) > 90"); final AlarmDefinition alarmDefinition2 = new AlarmDefinition(TENANT_ID, ALARM_NAME, ALARM_DESCR, expression2, "LOW", false, Arrays.asList("fred", "barney", "wilma", "betty")); - insert(alarmDefinition2); + insertAlarmDefinition(handle, alarmDefinition2); verifyListAllMatches(alarmDefinition, alarmDefinition2); } private void insertAndCheck(final AlarmDefinition alarmDefinition) { - insert(alarmDefinition); + insertAlarmDefinition(handle, alarmDefinition); assertEquals(dao.findById(alarmDefinition.getId()), alarmDefinition); } @@ -133,7 +133,9 @@ public class AlarmDefinitionDAOImplTest { } } - private void insert(AlarmDefinition alarmDefinition) { + // This method is not a test but without this TestNG tries to run it + @Test(enabled=false) + public static void insertAlarmDefinition(Handle handle, AlarmDefinition alarmDefinition) { try { handle.begin(); handle diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java index 331ba63..c1289a0 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java @@ -40,6 +40,7 @@ import monasca.common.model.alarm.AlarmExpression; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.event.AlarmDefinitionDeletedEvent; +import monasca.common.model.event.AlarmDeletedEvent; import monasca.common.model.metric.MetricDefinition; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; @@ -55,6 +56,7 @@ import org.mockito.stubbing.Answer; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -257,20 +259,48 @@ public class AlarmCreationBoltTest { // Update the Alarm Definition final SubExpression first = alarmDefinition.getSubExpressions().get(0); - first.getAlarmSubExpression().setThreshold(42.0); + + // We make a copy of the SubExpression because the actual SubExpression from the AlarmDefinition is + // in the Alarm and updating first updates the SubAlarm's SubExpresssion directly + final SubExpression copy = new SubExpression(first.getId(), AlarmSubExpression.of(first.getAlarmSubExpression().getExpression())); + copy.getAlarmSubExpression().setThreshold(42.0); final MkTupleParam tupleParam = new MkTupleParam(); tupleParam.setFields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS); tupleParam.setStream(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID); final Tuple tuple = Testing.testTuple( - Arrays.asList(EventProcessingBolt.UPDATED, first, alarmDefinition.getId()), tupleParam); + Arrays.asList(EventProcessingBolt.UPDATED, copy, alarmDefinition.getId()), tupleParam); bolt.execute(tuple); - // Ensure they are gone - assertNull(bolt.countWaitingAlarms(alarmDefinition.getId())); + final AlarmSubExpression subExpr2 = + alarmDefinition.getAlarmExpression().getSubExpressions().get(1); + + // Now finish the Alarms + for (final String hostname : hostnames) { + final MetricDefinition metric = + build(subExpr2.getMetricDefinition().name, "hostname", hostname, "service", "2"); + sendNewMetric(new MetricDefinitionAndTenantId(metric, TENANT_ID), alarmDefinition.getId()); + } + + assertEquals(this.createdAlarms.size(), hostnames.size()); + + // Can't use verifyCreatedAlarm because then the AlarmDefinition must be updated which + // might update the SubAlarms directly because of reuse of AlarmSubExpressions + for (final Alarm alarm : this.createdAlarms) { + boolean found = false; + for (SubAlarm subAlarm : alarm.getSubAlarms()) { + if (subAlarm.getAlarmSubExpressionId().equals(first.getId())) { + assertEquals(subAlarm.getExpression().getThreshold(), + copy.getAlarmSubExpression().getThreshold()); + found = true; + break; + } + } + assertTrue(found, "Did not find expected sub alarm"); + } } - + private void sendAlarmDefinitionDeleted(final AlarmDefinition alarmDefinition) { final Map subAlarmMetricDefinitions = new HashMap<>(); for (final AlarmSubExpression subExpr : alarmDefinition.getAlarmExpression().getSubExpressions()) { @@ -347,6 +377,118 @@ public class AlarmCreationBoltTest { testMultipleExpressions(metricDefinitionsToSend, numDevs); } + public void testReuseMetricFromExistingAlarm() { + final String expression = "max(cpu{service=vivi}) > 90"; + final String[] matchBy = new String[] { "hostname", "amplifier" }; + final AlarmDefinition alarmDefinition = createAlarmDefinition(expression, matchBy); + + final MetricDefinition metric = + build("cpu", "hostname", "eleanore", "amplifier", "2", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 1); + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID)); + + final MetricDefinition metric2 = + build("cpu", "hostname", "eleanore", "service", "vivi"); + + sendNewMetric(new MetricDefinitionAndTenantId(metric2, TENANT_ID), alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 1, + "A second alarm was created instead of the metric fitting into the first"); + + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + + final MetricDefinition metric3 = + build("cpu", "hostname", "eleanore", "amplifier", "3", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric3, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 2); + + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric3, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + } + + public void testUseMetricInExistingAlarm() { + final String expression = "max(cpu{service=vivi}) > 90"; + final String[] matchBy = new String[] { "hostname", "amplifier" }; + final AlarmDefinition alarmDefinition = createAlarmDefinition(expression, matchBy); + + final MetricDefinition metric = + build("cpu", "hostname", "eleanore", "amplifier", "2", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 1); + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID)); + + final MetricDefinition metric3 = + build("cpu", "hostname", "eleanore", "amplifier", "3", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric3, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 2); + + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric3, TENANT_ID)); + + final MetricDefinition metric2 = + build("cpu", "hostname", "eleanore", "service", "vivi"); + + sendNewMetric(new MetricDefinitionAndTenantId(metric2, TENANT_ID), alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 2, + "A third alarm was created instead of the metric fitting into the first two"); + + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric3, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + } + + public void testDeletedAlarm() { + final AlarmDefinition alarmDefinition = runCreateSimpleAlarm(); + assertEquals(this.createdAlarms.size(), 1); + final Alarm alarmToDelete = this.createdAlarms.get(0); + this.createdAlarms.clear(); + final Map subAlarms = new HashMap<>(); + for (final SubAlarm subAlarm : alarmToDelete.getSubAlarms()) { + subAlarms.put(subAlarm.getId(), subAlarm.getExpression()); + } + final List alarmedMetrics = new ArrayList<>(); + for (final MetricDefinitionAndTenantId mdtid : alarmToDelete.getAlarmedMetrics()) { + alarmedMetrics.add(mdtid.metricDefinition); + } + final AlarmDeletedEvent event = new AlarmDeletedEvent(TENANT_ID, alarmToDelete.getId(), + alarmedMetrics, alarmToDelete.getAlarmDefinitionId(), subAlarms); + + final MkTupleParam tupleParam = new MkTupleParam(); + tupleParam.setFields(EventProcessingBolt.ALARM_EVENT_STREAM_FIELDS); + tupleParam.setStream(EventProcessingBolt.ALARM_EVENT_STREAM_ID); + final Tuple tuple = + Testing.testTuple(Arrays.asList(EventProcessingBolt.DELETED, alarmToDelete.getId(), event), + tupleParam); + + bolt.execute(tuple); + + // Make sure the alarm gets created again + createAlarms(alarmDefinition); + } + private void testMultipleExpressions(final List metricDefinitionsToSend, final int numAlarms) { final AlarmDefinition alarmDefinition = @@ -359,10 +501,15 @@ public class AlarmCreationBoltTest { assertEquals(this.createdAlarms.size(), numAlarms); } - private void runCreateSimpleAlarm(final String... matchBy) { + private AlarmDefinition runCreateSimpleAlarm(final String... matchBy) { final String expression = "max(cpu{service=2}) > 90"; final AlarmDefinition alarmDefinition = createAlarmDefinition(expression, matchBy); + createAlarms(alarmDefinition, matchBy); + return alarmDefinition; + } + + private void createAlarms(final AlarmDefinition alarmDefinition, final String... matchBy) { final MetricDefinition metric = build("cpu", "hostname", "eleanore", "service", "2", "other", "vivi"); @@ -373,19 +520,18 @@ public class AlarmCreationBoltTest { verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, new MetricDefinitionAndTenantId(metric, TENANT_ID)); - this.createdAlarms.clear(); final MetricDefinition metric2 = build("cpu", "hostname", "vivi", "service", "2", "other", "eleanore"); sendNewMetric(new MetricDefinitionAndTenantId(metric2, TENANT_ID), alarmDefinition.getId()); if (matchBy.length == 0) { - assertEquals(this.createdAlarms.size(), 0, + assertEquals(this.createdAlarms.size(), 1, "A second alarm was created instead of the metric fitting into the first"); } else { - assertEquals(this.createdAlarms.size(), 1, + assertEquals(this.createdAlarms.size(), 2, "The metric was fitted into the first alarm instead of creating a new alarm"); - verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, new MetricDefinitionAndTenantId(metric2, TENANT_ID)); // Now send a metric that must fit into the just created alarm to test that @@ -395,10 +541,10 @@ public class AlarmCreationBoltTest { sendNewMetric(new MetricDefinitionAndTenantId(metric3, TENANT_ID), alarmDefinition.getId()); - assertEquals(this.createdAlarms.size(), 1, + assertEquals(this.createdAlarms.size(), 2, "The metric created a new alarm instead of fitting into the second"); - verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, new MetricDefinitionAndTenantId(metric2, TENANT_ID), new MetricDefinitionAndTenantId(metric3, TENANT_ID)); } } @@ -467,6 +613,8 @@ public class AlarmCreationBoltTest { } expectedAlarm.setSubAlarms(expectedSubAlarms); + assertEquals(newAlarm.getAlarmedMetrics().size(), mtids.length); + for (final SubAlarm subAlarm : expectedAlarm.getSubAlarms()) { // Have to do it this way because order of sub alarms is not deterministic MetricDefinitionAndTenantId mtid = null;