Merge "Topic/server arguments changed in simulator.py"
This commit is contained in:
commit
e9abc3e5da
@ -16,6 +16,7 @@ eventlet.monkey_patch()
|
|||||||
import argparse
|
import argparse
|
||||||
import collections
|
import collections
|
||||||
import datetime
|
import datetime
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
@ -242,6 +243,16 @@ def rpc_server(transport, target, wait_before_answer, executor, show_stats,
|
|||||||
server.wait()
|
server.wait()
|
||||||
|
|
||||||
|
|
||||||
|
def spawn_rpc_clients(threads, transport, targets,
|
||||||
|
*args, **kwargs):
|
||||||
|
p = eventlet.GreenPool(size=threads)
|
||||||
|
targets = itertools.cycle(targets)
|
||||||
|
for i in range(0, threads):
|
||||||
|
target = targets.next()
|
||||||
|
p.spawn_n(send_msg, i, transport, target, *args, **kwargs)
|
||||||
|
p.waitall()
|
||||||
|
|
||||||
|
|
||||||
def threads_spawner(threads, method, *args, **kwargs):
|
def threads_spawner(threads, method, *args, **kwargs):
|
||||||
p = eventlet.GreenPool(size=threads)
|
p = eventlet.GreenPool(size=threads)
|
||||||
for i in range(0, threads):
|
for i in range(0, threads):
|
||||||
@ -319,11 +330,16 @@ def main():
|
|||||||
parser.add_argument('-d', '--debug', dest='debug', type=bool,
|
parser.add_argument('-d', '--debug', dest='debug', type=bool,
|
||||||
default=False,
|
default=False,
|
||||||
help="Turn on DEBUG logging level instead of WARN")
|
help="Turn on DEBUG logging level instead of WARN")
|
||||||
parser.add_argument('-tp', '--topic', dest='topic',
|
parser.add_argument('-tp', '--topics', dest='topics', nargs="+",
|
||||||
default="profiler_topic",
|
default=["profiler_topic"],
|
||||||
help="Topic to publish/receive messages to/from.")
|
help="Topics to publish/receive messages to/from.")
|
||||||
|
parser.add_argument('-s', '--server', dest='server',
|
||||||
|
default="profiler_server",
|
||||||
|
help="Server to publish/receive messages to/from.")
|
||||||
parser.add_argument('-l', dest='duration', type=int,
|
parser.add_argument('-l', dest='duration', type=int,
|
||||||
help='send messages for certain time')
|
help='send messages for certain time')
|
||||||
|
parser.add_argument('--config-file', dest='config_file', type=str,
|
||||||
|
help="Oslo messaging config file")
|
||||||
|
|
||||||
subparsers = parser.add_subparsers(dest='mode',
|
subparsers = parser.add_subparsers(dest='mode',
|
||||||
help='notify/rpc server/client mode')
|
help='notify/rpc server/client mode')
|
||||||
@ -371,6 +387,9 @@ def main():
|
|||||||
|
|
||||||
_setup_logging(is_debug=args.debug)
|
_setup_logging(is_debug=args.debug)
|
||||||
|
|
||||||
|
if args.config_file:
|
||||||
|
cfg.CONF(["--config-file", args.config_file])
|
||||||
|
|
||||||
if args.mode in ['rpc-server', 'rpc-client']:
|
if args.mode in ['rpc-server', 'rpc-client']:
|
||||||
transport = messaging.get_transport(cfg.CONF, url=args.url)
|
transport = messaging.get_transport(cfg.CONF, url=args.url)
|
||||||
else:
|
else:
|
||||||
@ -378,7 +397,6 @@ def main():
|
|||||||
url=args.url)
|
url=args.url)
|
||||||
cfg.CONF.oslo_messaging_notifications.topics = "notif"
|
cfg.CONF.oslo_messaging_notifications.topics = "notif"
|
||||||
cfg.CONF.oslo_messaging_notifications.driver = "messaging"
|
cfg.CONF.oslo_messaging_notifications.driver = "messaging"
|
||||||
target = messaging.Target(topic=args.topic, server='profiler_server')
|
|
||||||
|
|
||||||
# oslo.config defaults
|
# oslo.config defaults
|
||||||
cfg.CONF.heartbeat_interval = 5
|
cfg.CONF.heartbeat_interval = 5
|
||||||
@ -386,6 +404,7 @@ def main():
|
|||||||
cfg.CONF.project = 'oslo.messaging'
|
cfg.CONF.project = 'oslo.messaging'
|
||||||
|
|
||||||
if args.mode == 'rpc-server':
|
if args.mode == 'rpc-server':
|
||||||
|
target = messaging.Target(topic=args.topics[0], server=args.server)
|
||||||
if args.url.startswith('zmq'):
|
if args.url.startswith('zmq'):
|
||||||
cfg.CONF.rpc_zmq_matchmaker = "redis"
|
cfg.CONF.rpc_zmq_matchmaker = "redis"
|
||||||
transport._driver.matchmaker._redis.flushdb()
|
transport._driver.matchmaker._redis.flushdb()
|
||||||
@ -402,9 +421,11 @@ def main():
|
|||||||
init_msg(args.messages)
|
init_msg(args.messages)
|
||||||
|
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
threads_spawner(args.threads, send_msg, transport, target,
|
targets = [messaging.Target(topic=topic, server=args.server) for topic
|
||||||
args.wait_after_msg, args.timeout, args.is_cast,
|
in args.topics]
|
||||||
args.messages, args.duration)
|
spawn_rpc_clients(args.threads, transport, targets,
|
||||||
|
args.wait_after_msg, args.timeout, args.is_cast,
|
||||||
|
args.messages, args.duration)
|
||||||
time_elapsed = (datetime.datetime.now() - start).total_seconds()
|
time_elapsed = (datetime.datetime.now() - start).total_seconds()
|
||||||
|
|
||||||
msg_count = 0
|
msg_count = 0
|
||||||
@ -419,7 +440,7 @@ def main():
|
|||||||
log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % (
|
log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % (
|
||||||
total_bytes, time_elapsed, (total_bytes / time_elapsed))
|
total_bytes, time_elapsed, (total_bytes / time_elapsed))
|
||||||
LOG.info(log_msg)
|
LOG.info(log_msg)
|
||||||
with open('./oslo_res_%s.txt' % args.topic, 'a+') as f:
|
with open('./oslo_res_%s.txt' % args.server, 'a+') as f:
|
||||||
f.write(log_msg + '\n')
|
f.write(log_msg + '\n')
|
||||||
|
|
||||||
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user