From 698c248ca07fd93e6606576124d13e795bfebbc9 Mon Sep 17 00:00:00 2001 From: Kun Huang Date: Thu, 29 Oct 2015 16:05:06 +0800 Subject: [PATCH] parse rpc data before write into db sca start -a rpc sleep 20 sca stop sca report [you will see two results here: pkts and bytes] Change-Id: Ibab207c43ef1a5509c8f85c0003cee48c4b66625 --- scalpels/agents/base.py | 29 +++++++++++++++++++++++++++++ scalpels/cli/actions/report.py | 2 +- scalpels/db/api.py | 4 ++-- scalpels/db/sqlalchemy/api.py | 4 ++-- scalpels/db/sqlalchemy/models.py | 1 + scripts/agent.py | 12 +++++++++--- scripts/port-input-traffic.sh | 7 +++---- 7 files changed, 47 insertions(+), 12 deletions(-) diff --git a/scalpels/agents/base.py b/scalpels/agents/base.py index ef35a5c..f8c2150 100644 --- a/scalpels/agents/base.py +++ b/scalpels/agents/base.py @@ -13,3 +13,32 @@ def run_agent(task_uuid, ag): cmd = "python %s/agent.py %s %s" % (data_dir, task_uuid, ag) ag = subprocess.Popen(cmd.split()) return ag.pid + +def parse_rpc(out): + """ + in: + ts, 123.00 pkts 2312 bytes + ... + ... + out: + name: Port Traffic + unit: pkts + data: [(ts, 123.00), ...] + + name: Port Traffic + unit: bytes + data: [(ts, 2312.00), ...] + """ + ag_name = "Port Traffic" + pkts_ret = {"name": ag_name, + "unit": "pkts", + "data":[]} + bytes_ret = {"name": ag_name, + "unit": "bytes", + "data":[]} + for ts, _t in out: + pkts, pkts_unit, bytes, bytes_unit = _t.split(" ", 3) + pkts_ret["data"].append((ts, pkts)) + bytes_ret["data"].append((ts, bytes)) + + return (pkts_ret, bytes_ret) diff --git a/scalpels/cli/actions/report.py b/scalpels/cli/actions/report.py index f75a81b..54a6750 100644 --- a/scalpels/cli/actions/report.py +++ b/scalpels/cli/actions/report.py @@ -33,4 +33,4 @@ def run(config): for ret_uuid in task.results: ret = db_api.result_get(ret_uuid) results.append(ret.data) - print "result <%s>, data: %s" % (ret.uuid, ret.data) + print "result <%s>, data: %s, unit: %s, name: %s" % (ret.uuid, ret.data, ret.unit, ret.name) diff --git a/scalpels/db/api.py b/scalpels/db/api.py index f430752..eb84b5b 100644 --- a/scalpels/db/api.py +++ b/scalpels/db/api.py @@ -19,12 +19,12 @@ def db_create(sc): def db_drop(): IMPL.db_drop() -def result_create(data): +def result_create(name="", unit="", data=None): """ :param data: a list :) :returns: result model obj """ - return IMPL.result_create(data) + return IMPL.result_create(name, unit, data) def task_create(results, pids): """ diff --git a/scalpels/db/sqlalchemy/api.py b/scalpels/db/sqlalchemy/api.py index b37cec9..fadd53b 100644 --- a/scalpels/db/sqlalchemy/api.py +++ b/scalpels/db/sqlalchemy/api.py @@ -48,9 +48,9 @@ def model_query(model, session=None): query = oslodbsqa_utils.model_query(model, session) return query -def result_create(data): +def result_create(name="", unit="", data=None): result = models.Result() - result.update({"data":data}) + result.update({"name":name, "unit": unit, "data":data}) result.save() return result diff --git a/scalpels/db/sqlalchemy/models.py b/scalpels/db/sqlalchemy/models.py index a199c91..76e12dd 100644 --- a/scalpels/db/sqlalchemy/models.py +++ b/scalpels/db/sqlalchemy/models.py @@ -43,6 +43,7 @@ class Result(BASE, ScalpelsBase): uuid = Column(String(36), default=lambda : str(uuid.uuid4()), nullable=False) data = Column(JSONEncodedData, nullable=False) unit = Column(String(20), nullable=True) + name = Column(String(20), nullable=True) class Setup(BASE, ScalpelsBase): __tablename__ = "setup" diff --git a/scripts/agent.py b/scripts/agent.py index 1a088aa..ae46e7f 100755 --- a/scripts/agent.py +++ b/scripts/agent.py @@ -10,6 +10,7 @@ from copy import deepcopy as copy import signal from tooz import coordination import time +from scalpels.agents import base """ python /agent.py mysql @@ -32,14 +33,18 @@ if __name__ == "__main__": t = worker.stdout.readline() if not len(t): break - out.append(t.strip()) + _t = (time.time(), t.strip()) + out.append(_t) except KeyboardInterrupt: pass + # psutil is much more professional... I have to use it instead # this kill is to script process worker_p = psutil.Process(worker.pid) worker_p.send_signal(signal.SIGINT) + parse_func = getattr(base, "parse_%s" % ag) + # TODO file lock is okay in localhost, here need redis for distributed # lock istead co = coordination.get_coordinator("file:///tmp", b"localhost") @@ -48,8 +53,9 @@ if __name__ == "__main__": with lock: task = db_api.task_get(task_uuid) results = copy(task.results) - ret = db_api.result_create(out) - results.append(ret.uuid) + for ret in parse_func(out): + ret = db_api.result_create(**ret) + results.append(ret.uuid) db_api.task_update(task_uuid, results=results) time.sleep(2) co.stop() diff --git a/scripts/port-input-traffic.sh b/scripts/port-input-traffic.sh index bc6c13b..1412ead 100755 --- a/scripts/port-input-traffic.sh +++ b/scripts/port-input-traffic.sh @@ -22,7 +22,7 @@ fi rule="$chain -p $protocol --dport $port" #XXX iptables -A INPUT -p tcp --dport 5672 -echo applying rule: $rule +#echo applying rule: $rule sudo iptables -t mangle -A $rule interval=3 @@ -33,11 +33,10 @@ while [ 1 -eq 1 ] ; do sleep $interval n_packages=`sudo iptables -t mangle -L $chain -n -v -x | grep $port | grep $protocol | tail -n 1 | awk '{print $1}'` n_bytes=`sudo iptables -t mangle -L $chain -n -v -x | grep $port | grep $protocol | tail -n 1 | awk '{print $2}'` - python -c "print '%0.2f pkt/s' % (float($n_packages-$packages)/int($interval))" - python -c "print '%0.2f byte/s' % (float($n_bytes-$bytes)/int($interval))" + python -c "print '%0.2f pkt/s %0.2f byte/s' % (float($n_packages-$packages)/int($interval), float($n_bytes-$bytes)/int($interval))" packages=$n_packages bytes=$n_bytes done -echo deleting rule: $rule +#echo deleting rule: $rule sudo iptables -t mangle -D $rule