Add iperf driver

This commit is contained in:
yaowei 2016-01-13 15:35:46 +08:00
parent a1d2c07c47
commit 51c0157309
10 changed files with 275 additions and 109 deletions

View File

@ -19,12 +19,14 @@ from SocketServer import ThreadingMixIn
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
from stetho.agent import api as agent_api
from stetho.agent.common import utils as agent_utils
from stetho.agent.common import log
# Listening endpoint
LISTEN_ADDR = '0.0.0.0'
LISTEN_PORT = 9698
LOG = log.get_logger()
class AsyncJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer):
pass
@ -32,11 +34,12 @@ class AsyncJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer):
def main():
# log
args = sys.argv[1:]
endpoint = (LISTEN_ADDR, LISTEN_PORT)
server = AsyncJSONRPCServer(endpoint)
server.register_multicall_functions()
agent_utils.register_api(server, agent_api)
api = agent_api.AgentApi()
agent_utils.register_api(server, api)
LOG.info("Agent listening in %s:%s" % endpoint)
server.serve_forever()

View File

@ -16,111 +16,114 @@
import re
from netaddr import IPNetwork
from stetho.agent.common import utils as agent_utils
from stetho.agent.common import log
LOG = log.get_logger()
def check_ports_on_br(bridge='br-ex', ports=['eth3']):
"""Check ports exist on bridge.
class AgentApi(object):
ovs-vsctl list-ports bridge
"""
cmd = ['ovs-vsctl', 'list-ports', bridge]
stdcode, stdout = agent_utils.execute(cmd, root=True)
data = dict()
if stdcode == 0:
for port in ports:
if port in stdout:
data[port] = True
stdout.remove(port)
else:
data[port] = False
return agent_utils.make_response(code=stdcode, data=data)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode,
message=message)
def check_ports_on_br(self, bridge='br-ex', ports=['eth3']):
"""Check ports exist on bridge.
ovs-vsctl list-ports bridge
"""
LOG.info("RPC: check_ports_on_br bridge: %s, ports: %s" %
(bridge, ports))
cmd = ['ovs-vsctl', 'list-ports', bridge]
stdcode, stdout = agent_utils.execute(cmd, root=True)
data = dict()
if stdcode == 0:
for port in ports:
if port in stdout:
data[port] = True
stdout.remove(port)
else:
data[port] = False
return agent_utils.make_response(code=stdcode, data=data)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode,
message=message)
def ping(ips, boardcast=False,
count=2, timeout=2, interface=None):
"""Ping host or broadcast.
def ping(self, ips, boardcast=False,
count=2, timeout=2, interface=None):
"""Ping host or broadcast.
ping host -c 2 -W 2
"""
cmd = ['ping', '-c', str(count), '-W', str(timeout)]
True if not interface else cmd.extend(['-I', interface])
True if not boardcast else cmd.append('-b')
# Batch create subprocess
data = dict()
try:
for ip in ips:
stdcode, stdout = agent_utils.execute(cmd + [ip])
if stdcode:
data[ip] = 100
else:
pattern = r',\s([0-9]+)%\spacket\sloss'
data[ip] = re.search(pattern, stdout[-2]).groups()[0]
return agent_utils.make_response(code=0, data=data)
except Exception as e:
message = e.message
return agent_utils.make_response(code=1, message=message)
ping host -c 2 -W 2
"""
cmd = ['ping', '-c', str(count), '-W', str(timeout)]
True if not interface else cmd.extend(['-I', interface])
True if not boardcast else cmd.append('-b')
# Batch create subprocess
data = dict()
try:
for ip in ips:
stdcode, stdout = agent_utils.execute(cmd + [ip])
if stdcode:
data[ip] = 100
else:
pattern = r',\s([0-9]+)%\spacket\sloss'
data[ip] = re.search(pattern, stdout[-2]).groups()[0]
return agent_utils.make_response(code=0, data=data)
except Exception as e:
message = e.message
return agent_utils.make_response(code=1, message=message)
def add_vlan_to_interface(self, interface, vlan_id):
"""Add vlan interface.
def add_vlan_to_interface(interface, vlan_id):
"""Add vlan interface.
ip link add link eth0 name eth0.10 type vlan id 10
"""
subif = '%s.%s' % (interface, vlan_id)
vlan_id = '%s' % vlan_id
cmd = ['ip', 'link', 'add', 'link', interface, 'name',
subif, 'type', 'vlan', 'id', vlan_id]
stdcode, stdout = agent_utils.execute(cmd, root=True)
if stdcode == 0:
return agent_utils.make_response(code=stdcode)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode, message=message)
def get_interface(interface='eth0'):
"""Interface info.
ifconfig interface
"""
code, message, data = agent_utils.get_interface(interface)
return agent_utils.make_response(code, message, data)
def setup_link(interface, cidr):
"""Setup a link.
ip addr add dev interface
ip link set dev interface up
"""
# clear old ipaddr in interface
cmd = ['ip', 'addr', 'flush', 'dev', interface]
agent_utils.execute(cmd, root=True)
ip = IPNetwork(cidr)
cmd = ['ip', 'addr', 'add', cidr, 'broadcast',
str(ip.broadcast), 'dev', interface]
stdcode, stdout = agent_utils.execute(cmd, root=True)
if stdcode == 0:
cmd = ['ip', 'link', 'set', 'dev', interface, 'up']
ip link add link eth0 name eth0.10 type vlan id 10
"""
subif = '%s.%s' % (interface, vlan_id)
vlan_id = '%s' % vlan_id
cmd = ['ip', 'link', 'add', 'link', interface, 'name',
subif, 'type', 'vlan', 'id', vlan_id]
stdcode, stdout = agent_utils.execute(cmd, root=True)
if stdcode == 0:
return agent_utils.make_response(code=stdcode)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode, message=message)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode, message=message)
def get_interface(self, interface='eth0'):
"""Interface info.
def teardown_link(interface):
"""ip link
"""
cmd = ['ip', 'link', 'delete', interface]
stdcode, stdout = agent_utils.execute(cmd, root=True)
if stdcode == 0:
return agent_utils.make_response(code=stdcode)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode, message=message)
ifconfig interface
"""
LOG.info("RPC: get_interface interfae: %s" % interface)
code, message, data = agent_utils.get_interface(interface)
return agent_utils.make_response(code, message, data)
def setup_link(self, interface, cidr):
"""Setup a link.
ip addr add dev interface
ip link set dev interface up
"""
# clear old ipaddr in interface
cmd = ['ip', 'addr', 'flush', 'dev', interface]
agent_utils.execute(cmd, root=True)
ip = IPNetwork(cidr)
cmd = ['ip', 'addr', 'add', cidr, 'broadcast',
str(ip.broadcast), 'dev', interface]
stdcode, stdout = agent_utils.execute(cmd, root=True)
if stdcode == 0:
cmd = ['ip', 'link', 'set', 'dev', interface, 'up']
stdcode, stdout = agent_utils.execute(cmd, root=True)
if stdcode == 0:
return agent_utils.make_response(code=stdcode)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode, message=message)
def teardown_link(self, interface):
"""ip link
"""
cmd = ['ip', 'link', 'delete', interface]
stdcode, stdout = agent_utils.execute(cmd, root=True)
if stdcode == 0:
return agent_utils.make_response(code=stdcode)
# execute failed.
message = stdout.pop(0)
return agent_utils.make_response(code=stdcode, message=message)

View File

@ -17,7 +17,7 @@ import logging
FORMAT = '%(asctime)s %(filename)s %(levelname)s %(message)s'
DATEFMT = '%d %b %Y %H:%M:%S'
FILENAME = '/var/log/stetho/stetho.log'
FILENAME = '/var/log/stetho/stetho-agent.log'
def get_logger(filename=FILENAME, format=FORMAT,

View File

@ -13,19 +13,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
import time
import tempfile
import signal
import shlex
import subprocess
import platform
from threading import Timer
from stetho.agent.common import resource
from stetho.agent.common import log
LOG = log.get_logger()
def execute(cmd, shell=False, root=False, timeout=10):
try:
if root:
cmd.insert(0, 'sudo')
LOG.info(cmd)
subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=shell)
timer = Timer(timeout, lambda proc: proc.kill(), [subproc])
@ -41,9 +48,31 @@ def execute(cmd, shell=False, root=False, timeout=10):
return stdcode, list_strip(stderr) if stdcode else list_strip(stdout)
except Exception as e:
LOG.error(e)
raise
def create_deamon(cmd, shell=False, root=False):
"""Usage:
Create servcice process.
"""
try:
if root:
cmd.insert(0, 'sudo')
LOG.info(cmd)
subproc = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdcode = subproc.returncode
return subproc.pid
except Exception as e:
LOG.error(e)
raise
def kill_process_by_id(pid):
os.kill(int(pid), signal.SIGKILL)
def get_interface(interface):
"""Support Centos standard physical interface,
such as eth0.
@ -114,6 +143,7 @@ def register_api(server, api_obj):
methods = dir(api_obj)
apis = filter(lambda m: not m.startswith('_'), methods)
[server.register_function(getattr(api_obj, api)) for api in apis]
LOG.info("Registered api %s" % apis)
def make_response(code=0, message='', data=dict()):
@ -122,3 +152,10 @@ def make_response(code=0, message='', data=dict()):
response['message'] = '' if message is None else message
response['data'] = dict() if data is None else data
return response
def replace_file(file_name, mode=0o644):
base_dir = os.path.dirname(os.path.abspath(file_name))
tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
os.chmod(tmp_file.name, mode)
os.rename(tmp_file.name, file_name)

View File

View File

@ -0,0 +1,76 @@
# Copyright 2016 UnitedStack, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from stetho.agent.common import log
from stetho.agent.common import utils
LOG = log.get_logger()
OUT_DIR = '/tmp/'
class IPerfDriver(object):
def start_server(self, protocol='TCP', port=5001, mss=None, window=None):
"""iperf -s -D --mss mss
"""
id = uuid.uuid4()
output = OUT_DIR + 'iperf-server-%s' % id
utils.replace_file(output)
cmd = ['iperf', '-s', '-D', '-p', str(port), '-o', output]
if not cmp(protocol, 'UDP'):
cmd.append('-u')
if mss:
cmd.extend(['-m', str(mss)])
if window:
cmd.extend(['-w', str(window)])
cmd.extend(['>', output])
pid = utils.create_deamon(cmd)
data = dict()
data['id'] = id
data['pid'] = pid
return data
def stop_server(self, pid):
utils.kill_process_by_id(pid)
def start_client(self, host, port=5001, protocol='TCP', time=60,
parallel=None, bandwidth=None):
"""iperf -D -c host -t 60
"""
id = uuid.uuid4()
output = OUT_DIR + 'iperf-client-%s' % id
utils.replace_file(output)
cmd = ['iperf', '-D', '-c', host, '-p', str(port), '-t', str(time)]
if not (protocol, 'UDP'):
cmd.append('-u')
if parallel:
cmd.extend(['-P', str(parallel)])
if bandwidth:
cmd.extend(['-b', '%sM' % bandwidth])
cmd.extend(['>', output])
utils.create_deamon(cmd)
data = dict()
data['id'] = id
return data
def get_server_output(self, id):
# TODO: some analysis
pass
def get_client_output(self, id):
# TODO: some analysis
pass

View File

@ -15,7 +15,7 @@
import mock
import unittest
import types
import platform
from stetho.agent.common import utils
@ -25,6 +25,11 @@ class TestUtils(unittest.TestCase):
def setUp(self):
self.test_file = self.get_temp_file_path('test_execute.tmp')
open(self.test_file, 'w+').close()
self.pids = list()
def tearDown(self):
for pid in self.pids:
utils.kill_process_by_id(pid)
def test_execute(self):
expected = "%s\n" % self.test_file
@ -73,3 +78,9 @@ class TestUtils(unittest.TestCase):
# test other distribution
platform.linux_distribution = mock.Mock(return_value=['', '6.6', ''])
self.assertEqual(utils.get_interface('eth0')[0], 1)
def test_create_deamon(self):
cmd = ['ping', '1.2.4.8']
pid = utils.create_deamon(cmd)
self.pids.append(pid)
self.assertEqual(type(pid), types.IntType)

View File

@ -0,0 +1,34 @@
# Copyright 2016 UnitedStack, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest
from stetho.agent.drivers import iperf
from stetho.agent.common import utils
class TestIPerfDriver(unittest.TestCase):
def setUp(self):
self.iperfd = iperf.IPerfDriver()
def test_start_server(self):
utils.create_deamon = mock.Mock(return_value=1000)
data = self.iperfd.start_server(protocol='UDP')
self.assertEqual(data['pid'], 1000)
def test_start_client(self):
utils.create_deamon = mock.Mock()
self.iperfd.start_client('127.0.0.1')
self.assertEqual(utils.create_deamon.called, True)

View File

@ -20,48 +20,50 @@ from stetho.agent.common import utils as agent_utils
class TestApi(unittest.TestCase):
def setUp(self):
self.agent_api = api.AgentApi()
def test_check_ports_on_br(self):
agent_utils.execute = mock.Mock(return_value=(0, ['execute']))
agent_utils.make_response = mock.Mock(return_value=dict())
api.check_ports_on_br()
self.agent_api.check_ports_on_br()
self.assertEqual(agent_utils.execute.called, True)
self.assertEqual(agent_utils.make_response.called, True)
agent_utils.execute = mock.Mock(return_value=(1, ['execute']))
api.check_ports_on_br()
self.agent_api.check_ports_on_br()
self.assertEqual(agent_utils.make_response.called, True)
def test_ping(self):
stdout = ['', '2 packets transmitted, 2 received, 0% packet loss', '']
agent_utils.execute = mock.Mock(return_value=(0, stdout))
agent_utils.make_response = mock.Mock(return_value=dict())
api.ping(['1.2.4.8', '1.2.4.9'])
self.agent_api.ping(['1.2.4.8', '1.2.4.9'])
self.assertEqual(agent_utils.make_response.called, True)
stdout = 'stdout'
agent_utils.execute = mock.Mock(return_value=(0, stdout))
api.ping(['1.2.4.8', '1.2.4.9'])
self.agent_api.ping(['1.2.4.8', '1.2.4.9'])
self.assertEqual(agent_utils.make_response.called, True)
def test_get_interface(self):
get_interface = mock.Mock(return_value=(0, '', dict()))
agent_utils.get_interface = get_interface
api.get_interface()
self.agent_api.get_interface()
self.assertEqual(agent_utils.get_interface.called, True)
def test_set_link(self):
stdout = ['', '']
agent_utils.execute = mock.Mock(return_value=(0, stdout))
api.setup_link('eth0', '10.0.0.100/24')
self.agent_api.setup_link('eth0', '10.0.0.100/24')
self.assertEqual(agent_utils.make_response.called, True)
agent_utils.execute = mock.Mock(return_value=(1, stdout))
api.setup_link('eth0', '10.0.0.100/24')
self.agent_api.setup_link('eth0', '10.0.0.100/24')
self.assertEqual(agent_utils.make_response.called, True)
def test_teardown_link(self):
stdout = ['', '']
agent_utils.execute = mock.Mock(return_value=(0, stdout))
api.teardown_link('eth0')
self.agent_api.teardown_link('eth0')
self.assertEqual(agent_utils.make_response.called, True)
agent_utils.execute = mock.Mock(return_value=(1, stdout))
api.teardown_link('eth0')
self.agent_api.teardown_link('eth0')
self.assertEqual(agent_utils.make_response.called, True)