From 84109fe8fc058f5df0aec6972e10547a7e7837ab Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Fri, 4 Mar 2016 13:45:31 +0300 Subject: [PATCH] Simulator: calculate message latency statistics Calculate message latency: for RPC calls on both server-side and on client-side (round-trip); for RPC cast and NOTIFY on server-side only. The message is extended with metadata: sequence id, creation time, reception time and return time (for RPC calls). For every message the metadata is stored into MessageStatCollector. To reduce memory consumption the collector aggregates stats on every second and can print the ongoing stats to stdout. At the finish the overall stats are printed. NOTIFY tests are changed to be similar to RPC: * use pre-generated messages * parameter "-w WAIT_BEFORE_ANSWER" for processing delay * parameter "--requeue" to re-queue new messages (only those that miss the cache) Change-Id: I4c2fc11bfaee17b6825cdc7d6edf07b8e91ef83a --- tools/simulator.py | 468 ++++++++++++++++++++++++++++++--------------- 1 file changed, 311 insertions(+), 157 deletions(-) diff --git a/tools/simulator.py b/tools/simulator.py index 9b8069c71..a764dfe79 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -16,7 +16,7 @@ eventlet.monkey_patch() import argparse import bisect import collections -import datetime +import functools import itertools import logging import os @@ -37,7 +37,7 @@ from oslo_utils import timeutils LOG = logging.getLogger() RANDOM_GENERATOR = None CURRENT_PID = None -RPC_CLIENTS = [] +CLIENTS = [] MESSAGES = [] USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ @@ -94,76 +94,183 @@ class LoggingNoParsingFilter(logging.Filter): return True -class Monitor(object): - def __init__(self, show_stats=False, *args, **kwargs): - self._count = self._prev_count = 0 - self.show_stats = show_stats - if self.show_stats: - self._monitor() - - def _monitor(self): - threading.Timer(1.0, self._monitor).start() - LOG.debug("%d msg was received per second", - (self._count - self._prev_count)) - self._prev_count = self._count - - def info(self, *args, **kwargs): - self._count += 1 +Message = collections.namedtuple( + 'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts']) -class NotifyEndpoint(Monitor): - def __init__(self, *args, **kwargs): - super(NotifyEndpoint, self).__init__(*args, **kwargs) - self.cache = [] +def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0): + return Message(seq, cargo, client_ts, server_ts, return_ts) + + +def update_message(message, **kwargs): + return Message(*message)._replace(**kwargs) + + +class MessageStatsCollector(object): + def __init__(self, label): + self.label = label + self.buffer = [] # buffer to store messages during report interval + self.series = [] # stats for every report interval + threading.Timer(1.0, self.monitor).start() # schedule in a second + + def monitor(self): + threading.Timer(1.0, self.monitor).start() + now = time.time() + + count = len(self.buffer) + + size = 0 + min_latency = sys.maxint + max_latency = 0 + sum_latencies = 0 + + for i in range(count): + p = self.buffer[i] + size += len(p.cargo) + + latency = None + if p.return_ts: + latency = p.return_ts - p.client_ts # round-trip + elif p.server_ts: + latency = p.server_ts - p.client_ts # client -> server + + if latency: + sum_latencies += latency + min_latency = min(min_latency, latency) + max_latency = max(max_latency, latency) + + del self.buffer[:count] # trim processed items + + seq = len(self.series) + stats = dict(seq=seq, timestamp=now, count=count, size=size) + msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' % + (self.label, seq, count, size)) + + if sum_latencies: + latency = sum_latencies / count + stats.update(dict(latency=latency, + min_latency=min_latency, + max_latency=max_latency)) + msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' % + (latency, min_latency, max_latency)) + + self.series.append(stats) + LOG.info(msg) + + def push(self, parsed_message): + self.buffer.append(parsed_message) + + def get_series(self): + return self.series + + @staticmethod + def calc_stats(label, *collectors): + count = 0 + size = 0 + min_latency = sys.maxint + max_latency = 0 + sum_latencies = 0 + start = sys.maxint + end = 0 + + for point in itertools.chain(*(c.get_series() for c in collectors)): + count += point['count'] + size += point['size'] + start = min(start, point['timestamp']) + end = max(end, point['timestamp']) + + if 'latency' in point: + sum_latencies += point['latency'] * point['count'] + min_latency = min(min_latency, point['min_latency']) + max_latency = max(max_latency, point['max_latency']) + + # start is the timestamp of the earliest block, which inclides samples + # for the prior second + start -= 1 + duration = end - start if count else 0 + stats = dict(count=count, size=size, duration=duration, count_p_s=0, + size_p_s=0) + if duration: + stats.update(dict(start=start, end=end, + count_p_s=count / duration, + size_p_s=size / duration)) + + msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) ' + 'bytes: %d (%.0f bps)' % + (label, duration, count, stats['count_p_s'], + size, stats['size_p_s'])) + + if sum_latencies: + latency = sum_latencies / count + stats.update(dict(latency=latency, + min_latency=min_latency, + max_latency=max_latency)) + msg += (' latency: %.3f min: %.3f max: %.3f' % + (latency, min_latency, max_latency)) + + LOG.info(msg) + return stats + + +class NotifyEndpoint(object): + def __init__(self, wait_before_answer, requeue): + self.wait_before_answer = wait_before_answer + self.requeue = requeue + self.received_messages = MessageStatsCollector('server') + self.cache = set() def info(self, ctxt, publisher_id, event_type, payload, metadata): - super(NotifyEndpoint, self).info(ctxt, publisher_id, event_type, - payload, metadata) - LOG.debug('msg rcv') LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload) - if not self.show_stats and payload not in self.cache: - LOG.debug('requeue msg') - self.cache.append(payload) - for i in range(15): - eventlet.sleep(1) + + server_ts = time.time() + + message = update_message(payload, server_ts=server_ts) + self.received_messages.push(message) + + if self.requeue and message.seq not in self.cache: + self.cache.add(message.seq) + + if self.wait_before_answer > 0: + time.sleep(self.wait_before_answer) + return messaging.NotificationResult.REQUEUE - else: - LOG.debug('ack msg') + return messaging.NotificationResult.HANDLED -def notify_server(transport, topic, show_stats, duration): - endpoints = [NotifyEndpoint(show_stats)] +def notify_server(transport, topic, wait_before_answer, duration, requeue): + endpoints = [NotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_notification_listener(transport, [target], endpoints, executor='eventlet') run_server(server, duration=duration) + return endpoints[0] -class BatchNotifyEndpoint(Monitor): - def __init__(self, *args, **kwargs): - super(BatchNotifyEndpoint, self).__init__(*args, **kwargs) - self.cache = [] - def info(self, messages): - super(BatchNotifyEndpoint, self).info(messages) - self._count += len(messages) - 1 +class BatchNotifyEndpoint(object): + def __init__(self, wait_before_answer, requeue): + self.wait_before_answer = wait_before_answer + self.requeue = requeue + self.received_messages = MessageStatsCollector('server') + self.cache = set() + def info(self, batch): LOG.debug('msg rcv') - LOG.debug("%s", messages) - if not self.show_stats and messages not in self.cache: - LOG.debug('requeue msg') - self.cache.append(messages) - for i in range(15): - eventlet.sleep(1) - return messaging.NotificationResult.REQUEUE - else: - LOG.debug('ack msg') + LOG.debug("%s", batch) + + server_ts = time.time() + + for item in batch: + message = update_message(item['payload'], server_ts=server_ts) + self.received_messages.push(message) + return messaging.NotificationResult.HANDLED -def batch_notify_server(transport, topic, show_stats, duration): - endpoints = [BatchNotifyEndpoint(show_stats)] +def batch_notify_server(transport, topic, wait_before_answer, duration, + requeue): + endpoints = [BatchNotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_batch_notification_listener( transport, [target], @@ -171,53 +278,83 @@ def batch_notify_server(transport, topic, show_stats, duration): batch_size=1000, batch_timeout=5) run_server(server, duration=duration) + return endpoints[0] -class RpcEndpoint(Monitor): - def __init__(self, wait_before_answer, show_stats): - self.count = None + +class RpcEndpoint(object): + def __init__(self, wait_before_answer): self.wait_before_answer = wait_before_answer - self.messages_received = 0 + self.received_messages = MessageStatsCollector('server') def info(self, ctxt, message): - self.messages_received += 1 - i = int(message.split(' ')[-1]) - if self.count is None: - self.count = i - elif i == 0: - self.count = 0 - else: - self.count += 1 + server_ts = time.time() + + LOG.debug("######## RCV: %s", message) + + reply = update_message(message, server_ts=server_ts) + self.received_messages.push(reply) - LOG.debug("######## RCV: %s/%s", self.count, message) if self.wait_before_answer > 0: time.sleep(self.wait_before_answer) - return "OK: %s" % message + + return reply -class RPCClient(object): - def __init__(self, transport, target, timeout, method, wait_after_msg): - self.client = rpc.RPCClient(transport, target) - self.client = self.client.prepare(timeout=timeout) +class Client(object): + def __init__(self, client_id, client, method, has_result, + wait_after_msg): + self.client_id = client_id + self.client = client self.method = method - self.bytes = 0 - self.msg_sent = 0 + self.wait_after_msg = wait_after_msg + + self.seq = 0 self.messages_count = len(MESSAGES) # Start sending the messages from a random position to avoid # memory re-usage and generate more realistic load on the library # and a message transport self.position = random.randint(0, self.messages_count - 1) - self.wait_after_msg = wait_after_msg + self.sent_messages = MessageStatsCollector('client-%s' % client_id) + + if has_result: + self.round_trip_messages = MessageStatsCollector( + 'round-trip-%s' % client_id) def send_msg(self): - msg = MESSAGES[self.position] - self.method(self.client, msg) - self.bytes += len(msg) - self.msg_sent += 1 + msg = make_message(self.seq, MESSAGES[self.position], time.time()) + self.sent_messages.push(msg) + + res = self.method(self.client, msg) + if res: + return_ts = time.time() + res = update_message(res, return_ts=return_ts) + self.round_trip_messages.push(res) + + self.seq += 1 self.position = (self.position + 1) % self.messages_count if self.wait_after_msg > 0: time.sleep(self.wait_after_msg) +class RPCClient(Client): + def __init__(self, client_id, transport, target, timeout, is_cast, + wait_after_msg): + client = rpc.RPCClient(transport, target).prepare(timeout=timeout) + method = _rpc_cast if is_cast else _rpc_call + + super(RPCClient, self).__init__(client_id, client, method, + not is_cast, wait_after_msg) + + +class NotifyClient(Client): + def __init__(self, client_id, transport, topic, wait_after_msg): + client = notify.Notifier(transport, driver='messaging', topic=topic) + client = client.prepare(publisher_id='publisher-%d' % client_id) + method = _notify + super(NotifyClient, self).__init__(client_id, client, method, + False, wait_after_msg) + + def generate_messages(messages_count): # Limit the messages amount. Clients will reiterate the array again # if an amount of messages to be sent is bigger than MESSAGES_LIMIT @@ -227,57 +364,63 @@ def generate_messages(messages_count): for i in range(messages_count): length = RANDOM_GENERATOR() - msg = ''.join(random.choice(string.lowercase) for x in range(length)) \ - + ' ' + str(i) + msg = ''.join(random.choice(string.lowercase) for x in range(length)) MESSAGES.append(msg) LOG.info("Messages has been prepared") def run_server(server, duration=None): - server.start() - if duration: - with timeutils.StopWatch(duration) as stop_watch: - while not stop_watch.expired(): - time.sleep(1) - server.stop() - server.wait() + try: + server.start() + if duration: + with timeutils.StopWatch(duration) as stop_watch: + while not stop_watch.expired(): + time.sleep(1) + server.stop() + server.wait() + except KeyboardInterrupt: # caught SIGINT + LOG.info('Caught SIGINT, terminating') + time.sleep(1) # wait for stats collector to process the last second -def rpc_server(transport, target, wait_before_answer, executor, show_stats, - duration): - endpoints = [RpcEndpoint(wait_before_answer, show_stats)] +def rpc_server(transport, target, wait_before_answer, executor, duration): + endpoints = [RpcEndpoint(wait_before_answer)] server = rpc.get_rpc_server(transport, target, endpoints, executor=executor) LOG.debug("starting RPC server for target %s", target) + run_server(server, duration=duration) - LOG.info("Received total messages: %d", - server.dispatcher.endpoints[0].messages_received) + + return server.dispatcher.endpoints[0] -def spawn_notify_clients(threads, *args, **kwargs): - p = eventlet.GreenPool(size=threads) - for i in range(0, threads): - p.spawn_n(notifier, i, *args, **kwargs) - p.waitall() - - -def spawn_rpc_clients(threads, transport, targets, - *args, **kwargs): +def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout, + is_cast, messages_count, duration): p = eventlet.GreenPool(size=threads) targets = itertools.cycle(targets) for i in range(0, threads): target = targets.next() LOG.debug("starting RPC client for target %s", target) - p.spawn_n(send_msg, i, transport, target, *args, **kwargs) + client_builder = functools.partial(RPCClient, i, transport, target, + timeout, is_cast, wait_after_msg) + p.spawn_n(send_messages, i, client_builder, messages_count, duration) p.waitall() -def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, - messages_count, duration): - rpc_method = _rpc_cast if is_cast else _rpc_call - client = RPCClient(transport, target, timeout, rpc_method, wait_after_msg) - RPC_CLIENTS.append(client) +def spawn_notify_clients(threads, topic, transport, message_count, + wait_after_msg, timeout, duration): + p = eventlet.GreenPool(size=threads) + for i in range(0, threads): + client_builder = functools.partial(NotifyClient, i, transport, topic, + wait_after_msg) + p.spawn_n(send_messages, i, client_builder, message_count, duration) + p.waitall() + + +def send_messages(client_id, client_builder, messages_count, duration): + client = client_builder() + CLIENTS.append(client) if duration: with timeutils.StopWatch(duration) as stop_watch: @@ -285,11 +428,14 @@ def send_msg(c_id, transport, target, wait_after_msg, timeout, is_cast, client.send_msg() eventlet.sleep() else: - LOG.debug("Sending %d messages using client %d", messages_count, c_id) + LOG.debug("Sending %d messages using client %d", + messages_count, client_id) for _ in six.moves.range(0, messages_count): client.send_msg() eventlet.sleep() - LOG.debug("Client %d has sent %d messages", c_id, messages_count) + LOG.debug("Client %d has sent %d messages", client_id, messages_count) + + time.sleep(1) # wait for replies to be collected def _rpc_call(client, msg): @@ -299,6 +445,7 @@ def _rpc_call(client, msg): LOG.exception('Error %s on CALL for message %s', str(e), msg) else: LOG.debug("SENT: %s, RCV: %s", msg, res) + return res def _rpc_cast(client, msg): @@ -310,29 +457,39 @@ def _rpc_cast(client, msg): LOG.debug("SENT: %s", msg) -def notifier(_id, topic, transport, messages, wait_after_msg, timeout, - duration): - n1 = notify.Notifier(transport, - driver='messaging', - topic=topic).prepare( - publisher_id='publisher-%d' % _id) - payload = dict(msg=0, vm='test', otherdata='ahah') - ctxt = {} +def _notify(notification_client, msg): + notification_client.info({}, 'compute.start', msg) - def send_notif(): - payload['msg'] += 1 - LOG.debug("sending notification %s", payload) - n1.info(ctxt, 'compute.start1', payload) - if wait_after_msg > 0: - time.sleep(wait_after_msg) - if duration: - with timeutils.StopWatch(duration) as stop_watch: - while not stop_watch.expired(): - send_notif() - else: - for i in range(0, messages): - send_notif() +def show_server_stats(endpoint, args): + LOG.info('=' * 35 + ' summary ' + '=' * 35) + output = dict(series={}, summary={}) + output['series']['server'] = endpoint.received_messages.get_series() + stats = MessageStatsCollector.calc_stats( + 'server', endpoint.received_messages) + output['summary'] = stats + + +def show_client_stats(clients, has_reply=False): + LOG.info('=' * 35 + ' summary ' + '=' * 35) + output = dict(series={}, summary={}) + + for cl in clients: + cl_id = cl.client_id + output['series']['client_%s' % cl_id] = cl.sent_messages.get_series() + + if has_reply: + output['series']['round_trip_%s' % cl_id] = ( + cl.round_trip_messages.get_series()) + + sent_stats = MessageStatsCollector.calc_stats( + 'client', *(cl.sent_messages for cl in clients)) + output['summary']['client'] = sent_stats + + if has_reply: + round_trip_stats = MessageStatsCollector.calc_stats( + 'round-trip', *(cl.round_trip_messages for cl in clients)) + output['summary']['round_trip'] = round_trip_stats def _setup_logging(is_debug): @@ -375,11 +532,12 @@ def main(): help='notify/rpc server/client mode') server = subparsers.add_parser('notify-server') - server.add_argument('--show-stats', dest='show_stats', - type=bool, default=True) + server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) + server.add_argument('--requeue', dest='requeue', action='store_true') + server = subparsers.add_parser('batch-notify-server') - server.add_argument('--show-stats', dest='show_stats', - type=bool, default=True) + server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) + server.add_argument('--requeue', dest='requeue', action='store_true') client = subparsers.add_parser('notify-client') client.add_argument('-p', dest='threads', type=int, default=1, @@ -393,8 +551,6 @@ def main(): server = subparsers.add_parser('rpc-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) - server.add_argument('--show-stats', dest='show_stats', - type=bool, default=True) server.add_argument('-e', '--executor', dest='executor', type=str, default='eventlet', help='name of a message executor') @@ -442,45 +598,43 @@ def main(): target = messaging.Target(topic=args.topic, server=args.server) if args.url.startswith('zmq'): cfg.CONF.rpc_zmq_matchmaker = "redis" - rpc_server(transport, target, args.wait_before_answer, args.executor, - args.show_stats, args.duration) + + endpoint = rpc_server(transport, target, args.wait_before_answer, + args.executor, args.duration) + show_server_stats(endpoint, args) + elif args.mode == 'notify-server': - notify_server(transport, args.topic, args.show_stats, args.duration) + endpoint = notify_server(transport, args.topic, + args.wait_before_answer, args.duration, + args.requeue) + show_server_stats(endpoint, args) + elif args.mode == 'batch-notify-server': - batch_notify_server(transport, args.topic, args.show_stats, - args.duration) + endpoint = batch_notify_server(transport, args.topic, + args.wait_before_answer, args.duration, + args.requeue) + show_server_stats(endpoint, args) + elif args.mode == 'notify-client': spawn_notify_clients(args.threads, args.topic, transport, args.messages, args.wait_after_msg, args.timeout, args.duration) + show_client_stats(CLIENTS) + elif args.mode == 'rpc-client': targets = [target.partition('.')[::2] for target in args.targets] - start = datetime.datetime.now() targets = [messaging.Target( topic=topic, server=server_name, fanout=args.is_fanout) for topic, server_name in targets] spawn_rpc_clients(args.threads, transport, targets, args.wait_after_msg, args.timeout, args.is_cast, args.messages, args.duration) - time_elapsed = (datetime.datetime.now() - start).total_seconds() - msg_count = 0 - total_bytes = 0 - for client in RPC_CLIENTS: - msg_count += client.msg_sent - total_bytes += client.bytes + show_client_stats(CLIENTS, not args.is_cast) - LOG.info('%d messages were sent for %d seconds. ' - 'Bandwidth was %d msg/sec', msg_count, time_elapsed, - (msg_count / time_elapsed)) - log_msg = '%s bytes were sent for %d seconds. Bandwidth is %d b/s' % ( - total_bytes, time_elapsed, (total_bytes / time_elapsed)) - LOG.info(log_msg) - with open('./oslo_res_%s.txt' % args.server, 'a+') as f: - f.write(log_msg + '\n') - - LOG.info("calls finished, wait %d seconds", args.exit_wait) - time.sleep(args.exit_wait) + if args.exit_wait: + LOG.info("Finished. waiting for %d seconds", args.exit_wait) + time.sleep(args.exit_wait) if __name__ == '__main__':