Merge "Increase Persister Performance"
This commit is contained in:
commit
97a9a7032a
monasca_persister
@ -1,4 +1,4 @@
|
||||
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -34,4 +34,4 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
|
||||
self.conf.influxdb.database_name)
|
||||
|
||||
def write_batch(self, data_points):
|
||||
self._influxdb_client.write_points(data_points, 'ms')
|
||||
self._influxdb_client.write_points(data_points, 'ms', protocol='line')
|
||||
|
@ -1,4 +1,4 @@
|
||||
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -12,13 +12,12 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from oslo_log import log
|
||||
import pytz
|
||||
|
||||
from monasca_persister.repositories.influxdb import abstract_repository
|
||||
from monasca_persister.repositories.influxdb import line_utils
|
||||
from monasca_persister.repositories.utils import parse_alarm_state_hist_message
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -39,28 +38,27 @@ class AlarmStateHistInfluxdbRepository(
|
||||
time_stamp) = parse_alarm_state_hist_message(
|
||||
message)
|
||||
|
||||
ts = time_stamp / 1000.0
|
||||
name = u'alarm_state_history'
|
||||
fields = []
|
||||
fields.append(u'tenant_id=' + line_utils.escape_value(tenant_id))
|
||||
fields.append(u'alarm_id=' + line_utils.escape_value(alarm_id))
|
||||
fields.append(u'metrics=' + line_utils.escape_value(
|
||||
json.dumps(metrics, ensure_ascii=False)))
|
||||
fields.append(u'new_state=' + line_utils.escape_value(new_state))
|
||||
fields.append(u'old_state=' + line_utils.escape_value(old_state))
|
||||
fields.append(u'link=' + line_utils.escape_value(link))
|
||||
fields.append(u'lifecycle_state=' + line_utils.escape_value(
|
||||
lifecycle_state))
|
||||
fields.append(u'reason=' + line_utils.escape_value(
|
||||
state_change_reason))
|
||||
fields.append(u'reason_data=' + line_utils.escape_value("{}"))
|
||||
fields.append(u'sub_alarms=' + line_utils.escape_value(
|
||||
sub_alarms_json_snake_case))
|
||||
|
||||
data = {"measurement": 'alarm_state_history',
|
||||
"time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
|
||||
'%Y-%m-%dT%H:%M:%S.%fZ'),
|
||||
"fields": {
|
||||
"tenant_id": tenant_id.encode('utf8'),
|
||||
"alarm_id": alarm_id.encode('utf8'),
|
||||
"metrics": json.dumps(metrics, ensure_ascii=False).encode(
|
||||
'utf8'),
|
||||
"new_state": new_state.encode('utf8'),
|
||||
"old_state": old_state.encode('utf8'),
|
||||
"link": link.encode('utf8'),
|
||||
"lifecycle_state": lifecycle_state.encode('utf8'),
|
||||
"reason": state_change_reason.encode('utf8'),
|
||||
"reason_data": "{}".encode('utf8'),
|
||||
"sub_alarms": sub_alarms_json_snake_case.encode('utf8')
|
||||
},
|
||||
"tags": {
|
||||
"tenant_id": tenant_id.encode('utf8')
|
||||
}}
|
||||
line = name + u',tenant_id=' + line_utils.escape_tag(tenant_id)
|
||||
line += u' ' + u','.join(fields)
|
||||
line += u' ' + str(int(time_stamp))
|
||||
|
||||
LOG.debug(data)
|
||||
LOG.debug(line)
|
||||
|
||||
return data
|
||||
return line
|
||||
|
46
monasca_persister/repositories/influxdb/line_utils.py
Normal file
46
monasca_persister/repositories/influxdb/line_utils.py
Normal file
@ -0,0 +1,46 @@
|
||||
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from six import PY2
|
||||
|
||||
|
||||
def escape_tag(tag):
|
||||
tag = get_unicode(tag)
|
||||
return tag.replace(
|
||||
u"\\", u"\\\\"
|
||||
).replace(
|
||||
u" ", u"\\ "
|
||||
).replace(
|
||||
u",", u"\\,"
|
||||
).replace(
|
||||
u"=", u"\\="
|
||||
)
|
||||
|
||||
def get_unicode(data):
|
||||
if PY2:
|
||||
return unicode(data)
|
||||
else:
|
||||
return str(data)
|
||||
|
||||
def escape_value(value):
|
||||
return u"\"{0}\"".format(
|
||||
get_unicode(value).replace(
|
||||
u"\\", u"\\\\"
|
||||
).replace(
|
||||
u"\"", u"\\\""
|
||||
).replace(
|
||||
u"\n", u"\\n"
|
||||
)
|
||||
)
|
@ -1,4 +1,4 @@
|
||||
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -12,13 +12,12 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from oslo_log import log
|
||||
import pytz
|
||||
|
||||
from monasca_persister.repositories.influxdb import abstract_repository
|
||||
from monasca_persister.repositories.influxdb import line_utils
|
||||
from monasca_persister.repositories.utils import parse_measurement_message
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -36,20 +35,27 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
|
||||
value_meta) = parse_measurement_message(message)
|
||||
|
||||
tags = dimensions
|
||||
tags['_tenant_id'] = tenant_id.encode('utf8')
|
||||
tags['_region'] = region.encode('utf8')
|
||||
tags[u'_tenant_id'] = tenant_id
|
||||
tags[u'_region'] = region
|
||||
|
||||
ts = time_stamp / 1000.0
|
||||
if not value_meta:
|
||||
value_meta_str = u'"{}"'
|
||||
else:
|
||||
value_meta_str = line_utils.escape_value(json.dumps(value_meta, ensure_ascii=False))
|
||||
|
||||
data = {"measurement": metric_name.encode('utf8'),
|
||||
"time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
|
||||
'%Y-%m-%dT%H:%M:%S.%fZ'),
|
||||
"fields": {
|
||||
"value": value,
|
||||
"value_meta": json.dumps(value_meta,
|
||||
ensure_ascii=False).encode('utf8')
|
||||
},
|
||||
"tags": tags}
|
||||
key_values = [line_utils.escape_tag(metric_name)]
|
||||
|
||||
# tags should be sorted client-side to take load off server
|
||||
for key in sorted(tags.keys()):
|
||||
key_tag = line_utils.escape_tag(key)
|
||||
value_tag = line_utils.escape_tag(tags[key])
|
||||
key_values.append(key_tag + u'=' + value_tag)
|
||||
key_values = u','.join(key_values)
|
||||
|
||||
value_field = u'value={}'.format(value)
|
||||
value_meta_field = u'value_meta=' + value_meta_str
|
||||
|
||||
data = key_values + u' ' + value_field + u',' + value_meta_field + u' ' + str(int(time_stamp))
|
||||
|
||||
LOG.debug(data)
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -27,24 +27,17 @@ def parse_measurement_message(message):
|
||||
|
||||
tenant_id = decoded_message['meta']['tenantId']
|
||||
|
||||
dimensions = {}
|
||||
if 'dimensions' in metric:
|
||||
for dimension_name in metric['dimensions']:
|
||||
dimensions[dimension_name.encode('utf8')] = (
|
||||
metric['dimensions'][dimension_name].encode('utf8'))
|
||||
|
||||
time_stamp = metric['timestamp']
|
||||
|
||||
value = float(metric['value'])
|
||||
|
||||
if 'value_meta' in metric and metric['value_meta']:
|
||||
value_meta = metric['value_meta']
|
||||
|
||||
else:
|
||||
value_meta = metric.get('value_meta', {})
|
||||
if 'value_meta' is None:
|
||||
# Ensure value_meta is a dict
|
||||
value_meta = {}
|
||||
|
||||
return (dimensions, metric_name, region, tenant_id, time_stamp, value,
|
||||
value_meta)
|
||||
return (metric.get('dimensions', {}), metric_name, region, tenant_id,
|
||||
time_stamp, value, value_meta)
|
||||
|
||||
|
||||
def parse_alarm_state_hist_message(message):
|
||||
|
42
monasca_persister/tests/test_influxdb.py
Normal file
42
monasca_persister/tests/test_influxdb.py
Normal file
@ -0,0 +1,42 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslotest import base
|
||||
|
||||
from monasca_persister.repositories.influxdb import line_utils
|
||||
|
||||
class TestInfluxdb(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestInfluxdb, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(TestInfluxdb, self).tearDown()
|
||||
|
||||
def test_line_utils_handles_utf8(self):
|
||||
utf8_name = u'name'
|
||||
self.assertEqual(u'"' + utf8_name + u'"', line_utils.escape_value(utf8_name))
|
||||
self.assertEqual(utf8_name, line_utils.escape_tag(utf8_name))
|
||||
|
||||
def test_line_utils_escape_tag(self):
|
||||
simple = u"aaaaa"
|
||||
self.assertEqual(simple, line_utils.escape_tag(simple))
|
||||
complex = u"a\\ b,c="
|
||||
self.assertEqual("a\\\\\\ b\\,c\\=", line_utils.escape_tag(complex))
|
||||
|
||||
def test_line_utils_escape_value(self):
|
||||
simple = u"aaaaa"
|
||||
self.assertEqual(u'"' + simple + u'"', line_utils.escape_value(simple))
|
||||
complex = u"a\\b\"\n"
|
||||
self.assertEqual(u"\"a\\\\b\\\"\\n\"", line_utils.escape_value(complex))
|
Loading…
x
Reference in New Issue
Block a user