Optimize simulator.py for better throughput
* Avoid string concatenation during logging, especially when logging level is disabled. * Initialize the Randomized strings we will be using as payloads *before* we start sending the messages as the string creation takes time and reduces throughput Change-Id: I546229fe7ade95572e11bcda95a587228d84bc28
This commit is contained in:
parent
e8703dc802
commit
2251966c99
@ -35,6 +35,8 @@ from oslo_messaging import rpc # noqa
|
|||||||
LOG = logging.getLogger()
|
LOG = logging.getLogger()
|
||||||
RANDOM_VARIABLE = None
|
RANDOM_VARIABLE = None
|
||||||
CURRENT_PID = None
|
CURRENT_PID = None
|
||||||
|
RPC_CLIENTS = []
|
||||||
|
MESSAGES = []
|
||||||
|
|
||||||
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
|
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
|
||||||
{notify-server,notify-client,rpc-server,rpc-client} ...
|
{notify-server,notify-client,rpc-server,rpc-client} ...
|
||||||
@ -88,8 +90,8 @@ class Monitor(object):
|
|||||||
|
|
||||||
def _monitor(self):
|
def _monitor(self):
|
||||||
threading.Timer(1.0, self._monitor).start()
|
threading.Timer(1.0, self._monitor).start()
|
||||||
print ("%d msg was received per second"
|
LOG.debug("%d msg was received per second",
|
||||||
% (self._count - self._prev_count))
|
(self._count - self._prev_count))
|
||||||
self._prev_count = self._count
|
self._prev_count = self._count
|
||||||
|
|
||||||
def info(self, *args, **kwargs):
|
def info(self, *args, **kwargs):
|
||||||
@ -104,16 +106,16 @@ class NotifyEndpoint(Monitor):
|
|||||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type,
|
super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type,
|
||||||
payload, metadata)
|
payload, metadata)
|
||||||
LOG.info('msg rcv')
|
LOG.debug('msg rcv')
|
||||||
LOG.info("%s %s %s %s" % (ctxt, publisher_id, event_type, payload))
|
LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload)
|
||||||
if not self.show_stats and payload not in self.cache:
|
if not self.show_stats and payload not in self.cache:
|
||||||
LOG.info('requeue msg')
|
LOG.debug('requeue msg')
|
||||||
self.cache.append(payload)
|
self.cache.append(payload)
|
||||||
for i in range(15):
|
for i in range(15):
|
||||||
eventlet.sleep(1)
|
eventlet.sleep(1)
|
||||||
return messaging.NotificationResult.REQUEUE
|
return messaging.NotificationResult.REQUEUE
|
||||||
else:
|
else:
|
||||||
LOG.info('ack msg')
|
LOG.debug('ack msg')
|
||||||
return messaging.NotificationResult.HANDLED
|
return messaging.NotificationResult.HANDLED
|
||||||
|
|
||||||
|
|
||||||
@ -135,16 +137,16 @@ class BatchNotifyEndpoint(Monitor):
|
|||||||
super(BatchNotifyEndpoint, self).info(messages)
|
super(BatchNotifyEndpoint, self).info(messages)
|
||||||
self._count += len(messages) - 1
|
self._count += len(messages) - 1
|
||||||
|
|
||||||
LOG.info('msg rcv')
|
LOG.debug('msg rcv')
|
||||||
LOG.info("%s" % messages)
|
LOG.debug("%s", messages)
|
||||||
if not self.show_stats and messages not in self.cache:
|
if not self.show_stats and messages not in self.cache:
|
||||||
LOG.info('requeue msg')
|
LOG.debug('requeue msg')
|
||||||
self.cache.append(messages)
|
self.cache.append(messages)
|
||||||
for i in range(15):
|
for i in range(15):
|
||||||
eventlet.sleep(1)
|
eventlet.sleep(1)
|
||||||
return messaging.NotificationResult.REQUEUE
|
return messaging.NotificationResult.REQUEUE
|
||||||
else:
|
else:
|
||||||
LOG.info('ack msg')
|
LOG.debug('ack msg')
|
||||||
return messaging.NotificationResult.HANDLED
|
return messaging.NotificationResult.HANDLED
|
||||||
|
|
||||||
|
|
||||||
@ -173,12 +175,53 @@ class RpcEndpoint(Monitor):
|
|||||||
else:
|
else:
|
||||||
self.count += 1
|
self.count += 1
|
||||||
|
|
||||||
LOG.info("######## RCV: %s/%s" % (self.count, message))
|
LOG.debug("######## RCV: %s/%s", self.count, message)
|
||||||
if self.wait_before_answer > 0:
|
if self.wait_before_answer > 0:
|
||||||
time.sleep(self.wait_before_answer)
|
time.sleep(self.wait_before_answer)
|
||||||
return "OK: %s" % message
|
return "OK: %s" % message
|
||||||
|
|
||||||
|
|
||||||
|
class RPCClient(object):
|
||||||
|
def __init__(self, transport, target, timeout, method, wait_after_msg):
|
||||||
|
self.client = rpc.RPCClient(transport, target)
|
||||||
|
self.client.prepare(timeout=timeout)
|
||||||
|
self.method = method
|
||||||
|
self.bytes = 0
|
||||||
|
self.msg_sent = 0
|
||||||
|
self.messages_count = len(MESSAGES)
|
||||||
|
# Start sending the messages from a random position to avoid
|
||||||
|
# memory re-usage and generate more realistic load on the library
|
||||||
|
# and a message transport
|
||||||
|
self.position = random.randint(0, self.messages_count - 1)
|
||||||
|
self.wait_after_msg = wait_after_msg
|
||||||
|
|
||||||
|
def send_msg(self):
|
||||||
|
msg = MESSAGES[self.position]
|
||||||
|
self.method(self.client, msg)
|
||||||
|
self.bytes += len(msg)
|
||||||
|
self.msg_sent += 1
|
||||||
|
self.position = (self.position + 1) % self.messages_count
|
||||||
|
if self.wait_after_msg > 0:
|
||||||
|
time.sleep(self.wait_after_msg)
|
||||||
|
|
||||||
|
|
||||||
|
def init_msg(messages_count):
|
||||||
|
# Limit the messages amount. Clients will reiterate the array again
|
||||||
|
# if an amount of messages to be sent is bigger than 1000
|
||||||
|
if messages_count > 1000:
|
||||||
|
messages_count = 1000
|
||||||
|
LOG.info("Preparing %d messages", messages_count)
|
||||||
|
ranges = RANDOM_VARIABLE.rvs(size=messages_count)
|
||||||
|
i = 0
|
||||||
|
for range_start in ranges:
|
||||||
|
length = random.randint(range_start, range_start + 497)
|
||||||
|
msg = ''.join(random.choice(string.lowercase) for x in range(length)) \
|
||||||
|
+ ' ' + str(i)
|
||||||
|
MESSAGES.append(msg)
|
||||||
|
i += 1
|
||||||
|
LOG.info("Messages has been prepared")
|
||||||
|
|
||||||
|
|
||||||
def rpc_server(transport, target, wait_before_answer, executor, show_stats):
|
def rpc_server(transport, target, wait_before_answer, executor, show_stats):
|
||||||
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
|
endpoints = [RpcEndpoint(wait_before_answer, show_stats)]
|
||||||
server = rpc.get_rpc_server(transport, target, endpoints,
|
server = rpc.get_rpc_server(transport, target, endpoints,
|
||||||
@ -194,46 +237,33 @@ def threads_spawner(threads, method, *args, **kwargs):
|
|||||||
p.waitall()
|
p.waitall()
|
||||||
|
|
||||||
|
|
||||||
def send_msg(_id, transport, target, messages, wait_after_msg, timeout,
|
def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast,
|
||||||
is_cast):
|
messages_count):
|
||||||
client = rpc.RPCClient(transport, target)
|
LOG.debug("Sending %d messages using client %d", messages_count, c_id)
|
||||||
client = client.prepare(timeout=timeout)
|
|
||||||
rpc_method = _rpc_cast if is_cast else _rpc_call
|
rpc_method = _rpc_cast if is_cast else _rpc_call
|
||||||
|
client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg)
|
||||||
ranges = RANDOM_VARIABLE.rvs(size=messages)
|
RPC_CLIENTS.append(client)
|
||||||
i = 0
|
for _ in xrange(0, messages_count):
|
||||||
for range_start in ranges:
|
client.send_msg()
|
||||||
length = random.randint(range_start, range_start + 497)
|
LOG.debug("Client %d has sent all messages", c_id)
|
||||||
msg = ''.join(random.choice(string.lowercase) for x in range(length)) \
|
|
||||||
+ ' ' + str(i)
|
|
||||||
i += 1
|
|
||||||
# temporary file to log approximate bytes size of messages
|
|
||||||
with open('./oslo_%s_%s.log' % (target.topic, CURRENT_PID), 'a+') as f:
|
|
||||||
# 37 additional bytes for Python String object size canculation.
|
|
||||||
# In fact we may ignore these bytes, and estimate the data flow
|
|
||||||
# via number of symbols
|
|
||||||
f.write(str(length + 37) + '\n')
|
|
||||||
rpc_method(client, msg)
|
|
||||||
if wait_after_msg > 0:
|
|
||||||
time.sleep(wait_after_msg)
|
|
||||||
|
|
||||||
|
|
||||||
def _rpc_call(client, msg):
|
def _rpc_call(client, msg):
|
||||||
try:
|
try:
|
||||||
res = client.call({}, 'info', message=msg)
|
res = client.call({}, 'info', message=msg)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception('Error %s on CALL for message %s' % (str(e), msg))
|
LOG.exception('Error %s on CALL for message %s', str(e), msg)
|
||||||
else:
|
else:
|
||||||
LOG.info("SENT: %s, RCV: %s" % (msg, res))
|
LOG.debug("SENT: %s, RCV: %s", msg, res)
|
||||||
|
|
||||||
|
|
||||||
def _rpc_cast(client, msg):
|
def _rpc_cast(client, msg):
|
||||||
try:
|
try:
|
||||||
client.cast({}, 'info', message=msg)
|
client.cast({}, 'info', message=msg)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception('Error %s on CAST for message %s' % (str(e), msg))
|
LOG.exception('Error %s on CAST for message %s', str(e), msg)
|
||||||
else:
|
else:
|
||||||
LOG.info("SENT: %s" % msg)
|
LOG.debug("SENT: %s", msg)
|
||||||
|
|
||||||
|
|
||||||
def notifier(_id, transport, messages, wait_after_msg, timeout):
|
def notifier(_id, transport, messages, wait_after_msg, timeout):
|
||||||
@ -244,15 +274,15 @@ def notifier(_id, transport, messages, wait_after_msg, timeout):
|
|||||||
msg = 1 + msg
|
msg = 1 + msg
|
||||||
ctxt = {}
|
ctxt = {}
|
||||||
payload = dict(msg=msg, vm='test', otherdata='ahah')
|
payload = dict(msg=msg, vm='test', otherdata='ahah')
|
||||||
LOG.info("send msg")
|
LOG.debug("send msg")
|
||||||
LOG.info(payload)
|
LOG.debug(payload)
|
||||||
n1.info(ctxt, 'compute.start1', payload)
|
n1.info(ctxt, 'compute.start1', payload)
|
||||||
if wait_after_msg > 0:
|
if wait_after_msg > 0:
|
||||||
time.sleep(wait_after_msg)
|
time.sleep(wait_after_msg)
|
||||||
|
|
||||||
|
|
||||||
def _setup_logging(is_debug):
|
def _setup_logging(is_debug):
|
||||||
log_level = logging.DEBUG if is_debug else logging.WARN
|
log_level = logging.DEBUG if is_debug else logging.INFO
|
||||||
logging.basicConfig(stream=sys.stdout, level=log_level)
|
logging.basicConfig(stream=sys.stdout, level=log_level)
|
||||||
logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
|
logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter())
|
||||||
for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging'
|
for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging'
|
||||||
@ -348,29 +378,28 @@ def main():
|
|||||||
threads_spawner(args.threads, notifier, transport, args.messages,
|
threads_spawner(args.threads, notifier, transport, args.messages,
|
||||||
args.wait_after_msg, args.timeout)
|
args.wait_after_msg, args.timeout)
|
||||||
elif args.mode == 'rpc-client':
|
elif args.mode == 'rpc-client':
|
||||||
|
init_msg(args.messages)
|
||||||
|
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
threads_spawner(args.threads, send_msg, transport, target,
|
threads_spawner(args.threads, send_msg, transport, target,
|
||||||
args.messages, args.wait_after_msg, args.timeout,
|
args.wait_after_msg, args.timeout, args.is_cast,
|
||||||
args.is_cast)
|
args.messages)
|
||||||
time_ellapsed = (datetime.datetime.now() - start).total_seconds()
|
time_elapsed = (datetime.datetime.now() - start).total_seconds()
|
||||||
msg_count = args.messages * args.threads
|
|
||||||
log_msg = '%d messages was sent for %s seconds. ' \
|
|
||||||
'Bandwidth is %s msg/sec' % (msg_count, time_ellapsed,
|
|
||||||
(msg_count / time_ellapsed))
|
|
||||||
print (log_msg)
|
|
||||||
with open('./oslo_res_%s.txt' % args.topic, 'a+') as f:
|
|
||||||
f.write(log_msg + '\n')
|
|
||||||
|
|
||||||
with open('./oslo_%s_%s.log' % (args.topic, CURRENT_PID), 'a+') as f:
|
msg_count = 0
|
||||||
data = f.read()
|
total_bytes = 0
|
||||||
data = [int(i) for i in data.split()]
|
for client in RPC_CLIENTS:
|
||||||
data_sum = sum(data)
|
msg_count += client.msg_sent
|
||||||
log_msg = '%s bytes were sent for %s seconds. Bandwidth is %s b/s' % (
|
total_bytes += client.bytes
|
||||||
data_sum, time_ellapsed, (data_sum / time_ellapsed))
|
|
||||||
print(log_msg)
|
LOG.info('%d messages were sent for %d seconds. '
|
||||||
|
'Bandwidth was %d msg/sec', msg_count, time_elapsed,
|
||||||
|
(msg_count / time_elapsed))
|
||||||
|
log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % (
|
||||||
|
total_bytes, time_elapsed, (total_bytes / time_elapsed))
|
||||||
|
LOG.info(log_msg)
|
||||||
with open('./oslo_res_%s.txt' % args.topic, 'a+') as f:
|
with open('./oslo_res_%s.txt' % args.topic, 'a+') as f:
|
||||||
f.write(log_msg + '\n')
|
f.write(log_msg + '\n')
|
||||||
os.remove('./oslo_%s_%s.log' % (args.topic, CURRENT_PID))
|
|
||||||
|
|
||||||
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
LOG.info("calls finished, wait %d seconds" % args.exit_wait)
|
||||||
time.sleep(args.exit_wait)
|
time.sleep(args.exit_wait)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user