Implemented bulk read from memcached

Implements blueprint bulk-memcached-io

Change-Id: I20e20653b1ac24ee3239fc481f70187f4a71989f
This commit is contained in:
Ilya Shakhat 2013-07-15 16:22:20 +04:00
parent cef22af5d6
commit 07cfdde59d

View File

@ -23,6 +23,10 @@ from oslo.config import cfg
LOG = logging.getLogger(__name__)
BULK_READ_SIZE = 64
RECORD_ID_PREFIX = 'record:'
UPDATE_ID_PREFIX = 'update:'
class RuntimeStorage(object):
def __init__(self, uri):
@ -106,12 +110,16 @@ class MemcachedStorage(RuntimeStorage):
self._set_pids(pid)
if not last_update:
for record_id in range(0, self._get_record_count()):
yield self.memcached.get(self._get_record_name(record_id))
for i in self._get_all_records():
yield i
else:
for update_id in range(last_update, update_count):
yield self.memcached.get(self._get_record_name(
self.memcached.get('update:%s' % update_id)))
for update_id_set in self._make_range(last_update, update_count,
BULK_READ_SIZE):
update_set = self.memcached.get_multi(update_id_set,
UPDATE_ID_PREFIX)
for i in self.memcached.get_multi(update_set,
RECORD_ID_PREFIX):
yield i
def active_pids(self, pids):
stored_pids = self.memcached.get('pids') or set()
@ -135,7 +143,7 @@ class MemcachedStorage(RuntimeStorage):
first_valid_update_id = 0
for i in range(first_valid_update_id, min_update):
self.memcached.delete('update:%s' % i)
self.memcached.delete(UPDATE_ID_PREFIX + str(i))
self.memcached.set('first_valid_update_id', min_update)
@ -162,7 +170,7 @@ class MemcachedStorage(RuntimeStorage):
self.memcached.set('pids', pids)
def _get_record_name(self, record_id):
return 'record:%s' % record_id
return RECORD_ID_PREFIX + record_id
def _get_record_count(self):
return self.memcached.get('record:count') or 0
@ -170,14 +178,23 @@ class MemcachedStorage(RuntimeStorage):
def _set_record_count(self, count):
self.memcached.set('record:count', count)
def _make_range(self, start, stop, step):
i = start
for i in range(start, stop, step):
yield range(i, i + step)
if (stop - start) % step > 0:
yield range(i, stop)
def _get_all_records(self):
count = self.memcached.get('record:count') or 0
for i in range(0, count):
yield self.memcached.get('record:%s' % i)
for record_id_set in self._make_range(0, self._get_record_count(),
BULK_READ_SIZE):
for i in self.memcached.get_multi(
record_id_set, RECORD_ID_PREFIX).values():
yield i
def _commit_update(self, record_id):
count = self._get_update_count()
self.memcached.set('update:%s' % count, record_id)
self.memcached.set(UPDATE_ID_PREFIX + count, record_id)
self.memcached.set('update:count', count + 1)
def _build_index(self):