diff --git a/tools/simulator.py b/tools/simulator.py index 9b8069c71..a764dfe79 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -16,7 +16,7 @@ eventlet.monkey_patch() import argparse import bisect import collections -import datetime +import functools import itertools import logging import os @@ -37,7 +37,7 @@ from oslo_utils import timeutils LOG = logging.getLogger() RANDOM_GENERATOR = None CURRENT_PID = None -RPC_CLIENTS = [] +CLIENTS = [] MESSAGES = [] USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ @@ -94,76 +94,183 @@ class LoggingNoParsingFilter(logging.Filter): return True -class Monitor(object): - def __init__(self, show_stats=False, *args, **kwargs): - self._count = self._prev_count = 0 - self.show_stats = show_stats - if self.show_stats: - self._monitor() - - def _monitor(self): - threading.Timer(1.0, self._monitor).start() - LOG.debug("%d msg was received per second", - (self._count - self._prev_count)) - self._prev_count = self._count - - def info(self, *args, **kwargs): - self._count += 1 +Message = collections.namedtuple( + 'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts']) -class NotifyEndpoint(Monitor): - def __init__(self, *args, **kwargs): - super(NotifyEndpoint, self).__init__(*args, **kwargs) - self.cache = [] +def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0): + return Message(seq, cargo, client_ts, server_ts, return_ts) + + +def update_message(message, **kwargs): + return Message(*message)._replace(**kwargs) + + +class MessageStatsCollector(object): + def __init__(self, label): + self.label = label + self.buffer = [] # buffer to store messages during report interval + self.series = [] # stats for every report interval + threading.Timer(1.0, self.monitor).start() # schedule in a second + + def monitor(self): + threading.Timer(1.0, self.monitor).start() + now = time.time() + + count = len(self.buffer) + + size = 0 + min_latency = sys.maxint + max_latency = 0 + sum_latencies = 0 + + for i in range(count): + p = self.buffer[i] + size += len(p.cargo) + + latency = None + if p.return_ts: + latency = p.return_ts - p.client_ts # round-trip + elif p.server_ts: + latency = p.server_ts - p.client_ts # client -> server + + if latency: + sum_latencies += latency + min_latency = min(min_latency, latency) + max_latency = max(max_latency, latency) + + del self.buffer[:count] # trim processed items + + seq = len(self.series) + stats = dict(seq=seq, timestamp=now, count=count, size=size) + msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' % + (self.label, seq, count, size)) + + if sum_latencies: + latency = sum_latencies / count + stats.update(dict(latency=latency, + min_latency=min_latency, + max_latency=max_latency)) + msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' % + (latency, min_latency, max_latency)) + + self.series.append(stats) + LOG.info(msg) + + def push(self, parsed_message): + self.buffer.append(parsed_message) + + def get_series(self): + return self.series + + @staticmethod + def calc_stats(label, *collectors): + count = 0 + size = 0 + min_latency = sys.maxint + max_latency = 0 + sum_latencies = 0 + start = sys.maxint + end = 0 + + for point in itertools.chain(*(c.get_series() for c in collectors)): + count += point['count'] + size += point['size'] + start = min(start, point['timestamp']) + end = max(end, point['timestamp']) + + if 'latency' in point: + sum_latencies += point['latency'] * point['count'] + min_latency = min(min_latency, point['min_latency']) + max_latency = max(max_latency, point['max_latency']) + + # start is the timestamp of the earliest block, which inclides samples + # for the prior second + start -= 1 + duration = end - start if count else 0 + stats = dict(count=count, size=size, duration=duration, count_p_s=0, + size_p_s=0) + if duration: + stats.update(dict(start=start, end=end, + count_p_s=count / duration, + size_p_s=size / duration)) + + msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) ' + 'bytes: %d (%.0f bps)' % + (label, duration, count, stats['count_p_s'], + size, stats['size_p_s'])) + + if sum_latencies: + latency = sum_latencies / count + stats.update(dict(latency=latency, + min_latency=min_latency, + max_latency=max_latency)) + msg += (' latency: %.3f min: %.3f max: %.3f' % + (latency, min_latency, max_latency)) + + LOG.info(msg) + return stats + + +class NotifyEndpoint(object): + def __init__(self, wait_before_answer, requeue): + self.wait_before_answer = wait_before_answer + self.requeue = requeue + self.received_messages = MessageStatsCollector('server') + self.cache = set() def info(self, ctxt, publisher_id, event_type, payload, metadata): - super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type, - payload, metadata) - LOG.debug('msg rcv') LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload) - if not self.show_stats and payload not in self.cache: - LOG.debug('requeue msg') - self.cache.append(payload) - for i in range(15): - eventlet.sleep(1) + + server_ts = time.time() + + message = update_message(payload, server_ts=server_ts) + self.received_messages.push(message) + + if self.requeue and message.seq not in self.cache: + self.cache.add(message.seq) + + if self.wait_before_answer > 0: + time.sleep(self.wait_before_answer) + return messaging.NotificationResult.REQUEUE - else: - LOG.debug('ack msg') + return messaging.NotificationResult.HANDLED -def notify_server(transport, topic, show_stats, duration): - endpoints = [NotifyEndpoint(show_stats)] +def notify_server(transport, topic, wait_before_answer, duration, requeue): + endpoints = [NotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_notification_listener(transport, [target], endpoints, executor='eventlet') run_server(server, duration=duration) + return endpoints[0] -class BatchNotifyEndpoint(Monitor): - def __init__(self, *args, **kwargs): - super(BatchNotifyEndpoint, self).__init__(*args, **kwargs) - self.cache = [] - def info(self, messages): - super(BatchNotifyEndpoint, self).info(messages) - self._count += len(messages) - 1 +class BatchNotifyEndpoint(object): + def __init__(self, wait_before_answer, requeue): + self.wait_before_answer = wait_before_answer + self.requeue = requeue + self.received_messages = MessageStatsCollector('server') + self.cache = set() + def info(self, batch): LOG.debug('msg rcv') - LOG.debug("%s", messages) - if not self.show_stats and messages not in self.cache: - LOG.debug('requeue msg') - self.cache.append(messages) - for i in range(15): - eventlet.sleep(1) - return messaging.NotificationResult.REQUEUE - else: - LOG.debug('ack msg') + LOG.debug("%s", batch) + + server_ts = time.time() + + for item in batch: + message = update_message(item['payload'], server_ts=server_ts) + self.received_messages.push(message) + return messaging.NotificationResult.HANDLED -def batch_notify_server(transport, topic, show_stats, duration): - endpoints = [BatchNotifyEndpoint(show_stats)] +def batch_notify_server(transport, topic, wait_before_answer, duration, + requeue): + endpoints = [BatchNotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_batch_notification_listener( transport, [target], @@ -171,53 +278,83 @@ def batch_notify_server(transport, topic, show_stats, duration): batch_size=1000, batch_timeout=5) run_server(server, duration=duration) + return endpoints[0] -class RpcEndpoint(Monitor): - def __init__(self, wait_before_answer, show_stats): - self.count = None + +class RpcEndpoint(object): + def __init__(self, wait_before_answer): self.wait_before_answer = wait_before_answer - self.messages_received = 0 + self.received_messages = MessageStatsCollector('server') def info(self, ctxt, message): - self.messages_received += 1 - i = int(message.split(' ')[-1]) - if self.count is None: - self.count = i - elif i == 0: - self.count = 0 - else: - self.count += 1 + server_ts = time.time() + + LOG.debug("######## RCV: %s", message) + + reply = update_message(message, server_ts=server_ts) + self.received_messages.push(reply) - LOG.debug("######## RCV: %s/%s", self.count, message) if self.wait_before_answer > 0: time.sleep(self.wait_before_answer) - return "OK: %s" % message + + return reply -class RPCClient(object): - def __init__(self, transport, target, timeout, method, wait_after_msg): - self.client = rpc.RPCClient(transport, target) - self.client = self.client.prepare(timeout=timeout) +class Client(object): + def __init__(self, client_id, client, method, has_result, + wait_after_msg): + self.client_id = client_id + self.client = client self.method = method - self.bytes = 0 - self.msg_sent = 0 + self.wait_after_msg = wait_after_msg + + self.seq = 0 self.messages_count = len(MESSAGES) # Start sending the messages from a random position to avoid # memory re-usage and generate more realistic load on the library # and a message transport self.position = random.randint(0, self.messages_count - 1) - self.wait_after_msg = wait_after_msg + self.sent_messages = MessageStatsCollector('client-%s' % client_id) + + if has_result: + self.round_trip_messages = MessageStatsCollector( + 'round-trip-%s' % client_id) def send_msg(self): - msg = MESSAGES[self.position] - self.method(self.client, msg) - self.bytes += len(msg) - self.msg_sent += 1 + msg = make_message(self.seq, MESSAGES[self.position], time.time()) + self.sent_messages.push(msg) + + res = self.method(self.client, msg) + if res: + return_ts = time.time() + res = update_message(res, return_ts=return_ts) + self.round_trip_messages.push(res) + + self.seq += 1 self.position = (self.position + 1) % self.messages_count if self.wait_after_msg > 0: time.sleep(self.wait_after_msg) +class RPCClient(Client): + def __init__(self, client_id, transport, target, timeout, is_cast, + wait_after_msg): + client = rpc.RPCClient(transport, target).prepare(timeout=timeout) + method = _rpc_cast if is_cast else _rpc_call + + super(RPCClient, self).__init__(client_id, client, method, + not is_cast, wait_after_msg) + + +class NotifyClient(Client): + def __init__(self, client_id, transport, topic, wait_after_msg): + client = notify.Notifier(transport, driver='messaging', topic=topic) + client = client.prepare(publisher_id='publisher-%d' % client_id) + method = _notify + super(NotifyClient, self).__init__(client_id, client, method, + False, wait_after_msg) + + def generate_messages(messages_count): # Limit the messages amount. Clients will reiterate the array again # if an amount of messages to be sent is bigger than MESSAGES_LIMIT @@ -227,57 +364,63 @@ def generate_messages(messages_count): for i in range(messages_count): length = RANDOM_GENERATOR() - msg = ''.join(random.choice(string.lowercase) for x in range(length)) \ - + ' ' + str(i) + msg = ''.join(random.choice(string.lowercase) for x in range(length)) MESSAGES.append(msg) LOG.info("Messages has been prepared") def run_server(server, duration=None): - server.start() - if duration: - with timeutils.StopWatch(duration) as stop_watch: - while not stop_watch.expired(): - time.sleep(1) - server.stop() - server.wait() + try: + server.start() + if duration: + with timeutils.StopWatch(duration) as stop_watch: + while not stop_watch.expired(): + time.sleep(1) + server.stop() + server.wait() + except KeyboardInterrupt: # caught SIGINT + LOG.info('Caught SIGINT, terminating') + time.sleep(1) # wait for stats collector to process the last second -def rpc_server(transport, target, wait_before_answer, executor, show_stats, - duration): - endpoints = [RpcEndpoint(wait_before_answer, show_stats)] +def rpc_server(transport, target, wait_before_answer, executor, duration): + endpoints = [RpcEndpoint(wait_before_answer)] server = rpc.get_rpc_server(transport, target, endpoints, executor=executor) LOG.debug("starting RPC server for target %s", target) + run_server(server, duration=duration) - LOG.info("Received total messages: %d", - server.dispatcher.endpoints[0].messages_received) + + return server.dispatcher.endpoints[0] -def spawn_notify_clients(threads, *args, **kwargs): - p = eventlet.GreenPool(size=threads) - for i in range(0, threads): - p.spawn_n(notifier, i, *args, **kwargs) - p.waitall() - - -def spawn_rpc_clients(threads, transport, targets, - *args, **kwargs): +def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout, + is_cast, messages_count, duration): p = eventlet.GreenPool(size=threads) targets = itertools.cycle(targets) for i in range(0, threads): target = targets.next() LOG.debug("starting RPC client for target %s", target) - p.spawn_n(send_msg, i, transport, target, *args, **kwargs) + client_builder = functools.partial(RPCClient, i, transport, target, + timeout, is_cast, wait_after_msg) + p.spawn_n(send_messages, i, client_builder, messages_count, duration) p.waitall() -def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, - messages_count, duration): - rpc_method = _rpc_cast if is_cast else _rpc_call - client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg) - RPC_CLIENTS.append(client) +def spawn_notify_clients(threads, topic, transport, message_count, + wait_after_msg, timeout, duration): + p = eventlet.GreenPool(size=threads) + for i in range(0, threads): + client_builder = functools.partial(NotifyClient, i, transport, topic, + wait_after_msg) + p.spawn_n(send_messages, i, client_builder, message_count, duration) + p.waitall() + + +def send_messages(client_id, client_builder, messages_count, duration): + client = client_builder() + CLIENTS.append(client) if duration: with timeutils.StopWatch(duration) as stop_watch: @@ -285,11 +428,14 @@ def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, client.send_msg() eventlet.sleep() else: - LOG.debug("Sending %d messages using client %d", messages_count, c_id) + LOG.debug("Sending %d messages using client %d", + messages_count, client_id) for _ in six.moves.range(0, messages_count): client.send_msg() eventlet.sleep() - LOG.debug("Client %d has sent %d messages", c_id, messages_count) + LOG.debug("Client %d has sent %d messages", client_id, messages_count) + + time.sleep(1) # wait for replies to be collected def _rpc_call(client, msg): @@ -299,6 +445,7 @@ def _rpc_call(client, msg): LOG.exception('Error %s on CALL for message %s', str(e), msg) else: LOG.debug("SENT: %s, RCV: %s", msg, res) + return res def _rpc_cast(client, msg): @@ -310,29 +457,39 @@ def _rpc_cast(client, msg): LOG.debug("SENT: %s", msg) -def notifier(_id, topic, transport, messages, wait_after_msg, timeout, - duration): - n1 = notify.Notifier(transport, - driver='messaging', - topic=topic).prepare( - publisher_id='publisher-%d' % _id) - payload = dict(msg=0, vm='test', otherdata='ahah') - ctxt = {} +def _notify(notification_client, msg): + notification_client.info({}, 'compute.start', msg) - def send_notif(): - payload['msg'] += 1 - LOG.debug("sending notification %s", payload) - n1.info(ctxt, 'compute.start1', payload) - if wait_after_msg > 0: - time.sleep(wait_after_msg) - if duration: - with timeutils.StopWatch(duration) as stop_watch: - while not stop_watch.expired(): - send_notif() - else: - for i in range(0, messages): - send_notif() +def show_server_stats(endpoint, args): + LOG.info('=' * 35 + ' summary ' + '=' * 35) + output = dict(series={}, summary={}) + output['series']['server'] = endpoint.received_messages.get_series() + stats = MessageStatsCollector.calc_stats( + 'server', endpoint.received_messages) + output['summary'] = stats + + +def show_client_stats(clients, has_reply=False): + LOG.info('=' * 35 + ' summary ' + '=' * 35) + output = dict(series={}, summary={}) + + for cl in clients: + cl_id = cl.client_id + output['series']['client_%s' % cl_id] = cl.sent_messages.get_series() + + if has_reply: + output['series']['round_trip_%s' % cl_id] = ( + cl.round_trip_messages.get_series()) + + sent_stats = MessageStatsCollector.calc_stats( + 'client', *(cl.sent_messages for cl in clients)) + output['summary']['client'] = sent_stats + + if has_reply: + round_trip_stats = MessageStatsCollector.calc_stats( + 'round-trip', *(cl.round_trip_messages for cl in clients)) + output['summary']['round_trip'] = round_trip_stats def _setup_logging(is_debug): @@ -375,11 +532,12 @@ def main(): help='notify/rpc server/client mode') server = subparsers.add_parser('notify-server') - server.add_argument('--show-stats', dest='show_stats', - type=bool, default=True) + server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) + server.add_argument('--requeue', dest='requeue', action='store_true') + server = subparsers.add_parser('batch-notify-server') - server.add_argument('--show-stats', dest='show_stats', - type=bool, default=True) + server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) + server.add_argument('--requeue', dest='requeue', action='store_true') client = subparsers.add_parser('notify-client') client.add_argument('-p', dest='threads', type=int, default=1, @@ -393,8 +551,6 @@ def main(): server = subparsers.add_parser('rpc-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) - server.add_argument('--show-stats', dest='show_stats', - type=bool, default=True) server.add_argument('-e', '--executor', dest='executor', type=str, default='eventlet', help='name of a message executor') @@ -442,45 +598,43 @@ def main(): target = messaging.Target(topic=args.topic, server=args.server) if args.url.startswith('zmq'): cfg.CONF.rpc_zmq_matchmaker = "redis" - rpc_server(transport, target, args.wait_before_answer, args.executor, - args.show_stats, args.duration) + + endpoint = rpc_server(transport, target, args.wait_before_answer, + args.executor, args.duration) + show_server_stats(endpoint, args) + elif args.mode == 'notify-server': - notify_server(transport, args.topic, args.show_stats, args.duration) + endpoint = notify_server(transport, args.topic, + args.wait_before_answer, args.duration, + args.requeue) + show_server_stats(endpoint, args) + elif args.mode == 'batch-notify-server': - batch_notify_server(transport, args.topic, args.show_stats, - args.duration) + endpoint = batch_notify_server(transport, args.topic, + args.wait_before_answer, args.duration, + args.requeue) + show_server_stats(endpoint, args) + elif args.mode == 'notify-client': spawn_notify_clients(args.threads, args.topic, transport, args.messages, args.wait_after_msg, args.timeout, args.duration) + show_client_stats(CLIENTS) + elif args.mode == 'rpc-client': targets = [target.partition('.')[::2] for target in args.targets] - start = datetime.datetime.now() targets = [messaging.Target( topic=topic, server=server_name, fanout=args.is_fanout) for topic, server_name in targets] spawn_rpc_clients(args.threads, transport, targets, args.wait_after_msg, args.timeout, args.is_cast, args.messages, args.duration) - time_elapsed = (datetime.datetime.now() - start).total_seconds() - msg_count = 0 - total_bytes = 0 - for client in RPC_CLIENTS: - msg_count += client.msg_sent - total_bytes += client.bytes + show_client_stats(CLIENTS, not args.is_cast) - LOG.info('%d messages were sent for %d seconds. ' - 'Bandwidth was %d msg/sec', msg_count, time_elapsed, - (msg_count / time_elapsed)) - log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % ( - total_bytes, time_elapsed, (total_bytes / time_elapsed)) - LOG.info(log_msg) - with open('./oslo_res_%s.txt' % args.server, 'a+') as f: - f.write(log_msg + '\n') - - LOG.info("calls finished, wait %d seconds", args.exit_wait) - time.sleep(args.exit_wait) + if args.exit_wait: + LOG.info("Finished. waiting for %d seconds", args.exit_wait) + time.sleep(args.exit_wait) if __name__ == '__main__':