Influxdb query crafter

Change-Id: Idc0a68016bd4d9b156bf86da2edf3496b3419f20
This commit is contained in:
Frédéric Vachon 2015-04-24 15:46:33 -04:00
parent 6ffa70b9cc
commit 599c58e36c
7 changed files with 224 additions and 136 deletions

View File

@ -0,0 +1,82 @@
# Copyright 2014 - Savoir-Faire Linux inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import wsme
def filter_fields(item_list, live_query):
filtered_items = []
if live_query.fields != wsme.Unset:
fields = json.loads(live_query.fields)
for item in item_list:
filtered_item = {}
for field in fields:
filtered_item[field] = item[field]
filtered_items.append(filtered_item)
else:
filtered_items = item_list
return filtered_items
def build_influxdb_query(live_query, measurement, group_by=[], limit=None):
query = ['SELECT * FROM', measurement]
if group_by:
query.append('GROUP BY')
query.append(', '.join(group_by))
if limit is not None:
query.append('LIMIT %d' % limit)
if live_query:
filters = json.loads(live_query.filters)
if filters:
query.append(_build_where_clause(filters))
return ' '.join(query)
def _build_where_clause(filters):
filters_conversion = {
'is': '=',
'isnot': '!='
}
clause = []
first = True
for filter_name, filter_data in filters.items():
for field, values in filter_data.items():
for value in values:
if first:
clause.append('WHERE')
else:
clause.append('AND')
if type(value) == int:
clause.append("%s%s%d" % (field,
filters_conversion[filter_name],
value))
else:
clause.append("%s%s'%s'" %
(field,
filters_conversion[filter_name],
value))
first = False
return ' '.join(clause)

View File

@ -1,56 +0,0 @@
# Copyright 2014 - Savoir-Faire Linux inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import wsme
def filter_dict_list_with_live_query(item_list, live_query):
filters = json.loads(live_query.filters)
matching_items = []
for item in item_list:
matches = True
# Filters are, for example, 'isnot' or 'is'
for filter in filters.items():
# Fields are, for example, 'STATE'
for field in filter[1].items():
# Values are, for example, 0, 1, UP, Down...
for value in field[1]:
if filter[0] == "isnot":
if item[field[0]] == value:
matches = False
break
elif filter[0] == "is":
if item[field[0]] != value:
matches = False
break
if matches:
matching_item = {}
if live_query.fields != wsme.Unset:
fields = json.loads(live_query.fields)
for field in fields:
matching_item[field] = item[field]
else:
matching_item = item
matching_items.append(matching_item)
return matching_items

View File

@ -17,7 +17,7 @@ import json
from surveil.api.datamodel.status import live_host
from surveil.api.handlers import handler
from surveil.api.handlers.status import liveQuery_filter as query_filter
from surveil.api.handlers.status import influxdb_query
class HostHandler(handler.Handler):
@ -39,9 +39,12 @@ class HostHandler(handler.Handler):
def get_all(self, live_query=None):
"""Return all live hosts."""
cli = self.request.influxdb_client
query = ("SELECT * from HOST_STATE "
"GROUP BY host_name, address, childs "
"LIMIT 1")
query = influxdb_query.build_influxdb_query(
live_query,
'HOST_STATE',
group_by=['host_name', 'address', 'childs'],
limit=1
)
response = cli.query(query)
host_dicts = []
@ -51,7 +54,7 @@ class HostHandler(handler.Handler):
host_dicts.append(host_dict)
if live_query:
host_dicts = query_filter.filter_dict_list_with_live_query(
host_dicts = influxdb_query.filter_fields(
host_dicts,
live_query
)

View File

@ -16,7 +16,7 @@ from __future__ import print_function
from surveil.api.datamodel.status import live_service
from surveil.api.handlers import handler
from surveil.api.handlers.status import liveQuery_filter as query_filter
from surveil.api.handlers.status import influxdb_query
class ServiceHandler(handler.Handler):
@ -39,10 +39,11 @@ class ServiceHandler(handler.Handler):
def get_all(self, live_query=None):
"""Return all live services."""
cli = self.request.influxdb_client
query = (
"SELECT * from SERVICE_STATE "
"GROUP BY host_name, service_description "
"LIMIT 1"
query = influxdb_query.build_influxdb_query(
live_query,
'SERVICE_STATE',
group_by=['host_name', 'service_description'],
limit=1
)
response = cli.query(query)
@ -54,7 +55,7 @@ class ServiceHandler(handler.Handler):
service_dicts.append(service_dict)
if live_query:
service_dicts = query_filter.filter_dict_list_with_live_query(
service_dicts = influxdb_query.filter_fields(
service_dicts,
live_query
)

View File

@ -138,15 +138,45 @@ class TestStatusHosts(functionalTest.FunctionalTest):
self.assertItemsEqual(json.loads(response.body), expected)
self.assertEqual(
httpretty.last_request().querystring['q'],
["SELECT * from HOST_STATE "
["SELECT * FROM HOST_STATE "
"GROUP BY host_name, address, childs LIMIT 1"]
)
@httpretty.activate
def test_query_hosts(self):
influxdb_response = json.dumps({
"results": [
{
"series": [
{"name": "HOST_STATE",
"tags": {"host_name": "ws-arbiter",
"address": "127.0.0.1",
"childs": '["test_keystone"]'},
"columns": [
"time",
"last_check",
"last_state_change",
"output",
"state",
"state_type",
"acknowledged"
],
"values":[
["2015-04-19T01:09:24Z",
1.429405764e+09,
1.429405765317063e+09,
"OK - localhost: rta 0.030ms, lost 0%",
0,
"HARD",
0]
]}
]
}
]
})
httpretty.register_uri(httpretty.GET,
"http://influxdb:8086/query",
body=self.influxdb_response)
body=influxdb_response)
query = {
'fields': json.dumps(['host_name', 'last_check']),
@ -164,6 +194,13 @@ class TestStatusHosts(functionalTest.FunctionalTest):
self.assertItemsEqual(json.loads(response.body), expected)
self.assertEqual(
httpretty.last_request().querystring['q'],
["SELECT * FROM HOST_STATE GROUP BY host_name, address, childs"
" LIMIT 1 WHERE host_name!='localhost' "
"AND description!='test_keystone'"]
)
@httpretty.activate
def test_get_specific_host(self):
influx_response = json.dumps(
@ -207,6 +244,12 @@ class TestStatusHosts(functionalTest.FunctionalTest):
self.assertItemsEqual(json.loads(response.body), expected)
self.assertEqual(
httpretty.last_request().querystring['q'],
["SELECT * from HOST_STATE WHERE host_name='localhost'"
" GROUP BY * LIMIT 1"]
)
@httpretty.activate
def test_get_specific_host_service(self):
influx_response = json.dumps(

View File

@ -105,11 +105,47 @@ class TestStatusServices(functionalTest.FunctionalTest):
self.assertEqual(json.loads(response.body), expected)
self.assertEqual(
httpretty.last_request().querystring['q'],
["SELECT * FROM SERVICE_STATE GROUP BY host_name,"
" service_description LIMIT 1"]
)
@httpretty.activate
def test_query_services(self):
influxdb_response = json.dumps({
"results": [
{"series": [
{"name": "SERVICE_STATE",
"tags": {"host_name": "test_keystone",
"service_description":
"Check KeyStone service."},
"columns": [
"time",
"last_check",
"last_state_change",
"output",
"state",
"state_type",
"acknowledged"
],
"values":[
["2015-04-19T18:20:34Z",
1.429467634e+09,
1.429467636632134e+09,
("There was no suitable "
"authentication url for this request"),
3,
"SOFT",
0]
]}
]}
]
})
httpretty.register_uri(httpretty.GET,
"http://influxdb:8086/query",
body=self.influxdb_response)
body=influxdb_response)
query = {
'fields': json.dumps(['host_name', 'service_description']),

View File

@ -15,34 +15,20 @@
import json
from surveil.api.datamodel.status import live_query
from surveil.api.handlers.status import liveQuery_filter as query_filter
from surveil.api.handlers.status import influxdb_query
from surveil.tests import base
class LiveQueryFilterTest(base.BaseTestCase):
def setUp(self):
self.items = [
{"description": "localhost",
"last_state_change": 1429400991,
"plugin_output": "OK - localhost: rta 0.047ms, lost 0%",
"last_check": 1429400990,
"state": 0,
"host_name": "localhost"},
def test_filter_fields(self):
items = [
{"description": "test_keystone",
"last_state_change": 1429400986,
"plugin_output": "OK - 127.0.0.1: rta 0.045ms, lost 0%",
"last_check": 1429400984, "state": 2,
"host_name": "test_keystone"},
{"description": "ws-arbiter",
"last_state_change": 1429400991,
"plugin_output": "OK - localhost: rta 0.042ms, lost 0%",
"last_check": 1429400990,
"state": 2,
"host_name": "ws-arbiter"}
]
def test_query_builder_filter_isnot(self):
]
query = live_query.LiveQuery(
fields=json.dumps(['host_name', 'last_check']),
filters=json.dumps({
@ -53,63 +39,56 @@ class LiveQueryFilterTest(base.BaseTestCase):
})
)
result = query_filter.filter_dict_list_with_live_query(
self.items,
result = influxdb_query.filter_fields(
items,
query
)
expected = [{"last_check": 1429400990, "host_name": "ws-arbiter"}]
expected = [{"last_check": 1429400984, "host_name": "test_keystone"}]
self.assertItemsEqual(result, expected)
def test_query_builder_filter_is(self):
def test_build_where_clause(self):
filters = {
"is": {
"state": [0],
"description": ["test_keystone"]
}
}
result = influxdb_query._build_where_clause(
filters
)
expected = "WHERE state=0 AND description='test_keystone'"
self.assertItemsEqual(result, expected)
def test_build_where_clause_no_filters(self):
filters = {}
result = influxdb_query._build_where_clause(
filters
)
expected = ""
self.assertItemsEqual(result, expected)
def test_build_influx_query(self):
query = live_query.LiveQuery(
fields=json.dumps(['host_name']),
filters=json.dumps({
"is": {
"state": [0],
"description": ["localhost"]
},
"isnot": {
"state": [1]
}
})
fields=json.dumps(['host_name', 'last_check']),
filters=json.dumps({}),
)
measurement = 'ALERT'
group_by = ['*', 'host_name']
limit = 10
result = query_filter.filter_dict_list_with_live_query(
self.items,
query
)
result = influxdb_query.build_influxdb_query(query,
measurement,
group_by,
limit)
expected = [{"host_name": "localhost"}]
self.assertItemsEqual(result, expected)
def test_query_builder_filter_all_fields(self):
query = live_query.LiveQuery(
filters=json.dumps({
"is": {
"state": [0],
"description": ["localhost"]
},
"isnot": {
"state": [1]
}
})
)
result = query_filter.filter_dict_list_with_live_query(
self.items,
query
)
expected = [
{'description': 'localhost',
'last_state_change': 1429400991,
'plugin_output': 'OK - localhost: rta 0.047ms, lost 0%',
'last_check': 1429400990,
'state': 0,
'host_name': 'localhost'}
]
expected = "SELECT * FROM ALERT GROUP BY *, host_name LIMIT 10"
self.assertItemsEqual(result, expected)