# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import eventlet eventlet.monkey_patch() import argparse import bisect import collections import functools import itertools import logging import os import random import six import string import sys import threading import time import yaml from oslo_config import cfg import oslo_messaging as messaging from oslo_messaging import notify # noqa from oslo_messaging import rpc # noqa from oslo_utils import timeutils LOG = logging.getLogger() RANDOM_GENERATOR = None CURRENT_PID = None CLIENTS = [] MESSAGES = [] USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\ {notify-server,notify-client,rpc-server,rpc-client} ... Usage example: python tools/simulator.py\ --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-server python tools/simulator.py\ --url rabbit://stackrabbit:secretrabbit@localhost/ rpc-client\ --exit-wait 15000 -p 64 -m 64""" MESSAGES_LIMIT = 1000 DISTRIBUTION_BUCKET_SIZE = 500 def init_random_generator(): data = [] with open('./messages_length.yaml') as m_file: content = yaml.safe_load(m_file) data += [int(n) for n in content[ 'test_data']['string_lengths'].split(', ')] ranges = collections.defaultdict(int) for msg_length in data: range_start = ((msg_length / DISTRIBUTION_BUCKET_SIZE) * DISTRIBUTION_BUCKET_SIZE + 1) ranges[range_start] += 1 ranges_start = sorted(ranges.keys()) total_count = len(data) accumulated_distribution = [] running_total = 0 for range_start in ranges_start: norm = float(ranges[range_start]) / total_count running_total += norm accumulated_distribution.append(running_total) def weighted_random_choice(): r = random.random() * running_total start = ranges_start[bisect.bisect_right(accumulated_distribution, r)] return random.randrange(start, start + DISTRIBUTION_BUCKET_SIZE) return weighted_random_choice class LoggingNoParsingFilter(logging.Filter): def filter(self, record): msg = record.getMessage() for i in ['received {', 'MSG_ID is ']: if i in msg: return False return True Message = collections.namedtuple( 'Message', ['seq', 'cargo', 'client_ts', 'server_ts', 'return_ts']) def make_message(seq, cargo, client_ts=0, server_ts=0, return_ts=0): return Message(seq, cargo, client_ts, server_ts, return_ts) def update_message(message, **kwargs): return Message(*message)._replace(**kwargs) class MessageStatsCollector(object): def __init__(self, label): self.label = label self.buffer = [] # buffer to store messages during report interval self.series = [] # stats for every report interval threading.Timer(1.0, self.monitor).start() # schedule in a second def monitor(self): threading.Timer(1.0, self.monitor).start() now = time.time() count = len(self.buffer) size = 0 min_latency = sys.maxint max_latency = 0 sum_latencies = 0 for i in range(count): p = self.buffer[i] size += len(p.cargo) latency = None if p.return_ts: latency = p.return_ts - p.client_ts # round-trip elif p.server_ts: latency = p.server_ts - p.client_ts # client -> server if latency: sum_latencies += latency min_latency = min(min_latency, latency) max_latency = max(max_latency, latency) del self.buffer[:count] # trim processed items seq = len(self.series) stats = dict(seq=seq, timestamp=now, count=count, size=size) msg = ('%-14s: seq: %-4d count: %-6d bytes: %-10d' % (self.label, seq, count, size)) if sum_latencies: latency = sum_latencies / count stats.update(dict(latency=latency, min_latency=min_latency, max_latency=max_latency)) msg += (' latency: %-9.3f min: %-9.3f max: %-9.3f' % (latency, min_latency, max_latency)) self.series.append(stats) LOG.info(msg) def push(self, parsed_message): self.buffer.append(parsed_message) def get_series(self): return self.series @staticmethod def calc_stats(label, *collectors): count = 0 size = 0 min_latency = sys.maxint max_latency = 0 sum_latencies = 0 start = sys.maxint end = 0 for point in itertools.chain(*(c.get_series() for c in collectors)): count += point['count'] size += point['size'] start = min(start, point['timestamp']) end = max(end, point['timestamp']) if 'latency' in point: sum_latencies += point['latency'] * point['count'] min_latency = min(min_latency, point['min_latency']) max_latency = max(max_latency, point['max_latency']) # start is the timestamp of the earliest block, which inclides samples # for the prior second start -= 1 duration = end - start if count else 0 stats = dict(count=count, size=size, duration=duration, count_p_s=0, size_p_s=0) if duration: stats.update(dict(start=start, end=end, count_p_s=count / duration, size_p_s=size / duration)) msg = ('%s: duration: %.2f count: %d (%.1f msg/sec) ' 'bytes: %d (%.0f bps)' % (label, duration, count, stats['count_p_s'], size, stats['size_p_s'])) if sum_latencies: latency = sum_latencies / count stats.update(dict(latency=latency, min_latency=min_latency, max_latency=max_latency)) msg += (' latency: %.3f min: %.3f max: %.3f' % (latency, min_latency, max_latency)) LOG.info(msg) return stats class NotifyEndpoint(object): def __init__(self, wait_before_answer, requeue): self.wait_before_answer = wait_before_answer self.requeue = requeue self.received_messages = MessageStatsCollector('server') self.cache = set() def info(self, ctxt, publisher_id, event_type, payload, metadata): LOG.debug("%s %s %s %s", ctxt, publisher_id, event_type, payload) server_ts = time.time() message = update_message(payload, server_ts=server_ts) self.received_messages.push(message) if self.requeue and message.seq not in self.cache: self.cache.add(message.seq) if self.wait_before_answer > 0: time.sleep(self.wait_before_answer) return messaging.NotificationResult.REQUEUE return messaging.NotificationResult.HANDLED def notify_server(transport, topic, wait_before_answer, duration, requeue): endpoints = [NotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_notification_listener(transport, [target], endpoints, executor='eventlet') run_server(server, duration=duration) return endpoints[0] class BatchNotifyEndpoint(object): def __init__(self, wait_before_answer, requeue): self.wait_before_answer = wait_before_answer self.requeue = requeue self.received_messages = MessageStatsCollector('server') self.cache = set() def info(self, batch): LOG.debug('msg rcv') LOG.debug("%s", batch) server_ts = time.time() for item in batch: message = update_message(item['payload'], server_ts=server_ts) self.received_messages.push(message) return messaging.NotificationResult.HANDLED def batch_notify_server(transport, topic, wait_before_answer, duration, requeue): endpoints = [BatchNotifyEndpoint(wait_before_answer, requeue)] target = messaging.Target(topic=topic) server = notify.get_batch_notification_listener( transport, [target], endpoints, executor='eventlet', batch_size=1000, batch_timeout=5) run_server(server, duration=duration) return endpoints[0] class RpcEndpoint(object): def __init__(self, wait_before_answer): self.wait_before_answer = wait_before_answer self.received_messages = MessageStatsCollector('server') def info(self, ctxt, message): server_ts = time.time() LOG.debug("######## RCV: %s", message) reply = update_message(message, server_ts=server_ts) self.received_messages.push(reply) if self.wait_before_answer > 0: time.sleep(self.wait_before_answer) return reply class Client(object): def __init__(self, client_id, client, method, has_result, wait_after_msg): self.client_id = client_id self.client = client self.method = method self.wait_after_msg = wait_after_msg self.seq = 0 self.messages_count = len(MESSAGES) # Start sending the messages from a random position to avoid # memory re-usage and generate more realistic load on the library # and a message transport self.position = random.randint(0, self.messages_count - 1) self.sent_messages = MessageStatsCollector('client-%s' % client_id) if has_result: self.round_trip_messages = MessageStatsCollector( 'round-trip-%s' % client_id) def send_msg(self): msg = make_message(self.seq, MESSAGES[self.position], time.time()) self.sent_messages.push(msg) res = self.method(self.client, msg) if res: return_ts = time.time() res = update_message(res, return_ts=return_ts) self.round_trip_messages.push(res) self.seq += 1 self.position = (self.position + 1) % self.messages_count if self.wait_after_msg > 0: time.sleep(self.wait_after_msg) class RPCClient(Client): def __init__(self, client_id, transport, target, timeout, is_cast, wait_after_msg): client = rpc.RPCClient(transport, target).prepare(timeout=timeout) method = _rpc_cast if is_cast else _rpc_call super(RPCClient, self).__init__(client_id, client, method, not is_cast, wait_after_msg) class NotifyClient(Client): def __init__(self, client_id, transport, topic, wait_after_msg): client = notify.Notifier(transport, driver='messaging', topic=topic) client = client.prepare(publisher_id='publisher-%d' % client_id) method = _notify super(NotifyClient, self).__init__(client_id, client, method, False, wait_after_msg) def generate_messages(messages_count): # Limit the messages amount. Clients will reiterate the array again # if an amount of messages to be sent is bigger than MESSAGES_LIMIT if messages_count > MESSAGES_LIMIT: messages_count = MESSAGES_LIMIT LOG.info("Generating %d random messages", messages_count) for i in range(messages_count): length = RANDOM_GENERATOR() msg = ''.join(random.choice(string.lowercase) for x in range(length)) MESSAGES.append(msg) LOG.info("Messages has been prepared") def run_server(server, duration=None): try: server.start() if duration: with timeutils.StopWatch(duration) as stop_watch: while not stop_watch.expired(): time.sleep(1) server.stop() server.wait() except KeyboardInterrupt: # caught SIGINT LOG.info('Caught SIGINT, terminating') time.sleep(1) # wait for stats collector to process the last second def rpc_server(transport, target, wait_before_answer, executor, duration): endpoints = [RpcEndpoint(wait_before_answer)] server = rpc.get_rpc_server(transport, target, endpoints, executor=executor) LOG.debug("starting RPC server for target %s", target) run_server(server, duration=duration) return server.dispatcher.endpoints[0] def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout, is_cast, messages_count, duration): p = eventlet.GreenPool(size=threads) targets = itertools.cycle(targets) for i in range(0, threads): target = targets.next() LOG.debug("starting RPC client for target %s", target) client_builder = functools.partial(RPCClient, i, transport, target, timeout, is_cast, wait_after_msg) p.spawn_n(send_messages, i, client_builder, messages_count, duration) p.waitall() def spawn_notify_clients(threads, topic, transport, message_count, wait_after_msg, timeout, duration): p = eventlet.GreenPool(size=threads) for i in range(0, threads): client_builder = functools.partial(NotifyClient, i, transport, topic, wait_after_msg) p.spawn_n(send_messages, i, client_builder, message_count, duration) p.waitall() def send_messages(client_id, client_builder, messages_count, duration): client = client_builder() CLIENTS.append(client) if duration: with timeutils.StopWatch(duration) as stop_watch: while not stop_watch.expired(): client.send_msg() eventlet.sleep() else: LOG.debug("Sending %d messages using client %d", messages_count, client_id) for _ in six.moves.range(0, messages_count): client.send_msg() eventlet.sleep() LOG.debug("Client %d has sent %d messages", client_id, messages_count) time.sleep(1) # wait for replies to be collected def _rpc_call(client, msg): try: res = client.call({}, 'info', message=msg) except Exception as e: LOG.exception('Error %s on CALL for message %s', str(e), msg) else: LOG.debug("SENT: %s, RCV: %s", msg, res) return res def _rpc_cast(client, msg): try: client.cast({}, 'info', message=msg) except Exception as e: LOG.exception('Error %s on CAST for message %s', str(e), msg) else: LOG.debug("SENT: %s", msg) def _notify(notification_client, msg): notification_client.info({}, 'compute.start', msg) def show_server_stats(endpoint, args): LOG.info('=' * 35 + ' summary ' + '=' * 35) output = dict(series={}, summary={}) output['series']['server'] = endpoint.received_messages.get_series() stats = MessageStatsCollector.calc_stats( 'server', endpoint.received_messages) output['summary'] = stats def show_client_stats(clients, has_reply=False): LOG.info('=' * 35 + ' summary ' + '=' * 35) output = dict(series={}, summary={}) for cl in clients: cl_id = cl.client_id output['series']['client_%s' % cl_id] = cl.sent_messages.get_series() if has_reply: output['series']['round_trip_%s' % cl_id] = ( cl.round_trip_messages.get_series()) sent_stats = MessageStatsCollector.calc_stats( 'client', *(cl.sent_messages for cl in clients)) output['summary']['client'] = sent_stats if has_reply: round_trip_stats = MessageStatsCollector.calc_stats( 'round-trip', *(cl.round_trip_messages for cl in clients)) output['summary']['round_trip'] = round_trip_stats def _setup_logging(is_debug): log_level = logging.DEBUG if is_debug else logging.INFO logging.basicConfig( stream=sys.stdout, level=log_level, format="%(asctime)-15s %(levelname)s %(name)s %(message)s") logging.getLogger().handlers[0].addFilter(LoggingNoParsingFilter()) for i in ['kombu', 'amqp', 'stevedore', 'qpid.messaging' 'oslo.messaging._drivers.amqp', ]: logging.getLogger(i).setLevel(logging.WARN) def main(): parser = argparse.ArgumentParser( description='Tools to play with oslo.messaging\'s RPC', usage=USAGE, ) parser.add_argument('--url', dest='url', default='rabbit://guest:password@localhost/', help="oslo.messaging transport url") parser.add_argument('-d', '--debug', dest='debug', type=bool, default=False, help="Turn on DEBUG logging level instead of WARN") parser.add_argument('-tp', '--topic', dest='topic', default="profiler_topic", help="Topics to publish/receive messages to/from.") parser.add_argument('-s', '--server', dest='server', default="profiler_server", help="Servers to publish/receive messages to/from.") parser.add_argument('-tg', '--targets', dest='targets', nargs="+", default=["profiler_topic.profiler_server"], help="Targets to publish/receive messages to/from.") parser.add_argument('-l', dest='duration', type=int, help='send messages for certain time') parser.add_argument('--config-file', dest='config_file', type=str, help="Oslo messaging config file") subparsers = parser.add_subparsers(dest='mode', help='notify/rpc server/client mode') server = subparsers.add_parser('notify-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) server.add_argument('--requeue', dest='requeue', action='store_true') server = subparsers.add_parser('batch-notify-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) server.add_argument('--requeue', dest='requeue', action='store_true') client = subparsers.add_parser('notify-client') client.add_argument('-p', dest='threads', type=int, default=1, help='number of client threads') client.add_argument('-m', dest='messages', type=int, default=1, help='number of call per threads') client.add_argument('-w', dest='wait_after_msg', type=int, default=-1, help='sleep time between two messages') client.add_argument('--timeout', dest='timeout', type=int, default=3, help='client timeout') server = subparsers.add_parser('rpc-server') server.add_argument('-w', dest='wait_before_answer', type=int, default=-1) server.add_argument('-e', '--executor', dest='executor', type=str, default='eventlet', help='name of a message executor') client = subparsers.add_parser('rpc-client') client.add_argument('-p', dest='threads', type=int, default=1, help='number of client threads') client.add_argument('-m', dest='messages', type=int, default=1, help='number of call per threads') client.add_argument('-w', dest='wait_after_msg', type=int, default=-1, help='sleep time between two messages') client.add_argument('--timeout', dest='timeout', type=int, default=3, help='client timeout') client.add_argument('--exit-wait', dest='exit_wait', type=int, default=0, help='Keep connections open N seconds after calls ' 'have been done') client.add_argument('--is-cast', dest='is_cast', type=bool, default=False, help='Use `call` or `cast` RPC methods') client.add_argument('--is-fanout', dest='is_fanout', type=bool, default=False, help='fanout=True for CAST messages') args = parser.parse_args() _setup_logging(is_debug=args.debug) if args.config_file: cfg.CONF(["--config-file", args.config_file]) if args.mode in ['rpc-server', 'rpc-client']: transport = messaging.get_transport(cfg.CONF, url=args.url) else: transport = messaging.get_notification_transport(cfg.CONF, url=args.url) if args.mode in ['rpc-client', 'notify-client']: # always generate maximum number of messages for duration-limited tests generate_messages(MESSAGES_LIMIT if args.duration else args.messages) # oslo.config defaults cfg.CONF.heartbeat_interval = 5 cfg.CONF.prog = os.path.basename(__file__) cfg.CONF.project = 'oslo.messaging' if args.mode == 'rpc-server': target = messaging.Target(topic=args.topic, server=args.server) if args.url.startswith('zmq'): cfg.CONF.rpc_zmq_matchmaker = "redis" endpoint = rpc_server(transport, target, args.wait_before_answer, args.executor, args.duration) show_server_stats(endpoint, args) elif args.mode == 'notify-server': endpoint = notify_server(transport, args.topic, args.wait_before_answer, args.duration, args.requeue) show_server_stats(endpoint, args) elif args.mode == 'batch-notify-server': endpoint = batch_notify_server(transport, args.topic, args.wait_before_answer, args.duration, args.requeue) show_server_stats(endpoint, args) elif args.mode == 'notify-client': spawn_notify_clients(args.threads, args.topic, transport, args.messages, args.wait_after_msg, args.timeout, args.duration) show_client_stats(CLIENTS) elif args.mode == 'rpc-client': targets = [target.partition('.')[::2] for target in args.targets] targets = [messaging.Target( topic=topic, server=server_name, fanout=args.is_fanout) for topic, server_name in targets] spawn_rpc_clients(args.threads, transport, targets, args.wait_after_msg, args.timeout, args.is_cast, args.messages, args.duration) show_client_stats(CLIENTS, not args.is_cast) if args.exit_wait: LOG.info("Finished. waiting for %d seconds", args.exit_wait) time.sleep(args.exit_wait) if __name__ == '__main__': RANDOM_GENERATOR = init_random_generator() CURRENT_PID = os.getpid() main()