Merge "Implemented bulk read from memcached"
This commit is contained in:
commit
a4857bf8ed
@ -23,6 +23,10 @@ from oslo.config import cfg
|
|||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
BULK_READ_SIZE = 64
|
||||||
|
RECORD_ID_PREFIX = 'record:'
|
||||||
|
UPDATE_ID_PREFIX = 'update:'
|
||||||
|
|
||||||
|
|
||||||
class RuntimeStorage(object):
|
class RuntimeStorage(object):
|
||||||
def __init__(self, uri):
|
def __init__(self, uri):
|
||||||
@ -106,12 +110,16 @@ class MemcachedStorage(RuntimeStorage):
|
|||||||
self._set_pids(pid)
|
self._set_pids(pid)
|
||||||
|
|
||||||
if not last_update:
|
if not last_update:
|
||||||
for record_id in range(0, self._get_record_count()):
|
for i in self._get_all_records():
|
||||||
yield self.memcached.get(self._get_record_name(record_id))
|
yield i
|
||||||
else:
|
else:
|
||||||
for update_id in range(last_update, update_count):
|
for update_id_set in self._make_range(last_update, update_count,
|
||||||
yield self.memcached.get(self._get_record_name(
|
BULK_READ_SIZE):
|
||||||
self.memcached.get('update:%s' % update_id)))
|
update_set = self.memcached.get_multi(update_id_set,
|
||||||
|
UPDATE_ID_PREFIX)
|
||||||
|
for i in self.memcached.get_multi(update_set,
|
||||||
|
RECORD_ID_PREFIX):
|
||||||
|
yield i
|
||||||
|
|
||||||
def active_pids(self, pids):
|
def active_pids(self, pids):
|
||||||
stored_pids = self.memcached.get('pids') or set()
|
stored_pids = self.memcached.get('pids') or set()
|
||||||
@ -135,7 +143,7 @@ class MemcachedStorage(RuntimeStorage):
|
|||||||
first_valid_update_id = 0
|
first_valid_update_id = 0
|
||||||
|
|
||||||
for i in range(first_valid_update_id, min_update):
|
for i in range(first_valid_update_id, min_update):
|
||||||
self.memcached.delete('update:%s' % i)
|
self.memcached.delete(UPDATE_ID_PREFIX + str(i))
|
||||||
|
|
||||||
self.memcached.set('first_valid_update_id', min_update)
|
self.memcached.set('first_valid_update_id', min_update)
|
||||||
|
|
||||||
@ -162,7 +170,7 @@ class MemcachedStorage(RuntimeStorage):
|
|||||||
self.memcached.set('pids', pids)
|
self.memcached.set('pids', pids)
|
||||||
|
|
||||||
def _get_record_name(self, record_id):
|
def _get_record_name(self, record_id):
|
||||||
return 'record:%s' % record_id
|
return RECORD_ID_PREFIX + record_id
|
||||||
|
|
||||||
def _get_record_count(self):
|
def _get_record_count(self):
|
||||||
return self.memcached.get('record:count') or 0
|
return self.memcached.get('record:count') or 0
|
||||||
@ -170,14 +178,23 @@ class MemcachedStorage(RuntimeStorage):
|
|||||||
def _set_record_count(self, count):
|
def _set_record_count(self, count):
|
||||||
self.memcached.set('record:count', count)
|
self.memcached.set('record:count', count)
|
||||||
|
|
||||||
|
def _make_range(self, start, stop, step):
|
||||||
|
i = start
|
||||||
|
for i in range(start, stop, step):
|
||||||
|
yield range(i, i + step)
|
||||||
|
if (stop - start) % step > 0:
|
||||||
|
yield range(i, stop)
|
||||||
|
|
||||||
def _get_all_records(self):
|
def _get_all_records(self):
|
||||||
count = self.memcached.get('record:count') or 0
|
for record_id_set in self._make_range(0, self._get_record_count(),
|
||||||
for i in range(0, count):
|
BULK_READ_SIZE):
|
||||||
yield self.memcached.get('record:%s' % i)
|
for i in self.memcached.get_multi(
|
||||||
|
record_id_set, RECORD_ID_PREFIX).values():
|
||||||
|
yield i
|
||||||
|
|
||||||
def _commit_update(self, record_id):
|
def _commit_update(self, record_id):
|
||||||
count = self._get_update_count()
|
count = self._get_update_count()
|
||||||
self.memcached.set('update:%s' % count, record_id)
|
self.memcached.set(UPDATE_ID_PREFIX + count, record_id)
|
||||||
self.memcached.set('update:count', count + 1)
|
self.memcached.set('update:count', count + 1)
|
||||||
|
|
||||||
def _build_index(self):
|
def _build_index(self):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user