Optimize memory consumption in disagreement processing

Previously the full map from review to marks was constructed,
thus resulting in storing all marks records at peak. Now we use
the fact that marks are ordered by time in runtime storage and
if we observe marks for patch N it means that all from N-1 were
seen before. Thus the maximum memory allocation is reviews count
multiplied by number of marks in last patches (approximately
by average number of patches less then before)

Part of blueprint memory-optimizations

Change-Id: Ia33001f57b0d92f6ff562f8c51aecba5875e8825
This commit is contained in:
Ilya Shakhat 2014-04-29 15:02:53 +04:00
parent 5481e7796b
commit 3c61e0ba20
2 changed files with 94 additions and 35 deletions

View File

@ -14,6 +14,7 @@
# limitations under the License.
import bisect
import collections
import copy
import time
@ -638,46 +639,61 @@ class RecordProcessor(object):
if user['core'] != core_old:
utils.store_user(self.runtime_storage_inst, user)
def _close_patch(self, cores, marks):
if len(marks) < 2:
return
core_mark = 0
for mark in sorted(marks, key=lambda x: x['date'], reverse=True):
if core_mark == 0:
if (mark['module'], mark['branch'], mark['user_id']) in cores:
# mark is from core engineer
core_mark = mark['value']
continue
disagreement = ((core_mark != 0) and
((core_mark < 0 < mark['value']) or
(core_mark > 0 > mark['value'])))
old_disagreement = mark.get('x')
mark['x'] = disagreement
if old_disagreement != disagreement:
yield mark
def _update_marks_with_disagreement(self):
LOG.debug('Process marks to find disagreements')
marks_per_patch = {}
cores = set()
for user in self.runtime_storage_inst.get_all_users():
for (module, branch) in (user['core'] or []):
cores.add((module, branch, user['user_id']))
# map from review_id to current patch and list of marks
marks_per_patch = collections.defaultdict(
lambda: {'patch_number': 0, 'marks': []})
for record in self.runtime_storage_inst.get_all_records():
if record['record_type'] == 'mark' and record['type'] == 'CRVW':
review_id = record['review_id']
patch_number = record['patch']
if (review_id, patch_number) in marks_per_patch:
marks_per_patch[(review_id, patch_number)].append(record)
else:
marks_per_patch[(review_id, patch_number)] = [record]
cores = dict([(user['user_id'], user)
for user in self.runtime_storage_inst.get_all_users()
if user['core']])
if review_id in marks_per_patch:
# review is already seen, check if patch is newer
if (marks_per_patch[review_id]['patch_number'] <
patch_number):
# the patch is new, close the current
for processed in self._close_patch(
cores, marks_per_patch[review_id]['marks']):
yield processed
del marks_per_patch[review_id]
for key, marks in six.iteritems(marks_per_patch):
if len(marks) < 2:
continue
marks_per_patch[review_id]['patch_number'] = patch_number
marks_per_patch[review_id]['marks'].append(record)
core_mark = 0
for mark in sorted(marks, key=lambda x: x['date'], reverse=True):
if core_mark == 0:
user_id = mark['user_id']
if user_id in cores:
user = cores[user_id]
if (mark['module'], mark['branch']) in user['core']:
# mark is from core engineer
core_mark = mark['value']
continue
disagreement = (core_mark != 0) and (
(core_mark < 0 < mark['value']) or
(core_mark > 0 > mark['value']))
old_disagreement = mark.get('x')
mark['x'] = disagreement
if old_disagreement != disagreement:
yield mark
# purge the rest
for marks_patch in marks_per_patch.values():
for processed in self._close_patch(cores, marks_patch['marks']):
yield processed
def update(self, release_index=None):
self.runtime_storage_inst.set_records(

View File

@ -925,6 +925,7 @@ class TestRecordProcessor(testtools.TestCase):
'createdOn': timestamp,
'module': 'nova',
'branch': 'master',
'status': 'NEW',
'patchSets': [
{'number': '1',
'revision': '4d8984e92910c37b7d101c1ae8c8283a2e6f4a76',
@ -936,7 +937,7 @@ class TestRecordProcessor(testtools.TestCase):
'createdOn': timestamp,
'approvals': [
{'type': 'CRVW', 'description': 'Code Review',
'value': '1', 'grantedOn': timestamp - 1,
'value': '2', 'grantedOn': timestamp - 1,
'by': {
'name': 'Homer Simpson',
'email': 'hsimpson@gmail.com',
@ -948,15 +949,57 @@ class TestRecordProcessor(testtools.TestCase):
'email': 'john_doe@ibm.com',
'username': 'john_doe'}}
]
}]}
},
{'number': '2',
'revision': '4d8984e92910c37b7d101c1ae8c8283a2e6f4a76',
'ref': 'refs/changes/16/58516/1',
'uploader': {
'name': 'Bill Smith',
'email': 'bill@smith.to',
'username': 'bsmith'},
'createdOn': timestamp + 1,
'approvals': [
{'type': 'CRVW', 'description': 'Code Review',
'value': '1', 'grantedOn': timestamp + 2,
'by': {
'name': 'Homer Simpson',
'email': 'hsimpson@gmail.com',
'username': 'homer'}},
{'type': 'CRVW', 'description': 'Code Review',
'value': '-1', 'grantedOn': timestamp + 3,
'by': {
'name': 'Bart Simpson',
'email': 'bsimpson@gmail.com',
'username': 'bart'}},
{'type': 'CRVW', 'description': 'Code Review',
'value': '2', 'grantedOn': timestamp + 4,
'by': {
'name': 'John Doe',
'email': 'john_doe@ibm.com',
'username': 'john_doe'}}
]
}
]}
]))
record_processor_inst.update()
marks = list([r for r in runtime_storage_inst.get_all_records()
if r['record_type'] == 'mark'])
homer_mark = next(itertools.ifilter(
lambda x: x['date'] == (timestamp - 1), marks), None)
self.assertTrue(homer_mark['x']) # disagreement
self.assertTrue(homer_mark.get('x'),
msg='Disagreement: core set -2 after +2')
homer_mark = next(itertools.ifilter(
lambda x: x['date'] == (timestamp + 2), marks), None)
self.assertFalse(homer_mark.get('x'),
msg='No disagreement: core set +2 after +1')
bart_mark = next(itertools.ifilter(
lambda x: x['date'] == (timestamp + 3), marks), None)
self.assertTrue(bart_mark.get('x'),
msg='Disagreement: core set +2 after -1')
def test_commit_merge_date(self):
record_processor_inst = self.make_record_processor()
@ -1260,7 +1303,7 @@ def generate_emails(author_name='John Doe', author_email='johndoe@gmail.com',
def make_runtime_storage(users=None, companies=None, releases=None,
repos=None):
runtime_storage_cache = {}
runtime_storage_record_keys = set([])
runtime_storage_record_keys = []
def get_by_key(key):
if key == 'companies':
@ -1297,7 +1340,7 @@ def make_runtime_storage(users=None, companies=None, releases=None,
def set_records(records_iterator):
for record in records_iterator:
runtime_storage_cache[record['primary_key']] = record
runtime_storage_record_keys.add(record['primary_key'])
runtime_storage_record_keys.append(record['primary_key'])
def get_all_records():
return [runtime_storage_cache[key]