Implemented update mechanism for memory storage

Implements blueprint update-mechanism

Change-Id: I7c8381366f9a76084b6e1a9e1cc3a9ad2502cdcb
This commit is contained in:
Ilya Shakhat 2013-07-15 16:51:37 +04:00
parent a4857bf8ed
commit 90d6e1cd43
3 changed files with 28 additions and 9 deletions

View File

@ -35,14 +35,14 @@ class CachedMemoryStorage(MemoryStorage):
self.release_index = {}
self.dates = []
for record in records:
self.records[record['record_id']] = record
self.index(record)
self._save_record(record)
self.dates = sorted(self.date_index)
self.company_name_mapping = dict((c.lower(), c)
for c in self.company_index.keys())
def index(self, record):
def _save_record(self, record):
self.records[record['record_id']] = record
self._add_to_index(self.company_index, record, 'company_name')
self._add_to_index(self.module_index, record, 'module')
self._add_to_index(self.launchpad_id_index, record, 'launchpad_id')
@ -52,6 +52,14 @@ class CachedMemoryStorage(MemoryStorage):
record['week'] = user_utils.timestamp_to_week(record['date'])
record['loc'] = record['lines_added'] + record['lines_deleted']
def _remove_record_from_index(self, record):
self.company_index[record['company_name']].remove(record['record_id'])
self.module_index[record['module']].remove(record['record_id'])
self.launchpad_id_index[record['launchpad_id']].remove(
record['record_id'])
self.release_index[record['release']].remove(record['record_id'])
self.date_index[record['date']].remove(record['record_id'])
def _add_to_index(self, record_index, record, key):
record_key = record[key]
if record_key in record_index:
@ -104,6 +112,12 @@ class CachedMemoryStorage(MemoryStorage):
def get_launchpad_ids(self):
return self.launchpad_id_index.keys()
def update(self, records):
for record in records:
if record['record_id'] in self.records:
self._remove_record_from_index(record)
self._save_record(record)
def get_memory_storage(memory_storage_type, records):
if memory_storage_type == MEMORY_STORAGE_CACHED:

View File

@ -83,6 +83,11 @@ def get_vault():
vault['modules'] = dict((r['module'].lower(),
r['project_type'].lower()) for r in modules)
app.stackalytics_vault = vault
else:
memory_storage_inst = vault['memory_storage']
memory_storage_inst.update(
vault['runtime_storage'].get_update(os.getpid()))
return vault

View File

@ -115,10 +115,10 @@ class MemcachedStorage(RuntimeStorage):
else:
for update_id_set in self._make_range(last_update, update_count,
BULK_READ_SIZE):
update_set = self.memcached.get_multi(update_id_set,
UPDATE_ID_PREFIX)
for i in self.memcached.get_multi(update_set,
RECORD_ID_PREFIX):
update_set = self.memcached.get_multi(
update_id_set, UPDATE_ID_PREFIX).values()
for i in self.memcached.get_multi(
update_set, RECORD_ID_PREFIX).values():
yield i
def active_pids(self, pids):
@ -170,7 +170,7 @@ class MemcachedStorage(RuntimeStorage):
self.memcached.set('pids', pids)
def _get_record_name(self, record_id):
return RECORD_ID_PREFIX + record_id
return RECORD_ID_PREFIX + str(record_id)
def _get_record_count(self):
return self.memcached.get('record:count') or 0
@ -194,7 +194,7 @@ class MemcachedStorage(RuntimeStorage):
def _commit_update(self, record_id):
count = self._get_update_count()
self.memcached.set(UPDATE_ID_PREFIX + count, record_id)
self.memcached.set(UPDATE_ID_PREFIX + str(count), record_id)
self.memcached.set('update:count', count + 1)
def _build_index(self):