From 07cfdde59d0be241abec048c49fad8f7b896e3ee Mon Sep 17 00:00:00 2001
From: Ilya Shakhat
Date: Mon, 15 Jul 2013 16:22:20 +0400
Subject: [PATCH] Implemented bulk read from memcached

Implements blueprint bulk-memcached-io

Change-Id: I20e20653b1ac24ee3239fc481f70187f4a71989f
---
 stackalytics/processor/runtime_storage.py | 43 +++++++++++++++++------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py
index e72333273..b6d33a1bc 100644
--- a/stackalytics/processor/runtime_storage.py
+++ b/stackalytics/processor/runtime_storage.py
@@ -23,6 +23,11 @@ from oslo.config import cfg
 
 LOG = logging.getLogger(__name__)
 
+# Number of keys requested per memcached get_multi call
+BULK_READ_SIZE = 64
+RECORD_ID_PREFIX = 'record:'
+UPDATE_ID_PREFIX = 'update:'
+
 
 class RuntimeStorage(object):
     def __init__(self, uri):
@@ -106,12 +111,19 @@ class MemcachedStorage(RuntimeStorage):
         self._set_pids(pid)
 
         if not last_update:
-            for record_id in range(0, self._get_record_count()):
-                yield self.memcached.get(self._get_record_name(record_id))
+            for i in self._get_all_records():
+                yield i
         else:
-            for update_id in range(last_update, update_count):
-                yield self.memcached.get(self._get_record_name(
-                    self.memcached.get('update:%s' % update_id)))
+            for update_id_set in self._make_range(last_update, update_count,
+                                                  BULK_READ_SIZE):
+                # The first bulk read maps update ids to record ids, so the
+                # record ids are its values; the records themselves are the
+                # values of the second bulk read.
+                update_set = self.memcached.get_multi(update_id_set,
+                                                      UPDATE_ID_PREFIX)
+                for i in self.memcached.get_multi(
+                        update_set.values(), RECORD_ID_PREFIX).values():
+                    yield i
 
     def active_pids(self, pids):
         stored_pids = self.memcached.get('pids') or set()
@@ -135,7 +147,7 @@ class MemcachedStorage(RuntimeStorage):
             first_valid_update_id = 0
 
         for i in range(first_valid_update_id, min_update):
-            self.memcached.delete('update:%s' % i)
+            self.memcached.delete(UPDATE_ID_PREFIX + str(i))
 
         self.memcached.set('first_valid_update_id', min_update)
 
@@ -162,7 +174,7 @@ class MemcachedStorage(RuntimeStorage):
         self.memcached.set('pids', pids)
 
     def _get_record_name(self, record_id):
-        return 'record:%s' % record_id
+        return RECORD_ID_PREFIX + str(record_id)
 
     def _get_record_count(self):
         return self.memcached.get('record:count') or 0
@@ -170,14 +182,23 @@ class MemcachedStorage(RuntimeStorage):
     def _set_record_count(self, count):
         self.memcached.set('record:count', count)
 
+    def _make_range(self, start, stop, step):
+        """Yield consecutive ranges of at most `step` ids covering
+        [start, stop); the last range is clipped at `stop`.
+        """
+        for i in range(start, stop, step):
+            yield range(i, min(i + step, stop))
+
     def _get_all_records(self):
-        count = self.memcached.get('record:count') or 0
-        for i in range(0, count):
-            yield self.memcached.get('record:%s' % i)
+        for record_id_set in self._make_range(0, self._get_record_count(),
+                                              BULK_READ_SIZE):
+            for i in self.memcached.get_multi(
+                    record_id_set, RECORD_ID_PREFIX).values():
+                yield i
 
     def _commit_update(self, record_id):
         count = self._get_update_count()
-        self.memcached.set('update:%s' % count, record_id)
+        self.memcached.set(UPDATE_ID_PREFIX + str(count), record_id)
         self.memcached.set('update:count', count + 1)
 
     def _build_index(self):