diff --git a/monasca_persister/persister.conf b/monasca_persister/persister.conf index af837342..1967421e 100644 --- a/monasca_persister/persister.conf +++ b/monasca_persister/persister.conf @@ -59,9 +59,3 @@ ip_address = 192.168.10.4 port = 8086 user = mon_persister password = password - -# Uncomment, set cluster_ip_addresses, and change the repositories to point to the cassandra classes -#[cassandra] -# Comma separated list of Cassandra node IP addresses. No spaces. -#cluster_ip_addresses: 10.10.10.3 -#keyspace: monasca diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 05a68b56..207827a5 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -16,7 +16,7 @@ """Persister Module The Persister reads metrics and alarms from Kafka and then stores them - in into either Influxdb or Cassandra + in into Influxdb Start the perister as stand-alone process by running 'persister.py --config-file ' diff --git a/monasca_persister/repositories/cassandra/__init__.py b/monasca_persister/repositories/cassandra/__init__.py deleted file mode 100644 index a9d04897..00000000 --- a/monasca_persister/repositories/cassandra/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from oslo_config import cfg - -cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'), - cfg.StrOpt('keyspace')] - -cassandra_group = cfg.OptGroup(name='cassandra') -cfg.CONF.register_group(cassandra_group) -cfg.CONF.register_opts(cassandra_opts, cassandra_group) diff --git a/monasca_persister/repositories/cassandra/abstract_repository.py b/monasca_persister/repositories/cassandra/abstract_repository.py deleted file mode 100644 index a0d5e9d7..00000000 --- a/monasca_persister/repositories/cassandra/abstract_repository.py +++ /dev/null @@ -1,37 +0,0 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import abc -from cassandra.cluster import Cluster -from cassandra.query import BatchStatement -from oslo_config import cfg -import six - -from repositories.abstract_repository import AbstractRepository - - -@six.add_metaclass(abc.ABCMeta) -class AbstractCassandraRepository(AbstractRepository): - - def __init__(self): - super(AbstractCassandraRepository, self).__init__() - self.conf = cfg.CONF - - self._cassandra_cluster = Cluster( - self.conf.cassandra.cluster_ip_addresses.split(',')) - - self.cassandra_session = self._cassandra_cluster.connect( - self.conf.cassandra.keyspace) - - self._batch_stmt = BatchStatement() diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py deleted file mode 100644 index b74d9b6d..00000000 --- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py +++ /dev/null @@ -1,70 +0,0 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import json - -from cassandra.query import BatchStatement -from oslo_log import log - -from repositories.cassandra.abstract_repository import AbstractCassandraRepository -from repositories.utils import parse_alarm_state_hist_message - -LOG = log.getLogger(__name__) - - -class AlarmStateHistCassandraRepository(AbstractCassandraRepository): - - def __init__(self): - - super(AlarmStateHistCassandraRepository, self).__init__() - - self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare( - 'insert into alarm_state_history (tenant_id, alarm_id, ' - 'metrics, new_state, ' - 'old_state, reason, reason_data, ' - 'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)') - - def process_message(self, message): - - (alarm_id, metrics, new_state, old_state, state_change_reason, - sub_alarms_json_snake_case, tenant_id, - time_stamp) = parse_alarm_state_hist_message( - message) - - alarm_state_hist = ( - tenant_id.encode('utf8'), - alarm_id.encode('utf8'), - json.dumps(metrics, ensure_ascii=False).encode( - 'utf8'), - new_state.encode('utf8'), - old_state.encode('utf8'), - state_change_reason.encode('utf8'), - "{}".encode('utf8'), - sub_alarms_json_snake_case.encode('utf8'), - time_stamp - ) - - LOG.debug(alarm_state_hist) - - return alarm_state_hist - - def write_batch(self, alarm_state_hists): - - for alarm_state_hist in alarm_state_hists: - self._batch_stmt.add(self._insert_alarm_state_hist_stmt, - alarm_state_hist) - - self.cassandra_session.execute(self._batch_stmt) - - self._batch_stmt = BatchStatement() \ No newline at end of file diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py deleted file mode 100644 index 77963d1b..00000000 --- a/monasca_persister/repositories/cassandra/metrics_repository.py +++ /dev/null @@ -1,120 +0,0 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import hashlib -import json - -from cassandra.query import BatchStatement -from oslo_log import log -import urllib - -from repositories.cassandra.abstract_repository import AbstractCassandraRepository -from repositories.utils import parse_measurement_message - -LOG = log.getLogger(__name__) - - -class MetricCassandraRepository(AbstractCassandraRepository): - - def __init__(self): - - super(MetricCassandraRepository, self).__init__() - - self._insert_measurement_stmt = self.cassandra_session.prepare( - 'insert into measurements (tenant_id,' - 'region, metric_hash, time_stamp, value,' - 'value_meta) values (?, ?, ?, ?, ?, ?)') - - self._insert_metric_map_stmt = self.cassandra_session.prepare( - 'insert into metric_map (tenant_id,' - 'region, metric_hash, ' - 'metric_set) values' - '(?,?,?,?)') - - def process_message(self, message): - - (dimensions, metric_name, region, tenant_id, time_stamp, value, - value_meta) = parse_measurement_message(message) - - metric_hash, metric_set = create_metric_hash(metric_name, - dimensions) - - measurement = (tenant_id.encode('utf8'), - region.encode('utf8'), - metric_hash, - time_stamp, - value, - json.dumps(value_meta, ensure_ascii=False).encode( - 'utf8')) - - LOG.debug(measurement) - - return MetricMeasurementInfo( - tenant_id.encode('utf8'), - region.encode('utf8'), - metric_hash, - metric_set, - measurement) - - def write_batch(self, metric_measurement_infos): - - for metric_measurement_info in metric_measurement_infos: - - self._batch_stmt.add(self._insert_measurement_stmt, - metric_measurement_info.measurement) - - metric_map = (metric_measurement_info.tenant_id, - metric_measurement_info.region, - metric_measurement_info.metric_hash, - metric_measurement_info.metric_set) - - self._batch_stmt.add(self._insert_metric_map_stmt, - metric_map) - - self.cassandra_session.execute(self._batch_stmt) - - self._batch_stmt = BatchStatement() - - -class MetricMeasurementInfo(object): - - def __init__(self, tenant_id, region, metric_hash, metric_set, - measurement): - - self.tenant_id = tenant_id - self.region = region - self.metric_hash = metric_hash - self.metric_set = metric_set - self.measurement = measurement - - -def create_metric_hash(metric_name, dimensions): - - metric_name_part = '__name__' + '=' + urllib.quote_plus(metric_name) - - hash_string = metric_name_part - - metric_set = set() - - metric_set.add(metric_name_part) - - for dim_name in sorted(dimensions.iterkeys()): - dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus( - dimensions[dim_name])) - metric_set.add(dimension) - hash_string += dimension - - sha1_hash = hashlib.sha1(hash_string).hexdigest() - - return bytearray.fromhex(sha1_hash), metric_set diff --git a/requirements.txt b/requirements.txt index 4ff8a8e1..99c0d209 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,6 @@ oslo.service six>=1.9.0 babel influxdb==2.8.0 -cassandra-driver==3.0.0 iso8601 simport monasca-common