From 51c0157309dd14e69baed95127199cbea64d55cf Mon Sep 17 00:00:00 2001 From: yaowei Date: Wed, 13 Jan 2016 15:35:46 +0800 Subject: [PATCH 1/2] Add iperf driver --- stetho/agent/agent.py | 9 +- stetho/agent/api.py | 193 +++++++++--------- stetho/agent/common/log.py | 2 +- stetho/agent/common/utils.py | 37 ++++ stetho/agent/drivers/__init__.py | 0 stetho/agent/drivers/iperf.py | 76 +++++++ stetho/tests/unit/agent/common/test_utils.py | 13 +- stetho/tests/unit/agent/drivers/__init__.py | 0 stetho/tests/unit/agent/drivers/test_iperf.py | 34 +++ stetho/tests/unit/agent/test_api.py | 20 +- 10 files changed, 275 insertions(+), 109 deletions(-) create mode 100644 stetho/agent/drivers/__init__.py create mode 100644 stetho/agent/drivers/iperf.py create mode 100644 stetho/tests/unit/agent/drivers/__init__.py create mode 100644 stetho/tests/unit/agent/drivers/test_iperf.py diff --git a/stetho/agent/agent.py b/stetho/agent/agent.py index f10a68d..cf35ca1 100644 --- a/stetho/agent/agent.py +++ b/stetho/agent/agent.py @@ -19,12 +19,14 @@ from SocketServer import ThreadingMixIn from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer from stetho.agent import api as agent_api from stetho.agent.common import utils as agent_utils - +from stetho.agent.common import log # Listening endpoint LISTEN_ADDR = '0.0.0.0' LISTEN_PORT = 9698 +LOG = log.get_logger() + class AsyncJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer): pass @@ -32,11 +34,12 @@ class AsyncJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer): def main(): # log - args = sys.argv[1:] endpoint = (LISTEN_ADDR, LISTEN_PORT) server = AsyncJSONRPCServer(endpoint) server.register_multicall_functions() - agent_utils.register_api(server, agent_api) + api = agent_api.AgentApi() + agent_utils.register_api(server, api) + LOG.info("Agent listening in %s:%s" % endpoint) server.serve_forever() diff --git a/stetho/agent/api.py b/stetho/agent/api.py index fdd2940..92931b0 100644 --- a/stetho/agent/api.py +++ b/stetho/agent/api.py @@ -16,111 +16,114 @@ import re from netaddr import IPNetwork from stetho.agent.common import utils as agent_utils +from stetho.agent.common import log + +LOG = log.get_logger() -def check_ports_on_br(bridge='br-ex', ports=['eth3']): - """Check ports exist on bridge. +class AgentApi(object): - ovs-vsctl list-ports bridge - """ - cmd = ['ovs-vsctl', 'list-ports', bridge] - stdcode, stdout = agent_utils.execute(cmd, root=True) - data = dict() - if stdcode == 0: - for port in ports: - if port in stdout: - data[port] = True - stdout.remove(port) - else: - data[port] = False - return agent_utils.make_response(code=stdcode, data=data) - # execute failed. - message = stdout.pop(0) - return agent_utils.make_response(code=stdcode, - message=message) + def check_ports_on_br(self, bridge='br-ex', ports=['eth3']): + """Check ports exist on bridge. + ovs-vsctl list-ports bridge + """ + LOG.info("RPC: check_ports_on_br bridge: %s, ports: %s" % + (bridge, ports)) + cmd = ['ovs-vsctl', 'list-ports', bridge] + stdcode, stdout = agent_utils.execute(cmd, root=True) + data = dict() + if stdcode == 0: + for port in ports: + if port in stdout: + data[port] = True + stdout.remove(port) + else: + data[port] = False + return agent_utils.make_response(code=stdcode, data=data) + # execute failed. + message = stdout.pop(0) + return agent_utils.make_response(code=stdcode, + message=message) -def ping(ips, boardcast=False, - count=2, timeout=2, interface=None): - """Ping host or broadcast. + def ping(self, ips, boardcast=False, + count=2, timeout=2, interface=None): + """Ping host or broadcast. - ping host -c 2 -W 2 - """ - cmd = ['ping', '-c', str(count), '-W', str(timeout)] - True if not interface else cmd.extend(['-I', interface]) - True if not boardcast else cmd.append('-b') - # Batch create subprocess - data = dict() - try: - for ip in ips: - stdcode, stdout = agent_utils.execute(cmd + [ip]) - if stdcode: - data[ip] = 100 - else: - pattern = r',\s([0-9]+)%\spacket\sloss' - data[ip] = re.search(pattern, stdout[-2]).groups()[0] - return agent_utils.make_response(code=0, data=data) - except Exception as e: - message = e.message - return agent_utils.make_response(code=1, message=message) + ping host -c 2 -W 2 + """ + cmd = ['ping', '-c', str(count), '-W', str(timeout)] + True if not interface else cmd.extend(['-I', interface]) + True if not boardcast else cmd.append('-b') + # Batch create subprocess + data = dict() + try: + for ip in ips: + stdcode, stdout = agent_utils.execute(cmd + [ip]) + if stdcode: + data[ip] = 100 + else: + pattern = r',\s([0-9]+)%\spacket\sloss' + data[ip] = re.search(pattern, stdout[-2]).groups()[0] + return agent_utils.make_response(code=0, data=data) + except Exception as e: + message = e.message + return agent_utils.make_response(code=1, message=message) + def add_vlan_to_interface(self, interface, vlan_id): + """Add vlan interface. -def add_vlan_to_interface(interface, vlan_id): - """Add vlan interface. - - ip link add link eth0 name eth0.10 type vlan id 10 - """ - subif = '%s.%s' % (interface, vlan_id) - vlan_id = '%s' % vlan_id - cmd = ['ip', 'link', 'add', 'link', interface, 'name', - subif, 'type', 'vlan', 'id', vlan_id] - stdcode, stdout = agent_utils.execute(cmd, root=True) - if stdcode == 0: - return agent_utils.make_response(code=stdcode) - # execute failed. - message = stdout.pop(0) - return agent_utils.make_response(code=stdcode, message=message) - - -def get_interface(interface='eth0'): - """Interface info. - - ifconfig interface - """ - code, message, data = agent_utils.get_interface(interface) - return agent_utils.make_response(code, message, data) - - -def setup_link(interface, cidr): - """Setup a link. - - ip addr add dev interface - ip link set dev interface up - """ - # clear old ipaddr in interface - cmd = ['ip', 'addr', 'flush', 'dev', interface] - agent_utils.execute(cmd, root=True) - ip = IPNetwork(cidr) - cmd = ['ip', 'addr', 'add', cidr, 'broadcast', - str(ip.broadcast), 'dev', interface] - stdcode, stdout = agent_utils.execute(cmd, root=True) - if stdcode == 0: - cmd = ['ip', 'link', 'set', 'dev', interface, 'up'] + ip link add link eth0 name eth0.10 type vlan id 10 + """ + subif = '%s.%s' % (interface, vlan_id) + vlan_id = '%s' % vlan_id + cmd = ['ip', 'link', 'add', 'link', interface, 'name', + subif, 'type', 'vlan', 'id', vlan_id] stdcode, stdout = agent_utils.execute(cmd, root=True) if stdcode == 0: return agent_utils.make_response(code=stdcode) - # execute failed. - message = stdout.pop(0) - return agent_utils.make_response(code=stdcode, message=message) + # execute failed. + message = stdout.pop(0) + return agent_utils.make_response(code=stdcode, message=message) + def get_interface(self, interface='eth0'): + """Interface info. -def teardown_link(interface): - """ip link - """ - cmd = ['ip', 'link', 'delete', interface] - stdcode, stdout = agent_utils.execute(cmd, root=True) - if stdcode == 0: - return agent_utils.make_response(code=stdcode) - # execute failed. - message = stdout.pop(0) - return agent_utils.make_response(code=stdcode, message=message) + ifconfig interface + """ + LOG.info("RPC: get_interface interfae: %s" % interface) + code, message, data = agent_utils.get_interface(interface) + return agent_utils.make_response(code, message, data) + + def setup_link(self, interface, cidr): + """Setup a link. + + ip addr add dev interface + ip link set dev interface up + """ + # clear old ipaddr in interface + cmd = ['ip', 'addr', 'flush', 'dev', interface] + agent_utils.execute(cmd, root=True) + ip = IPNetwork(cidr) + cmd = ['ip', 'addr', 'add', cidr, 'broadcast', + str(ip.broadcast), 'dev', interface] + stdcode, stdout = agent_utils.execute(cmd, root=True) + if stdcode == 0: + cmd = ['ip', 'link', 'set', 'dev', interface, 'up'] + stdcode, stdout = agent_utils.execute(cmd, root=True) + if stdcode == 0: + return agent_utils.make_response(code=stdcode) + # execute failed. + message = stdout.pop(0) + return agent_utils.make_response(code=stdcode, message=message) + + def teardown_link(self, interface): + """ip link + """ + cmd = ['ip', 'link', 'delete', interface] + stdcode, stdout = agent_utils.execute(cmd, root=True) + if stdcode == 0: + return agent_utils.make_response(code=stdcode) + # execute failed. + message = stdout.pop(0) + return agent_utils.make_response(code=stdcode, message=message) diff --git a/stetho/agent/common/log.py b/stetho/agent/common/log.py index ba2fa91..8c4ac6e 100644 --- a/stetho/agent/common/log.py +++ b/stetho/agent/common/log.py @@ -17,7 +17,7 @@ import logging FORMAT = '%(asctime)s %(filename)s %(levelname)s %(message)s' DATEFMT = '%d %b %Y %H:%M:%S' -FILENAME = '/var/log/stetho/stetho.log' +FILENAME = '/var/log/stetho/stetho-agent.log' def get_logger(filename=FILENAME, format=FORMAT, diff --git a/stetho/agent/common/utils.py b/stetho/agent/common/utils.py index 81f12dc..49c77ce 100644 --- a/stetho/agent/common/utils.py +++ b/stetho/agent/common/utils.py @@ -13,19 +13,26 @@ # License for the specific language governing permissions and limitations # under the License. +import os import re import time +import tempfile +import signal import shlex import subprocess import platform from threading import Timer from stetho.agent.common import resource +from stetho.agent.common import log + +LOG = log.get_logger() def execute(cmd, shell=False, root=False, timeout=10): try: if root: cmd.insert(0, 'sudo') + LOG.info(cmd) subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell) timer = Timer(timeout, lambda proc: proc.kill(), [subproc]) @@ -41,9 +48,31 @@ def execute(cmd, shell=False, root=False, timeout=10): return stdcode, list_strip(stderr) if stdcode else list_strip(stdout) except Exception as e: + LOG.error(e) raise +def create_deamon(cmd, shell=False, root=False): + """Usage: + Create servcice process. + """ + try: + if root: + cmd.insert(0, 'sudo') + LOG.info(cmd) + subproc = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdcode = subproc.returncode + return subproc.pid + except Exception as e: + LOG.error(e) + raise + + +def kill_process_by_id(pid): + os.kill(int(pid), signal.SIGKILL) + + def get_interface(interface): """Support Centos standard physical interface, such as eth0. @@ -114,6 +143,7 @@ def register_api(server, api_obj): methods = dir(api_obj) apis = filter(lambda m: not m.startswith('_'), methods) [server.register_function(getattr(api_obj, api)) for api in apis] + LOG.info("Registered api %s" % apis) def make_response(code=0, message='', data=dict()): @@ -122,3 +152,10 @@ def make_response(code=0, message='', data=dict()): response['message'] = '' if message is None else message response['data'] = dict() if data is None else data return response + + +def replace_file(file_name, mode=0o644): + base_dir = os.path.dirname(os.path.abspath(file_name)) + tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False) + os.chmod(tmp_file.name, mode) + os.rename(tmp_file.name, file_name) diff --git a/stetho/agent/drivers/__init__.py b/stetho/agent/drivers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stetho/agent/drivers/iperf.py b/stetho/agent/drivers/iperf.py new file mode 100644 index 0000000..51a8b83 --- /dev/null +++ b/stetho/agent/drivers/iperf.py @@ -0,0 +1,76 @@ +# Copyright 2016 UnitedStack, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid +from stetho.agent.common import log +from stetho.agent.common import utils + +LOG = log.get_logger() + +OUT_DIR = '/tmp/' + + +class IPerfDriver(object): + + def start_server(self, protocol='TCP', port=5001, mss=None, window=None): + """iperf -s -D --mss mss + """ + id = uuid.uuid4() + output = OUT_DIR + 'iperf-server-%s' % id + utils.replace_file(output) + cmd = ['iperf', '-s', '-D', '-p', str(port), '-o', output] + if not cmp(protocol, 'UDP'): + cmd.append('-u') + if mss: + cmd.extend(['-m', str(mss)]) + if window: + cmd.extend(['-w', str(window)]) + cmd.extend(['>', output]) + pid = utils.create_deamon(cmd) + data = dict() + data['id'] = id + data['pid'] = pid + return data + + def stop_server(self, pid): + utils.kill_process_by_id(pid) + + def start_client(self, host, port=5001, protocol='TCP', time=60, + parallel=None, bandwidth=None): + """iperf -D -c host -t 60 + """ + id = uuid.uuid4() + output = OUT_DIR + 'iperf-client-%s' % id + utils.replace_file(output) + cmd = ['iperf', '-D', '-c', host, '-p', str(port), '-t', str(time)] + if not (protocol, 'UDP'): + cmd.append('-u') + if parallel: + cmd.extend(['-P', str(parallel)]) + if bandwidth: + cmd.extend(['-b', '%sM' % bandwidth]) + cmd.extend(['>', output]) + utils.create_deamon(cmd) + data = dict() + data['id'] = id + return data + + def get_server_output(self, id): + # TODO: some analysis + pass + + def get_client_output(self, id): + # TODO: some analysis + pass diff --git a/stetho/tests/unit/agent/common/test_utils.py b/stetho/tests/unit/agent/common/test_utils.py index 440d8e4..13d6b0c 100644 --- a/stetho/tests/unit/agent/common/test_utils.py +++ b/stetho/tests/unit/agent/common/test_utils.py @@ -15,7 +15,7 @@ import mock import unittest - +import types import platform from stetho.agent.common import utils @@ -25,6 +25,11 @@ class TestUtils(unittest.TestCase): def setUp(self): self.test_file = self.get_temp_file_path('test_execute.tmp') open(self.test_file, 'w+').close() + self.pids = list() + + def tearDown(self): + for pid in self.pids: + utils.kill_process_by_id(pid) def test_execute(self): expected = "%s\n" % self.test_file @@ -73,3 +78,9 @@ class TestUtils(unittest.TestCase): # test other distribution platform.linux_distribution = mock.Mock(return_value=['', '6.6', '']) self.assertEqual(utils.get_interface('eth0')[0], 1) + + def test_create_deamon(self): + cmd = ['ping', '1.2.4.8'] + pid = utils.create_deamon(cmd) + self.pids.append(pid) + self.assertEqual(type(pid), types.IntType) diff --git a/stetho/tests/unit/agent/drivers/__init__.py b/stetho/tests/unit/agent/drivers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stetho/tests/unit/agent/drivers/test_iperf.py b/stetho/tests/unit/agent/drivers/test_iperf.py new file mode 100644 index 0000000..4e2f15c --- /dev/null +++ b/stetho/tests/unit/agent/drivers/test_iperf.py @@ -0,0 +1,34 @@ +# Copyright 2016 UnitedStack, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import unittest +from stetho.agent.drivers import iperf +from stetho.agent.common import utils + + +class TestIPerfDriver(unittest.TestCase): + def setUp(self): + self.iperfd = iperf.IPerfDriver() + + def test_start_server(self): + utils.create_deamon = mock.Mock(return_value=1000) + data = self.iperfd.start_server(protocol='UDP') + self.assertEqual(data['pid'], 1000) + + def test_start_client(self): + utils.create_deamon = mock.Mock() + self.iperfd.start_client('127.0.0.1') + self.assertEqual(utils.create_deamon.called, True) diff --git a/stetho/tests/unit/agent/test_api.py b/stetho/tests/unit/agent/test_api.py index 89bbd0f..52e8451 100644 --- a/stetho/tests/unit/agent/test_api.py +++ b/stetho/tests/unit/agent/test_api.py @@ -20,48 +20,50 @@ from stetho.agent.common import utils as agent_utils class TestApi(unittest.TestCase): + def setUp(self): + self.agent_api = api.AgentApi() def test_check_ports_on_br(self): agent_utils.execute = mock.Mock(return_value=(0, ['execute'])) agent_utils.make_response = mock.Mock(return_value=dict()) - api.check_ports_on_br() + self.agent_api.check_ports_on_br() self.assertEqual(agent_utils.execute.called, True) self.assertEqual(agent_utils.make_response.called, True) agent_utils.execute = mock.Mock(return_value=(1, ['execute'])) - api.check_ports_on_br() + self.agent_api.check_ports_on_br() self.assertEqual(agent_utils.make_response.called, True) def test_ping(self): stdout = ['', '2 packets transmitted, 2 received, 0% packet loss', ''] agent_utils.execute = mock.Mock(return_value=(0, stdout)) agent_utils.make_response = mock.Mock(return_value=dict()) - api.ping(['1.2.4.8', '1.2.4.9']) + self.agent_api.ping(['1.2.4.8', '1.2.4.9']) self.assertEqual(agent_utils.make_response.called, True) stdout = 'stdout' agent_utils.execute = mock.Mock(return_value=(0, stdout)) - api.ping(['1.2.4.8', '1.2.4.9']) + self.agent_api.ping(['1.2.4.8', '1.2.4.9']) self.assertEqual(agent_utils.make_response.called, True) def test_get_interface(self): get_interface = mock.Mock(return_value=(0, '', dict())) agent_utils.get_interface = get_interface - api.get_interface() + self.agent_api.get_interface() self.assertEqual(agent_utils.get_interface.called, True) def test_set_link(self): stdout = ['', ''] agent_utils.execute = mock.Mock(return_value=(0, stdout)) - api.setup_link('eth0', '10.0.0.100/24') + self.agent_api.setup_link('eth0', '10.0.0.100/24') self.assertEqual(agent_utils.make_response.called, True) agent_utils.execute = mock.Mock(return_value=(1, stdout)) - api.setup_link('eth0', '10.0.0.100/24') + self.agent_api.setup_link('eth0', '10.0.0.100/24') self.assertEqual(agent_utils.make_response.called, True) def test_teardown_link(self): stdout = ['', ''] agent_utils.execute = mock.Mock(return_value=(0, stdout)) - api.teardown_link('eth0') + self.agent_api.teardown_link('eth0') self.assertEqual(agent_utils.make_response.called, True) agent_utils.execute = mock.Mock(return_value=(1, stdout)) - api.teardown_link('eth0') + self.agent_api.teardown_link('eth0') self.assertEqual(agent_utils.make_response.called, True) From b23704c26012205f55da9151369bd29077a68033 Mon Sep 17 00:00:00 2001 From: yaowei Date: Wed, 13 Jan 2016 17:13:26 +0800 Subject: [PATCH 2/2] Modify unittest: mock create_deamon --- stetho/tests/unit/agent/common/test_utils.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/stetho/tests/unit/agent/common/test_utils.py b/stetho/tests/unit/agent/common/test_utils.py index 13d6b0c..9c05a8b 100644 --- a/stetho/tests/unit/agent/common/test_utils.py +++ b/stetho/tests/unit/agent/common/test_utils.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import os import mock import unittest import types @@ -80,7 +81,13 @@ class TestUtils(unittest.TestCase): self.assertEqual(utils.get_interface('eth0')[0], 1) def test_create_deamon(self): - cmd = ['ping', '1.2.4.8'] + cmd = ["ls", self.test_file] pid = utils.create_deamon(cmd) self.pids.append(pid) self.assertEqual(type(pid), types.IntType) + + def test_kill_process_by_id(self): + pid = 100 + os.kill = mock.Mock() + utils.kill_process_by_id(pid) + self.assertEqual(os.kill.called, True)