Refactoring of traffic stats processing

Change-Id: Ia93a49ed536e22fc52d7be15759d7154b5c8a565
Ilya Shakhat 2015-03-03 19:27:29 +03:00
parent 1e5c9da51b
commit cd32d76c81
6 changed files with 105 additions and 47 deletions

View File

@@ -17,6 +17,7 @@ import collections
 import csv
 from oslo_log import log as logging
+from shaker.engine import math
 LOG = logging.getLogger(__name__)
@@ -95,45 +96,37 @@ class IperfExecutor(BaseExecutor):
         return cmd.make()

-def _calc_stats(array):
-    return dict(max=max(array), min=min(array), avg=sum(array) / len(array))
+Sample = collections.namedtuple('Sample', ['start', 'end', 'value'])


 class IperfGraphExecutor(IperfExecutor):
     def get_command(self):
         self.test_definition['csv'] = True
-        self.test_definition['interval'] = '1'
+        self.test_definition['interval'] = 1
         return super(IperfGraphExecutor, self).get_command()

     def process_reply(self, message):
         result = super(IperfGraphExecutor, self).process_reply(message)

-        samples = collections.defaultdict(list)
-        streams = {}
-        stream_count = 0
+        samples = []
+        threads_count = self.test_definition.get('threads') or 1

         for row in csv.reader(result['stdout'].split('\n')):
-            if row:
+            if row and len(row) > 8:
                 thread = row[5]
-                if thread not in streams:
-                    streams[thread] = stream_count
-                    stream_count += 1
+                if threads_count > 1 and thread != -1:
+                    # ignore individual per-thread data
+                    continue

-                samples['time'].append(float(row[6].split('-')[1]))
-                samples['bandwidth_%s' % streams[thread]].append(
-                    float(row[8]) / 1024 / 1024)
+                start, end = row[6].split('-')
+                samples.append(Sample(start=float(start),
+                                      end=float(end),
+                                      value=int(row[8])))

-        # the last line is summary, remove its items
-        for arr in samples.values():
-            arr.pop()
+        samples.pop()  # the last line is summary, remove it

         result['samples'] = samples

-        # todo calculate stats correctly for multiple threads
-        for stream in streams.values():
-            result['stats'] = _calc_stats(
-                samples['bandwidth_%s' % stream])
+        result.update(math.calc_traffic_stats(samples))
         return result
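For reference, a minimal standalone sketch of the new parsing path (not part of the commit). The code above only relies on CSV columns 5, 6 and 8 of iperf's csv output; the sample line below and the meaning of the remaining columns are assumptions for illustration:

    import collections
    import csv

    Sample = collections.namedtuple('Sample', ['start', 'end', 'value'])

    # Hypothetical "iperf -y C" row: interval "0.0-1.0" in column 6,
    # measured value (bits per second) in column 8.
    stdout = '20150303190000,10.0.0.1,5001,10.0.0.2,54321,3,0.0-1.0,49938432,399507456'

    samples = []
    for row in csv.reader(stdout.split('\n')):
        if row and len(row) > 8:
            start, end = row[6].split('-')
            samples.append(Sample(start=float(start),
                                  end=float(end),
                                  value=int(row[8])))

    print(samples)  # [Sample(start=0.0, end=1.0, value=399507456)]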

shaker/engine/math.py (new file, 41 lines)
View File

@@ -0,0 +1,41 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def mean(array):
    if not array:
        return 0
    return sum(array) / len(array)


def calc_traffic_stats(samples):
    total = sum(sample.value for sample in samples)
    duration = sum((sample.end - sample.start) for sample in samples)
    values = [int(sample.value / (sample.end - sample.start))
              for sample in samples]
    row_data = [[sample.end, sample.value / (sample.end - sample.start)]
                for sample in samples]
    return dict(
        stats=dict(
            max=max(values),
            min=min(values),
            mean=mean(values),
            total=total,
            duration=duration,
        ),
        row_data=row_data,
    )
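A usage sketch for the new helper (not part of the commit), fed with the same three one-second samples that the updated unit test uses; Sample here mirrors the namedtuple this commit adds to shaker.engine.executors:

    import collections

    Sample = collections.namedtuple('Sample', ['start', 'end', 'value'])

    samples = [Sample(0.0, 1.0, 399507456),
               Sample(1.0, 2.0, 412090368),
               Sample(2.0, 3.0, 405798912)]

    result = calc_traffic_stats(samples)
    # result['stats']: max 412090368, min 399507456, total 1217396736,
    #                  duration 3.0, mean exactly 405798912 (the sum divides by 3)
    # result['row_data']: [[1.0, 399507456.0], [2.0, 412090368.0], [3.0, 405798912.0]]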

View File

@@ -16,10 +16,14 @@
 import sys

 import jinja2
+from oslo_log import log as logging

 from shaker.engine import utils

+LOG = logging.getLogger(__name__)


 def generate_report(report_template, report_filename, data):
     template = utils.read_file(report_template)
     compiled_template = jinja2.Template(template)
@@ -33,3 +37,4 @@ def generate_report(report_template, report_filename, data):
     fd.write(rendered_template)
     fd.close()
+    LOG.info('Report generated')
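The logging added here brackets the existing flow: read the template file, compile it with jinja2, render it with the report data and write the result out. A rough standalone sketch of that flow under assumptions (the render keyword and the file handling below are guesses, not the module's exact code):

    import jinja2

    def render_report(template_path, output_path, data):
        with open(template_path) as f:
            template = jinja2.Template(f.read())
        # the HTML template in this repo refers to a 'report' variable,
        # so passing the data under that name is an assumption here
        rendered = template.render(report=data)
        with open(output_path, 'w') as f:
            f.write(rendered)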

View File

@@ -12,6 +12,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import copy
 import json
 import logging as std_logging
@@ -134,9 +135,9 @@ def read_scenario():
 def _extend_agents(agents):
     for agent in agents.values():
         if agent.get('slave_id'):
-            agent['slave'] = utils.copy_dict_kv(agents[agent['slave_id']])
+            agent['slave'] = copy.deepcopy(agents[agent['slave_id']])
         if agent.get('master_id'):
-            agent['master'] = utils.copy_dict_kv(agents[agent['master_id']])
+            agent['master'] = copy.deepcopy(agents[agent['master_id']])


 def _pick_agents(agents, size):
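Switching to copy.deepcopy matters when agent records carry nested structures; assuming utils.copy_dict_kv made only a one-level copy (its code is not in this diff), mutations would leak between the master and slave entries. A toy illustration of the difference:

    import copy

    agent = {'id': 'a-1', 'meta': {'role': 'master'}}

    shallow = dict(agent)        # one-level copy: 'meta' is still shared
    deep = copy.deepcopy(agent)  # fully independent copy

    agent['meta']['role'] = 'changed'
    print(shallow['meta']['role'])  # 'changed' -- leaked through the shared dict
    print(deep['meta']['role'])     # 'master'  -- unaffected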
@@ -265,6 +266,7 @@ def main():
     elif cfg.CONF.input:
         # read json results
+        LOG.debug('Reading JSON data from %s', cfg.CONF.input)
         report_data = json.loads(utils.read_file(cfg.CONF.input))

         report.generate_report(cfg.CONF.report_template, cfg.CONF.report,

View File

@@ -349,9 +349,9 @@
 {% set cnt = result_per_iteration.agents|length %}
 <li class="col-md-offset-1"><a href="#test-{{ test.definition.uuid }}-{{ cnt }}" data-toggle="tab">
 {% if cnt == 1 %}
-Single Thread
+full iteration
 {% else %}
-{{ cnt }} Threads
+{{ cnt }} threads
 {% endif %}
 </a></li>
 {% endfor %}
@@ -368,11 +368,13 @@
 <h4>Agents:</h4>
 <ul>
 {% for agent in report.agents %}
-{% if agent.mode == 'master' %}
-<li>Master <i>{{ agent.id }}</i>, IP: <i>{{ agent.ip }}</i>
+{% if agent.mode != 'slave' %}
+<li>Master <i>{{ agent.id }}</i>, IP: <i>{{ agent.ip }}</i>,
+node: <i>{{ agent.node }}</i>, instance: <i>{{ agent.instance_name }}</i>
 {% if agent.slave %}
 <ul>
-<li>Slave <i>{{ agent.slave.id }}</i>, IP: <i>{{ agent.slave.ip }}</i>
+<li>Slave <i>{{ agent.slave.id }}</i>, IP: <i>{{ agent.slave.ip }}</i>,
+node: <i>{{ agent.slave.node }}</i>, instance: <i>{{ agent.slave.instance_name }}</i>
 </ul>
 {% endif %}
 </li>
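The template now reads node and instance_name from every agent record, plus a slave sub-record attached by _extend_agents(). A hypothetical example of the shape it expects (field names taken from the template above, all values invented for illustration):

    agent = {
        'id': 'agent-0',
        'mode': 'master',
        'ip': '10.0.0.3',
        'node': 'compute-1',
        'instance_name': 'shaker-agent-0',
        'slave_id': 'agent-1',
        # attached by _extend_agents() via copy.deepcopy:
        'slave': {'id': 'agent-1', 'ip': '10.0.0.4',
                  'node': 'compute-2', 'instance_name': 'shaker-agent-1'},
    }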
@@ -391,6 +393,10 @@
 {% for result_per_iteration in test.results_per_iteration %}
 {% set cnt = result_per_iteration.agents|length %}
 <div id="test-{{ test.definition.uuid }}-{{ cnt }}" class="tab-pane">
+<h4>Iteration Summary</h4>
+{#### PER-AGENT DATA ####}
 {% for result_per_agent in result_per_iteration.results_per_agent %}
 <h4>Agent {{ result_per_agent.agent.id }}
 ({{ result_per_agent.agent.ip }})</h4>
@@ -399,9 +405,9 @@
 <h5>Traffic stats</h5>
 <dl class="dl-horizontal">
-<dt>Max bandwidth</dt><dd>{{ result_per_agent.stats.max|round(3) }} Mbits/s</dd>
-<dt>Min bandwidth</dt><dd>{{ result_per_agent.stats.min|round(3) }} Mbits/s</dd>
-<dt>Average bandwidth</dt><dd>{{ result_per_agent.stats.avg|round(3) }} Mbits/s</dd>
+<dt>Max bandwidth</dt><dd>{{ result_per_agent.stats.max / 1024 / 1024|round(2) }} Mbits/s</dd>
+<dt>Min bandwidth</dt><dd>{{ result_per_agent.stats.min / 1024 / 1024|round(2) }} Mbits/s</dd>
+<dt>Mean bandwidth</dt><dd>{{ result_per_agent.stats.mean / 1024 / 1024|round(2) }} Mbits/s</dd>
 </dl>
 <div id="chart-{{ result_per_agent.uuid }}"></div>
@@ -411,14 +417,13 @@
 bindto: '#chart-{{ result_per_agent.uuid }}',
 data: {
 x: 'time',
-columns: [
-{% set first_item = True %}
-{% for stream, array in result_per_agent.samples.items() %}
-{% if not first_item %},{% endif %} {% set first_item = False %}
-['{{ stream }}', {{ array|join(', ') }}]
+rows: [
+['time', 'bandwidth']
+{% for row in result_per_agent.row_data %}
+,['{{ row[0] }}', {{ row[1] / 1024 / 1024 }}]
 {% endfor %}
 ],
-types: { bandwidth_0: 'area' }
+types: { bandwidth: 'area' }
 },
 axis: {
 x: { label: 'time' },
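Both the stats block and the chart now divide the raw per-second values (treated as bits per second) by 1024 twice before labelling them Mbits/s, which is consistent with the figures in the old and new unit tests. A quick sanity check, for illustration only:

    # raw value from the updated test  ->  Mbit/s value the old test expected
    for bits_per_s, mbit_per_s in [(399507456, 381.0),
                                   (412090368, 393.0),
                                   (405798912, 387.0)]:
        assert bits_per_s / 1024 / 1024 == mbit_per_s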

View File

@@ -52,16 +52,28 @@ class TestIperfGraphExecutor(testtools.TestCase):
             """
         }

         expected = {
-            'samples': {
-                'time': [1.0, 2.0, 3.0],
-                'bandwidth_0': [381.0, 393.0, 387.0],
-            },
+            'samples': [
+                executors.Sample(0.0, 1.0, 399507456),
+                executors.Sample(1.0, 2.0, 412090368),
+                executors.Sample(2.0, 3.0, 405798912),
+            ],
+            'row_data': [
+                [1.0, 399507456],
+                [2.0, 412090368],
+                [3.0, 405798912],
+            ],
             'stats': {
-                'max': 393.0,
-                'min': 381.0,
-                'avg': (381.0 + 393.0 + 387.0) / 3
+                'duration': 3.0,
+                'max': 412090368,
+                'min': 399507456,
+                'mean': (399507456 + 412090368 + 405798912) / 3,
+                'total': (399507456 + 412090368 + 405798912),
             },
         }

         reply = executor.process_reply(message)
-        self.assertEqual(expected['samples'], reply['samples'])
-        self.assertEqual(expected['stats'], reply['stats'])
+        self.assertEqual(expected['samples'], reply['samples'],
+                         message='Samples data')
+        self.assertEqual(expected['row_data'], reply['row_data'],
+                         'Row data')
+        self.assertEqual(expected['stats'], reply['stats'],
+                         'Traffic stats')
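For clarity, how the expected 'stats' values follow from the three Sample tuples above, each spanning exactly one second (illustration only, mirroring calc_traffic_stats from shaker/engine/math.py):

    values = [399507456, 412090368, 405798912]
    assert max(values) == 412090368
    assert min(values) == 399507456
    assert sum(values) == 1217396736      # 'total'
    assert sum(values) / 3 == 405798912   # 'mean' (the sum divides evenly)
    # 'duration' is 3.0: the sum of the three (end - start) intervals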