diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index b1ced9a68..47f9829a7 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -14,6 +14,7 @@ # limitations under the License. import bisect +import collections import copy import time @@ -638,46 +639,61 @@ class RecordProcessor(object): if user['core'] != core_old: utils.store_user(self.runtime_storage_inst, user) + def _close_patch(self, cores, marks): + if len(marks) < 2: + return + + core_mark = 0 + for mark in sorted(marks, key=lambda x: x['date'], reverse=True): + + if core_mark == 0: + if (mark['module'], mark['branch'], mark['user_id']) in cores: + # mark is from core engineer + core_mark = mark['value'] + continue + + disagreement = ((core_mark != 0) and + ((core_mark < 0 < mark['value']) or + (core_mark > 0 > mark['value']))) + old_disagreement = mark.get('x') + mark['x'] = disagreement + if old_disagreement != disagreement: + yield mark + def _update_marks_with_disagreement(self): LOG.debug('Process marks to find disagreements') - marks_per_patch = {} + cores = set() + for user in self.runtime_storage_inst.get_all_users(): + for (module, branch) in (user['core'] or []): + cores.add((module, branch, user['user_id'])) + + # map from review_id to current patch and list of marks + marks_per_patch = collections.defaultdict( + lambda: {'patch_number': 0, 'marks': []}) + for record in self.runtime_storage_inst.get_all_records(): if record['record_type'] == 'mark' and record['type'] == 'CRVW': review_id = record['review_id'] patch_number = record['patch'] - if (review_id, patch_number) in marks_per_patch: - marks_per_patch[(review_id, patch_number)].append(record) - else: - marks_per_patch[(review_id, patch_number)] = [record] - cores = dict([(user['user_id'], user) - for user in self.runtime_storage_inst.get_all_users() - if user['core']]) + if review_id in marks_per_patch: + # review is already seen, check if patch is newer + if (marks_per_patch[review_id]['patch_number'] < + patch_number): + # the patch is new, close the current + for processed in self._close_patch( + cores, marks_per_patch[review_id]['marks']): + yield processed + del marks_per_patch[review_id] - for key, marks in six.iteritems(marks_per_patch): - if len(marks) < 2: - continue + marks_per_patch[review_id]['patch_number'] = patch_number + marks_per_patch[review_id]['marks'].append(record) - core_mark = 0 - for mark in sorted(marks, key=lambda x: x['date'], reverse=True): - - if core_mark == 0: - user_id = mark['user_id'] - if user_id in cores: - user = cores[user_id] - if (mark['module'], mark['branch']) in user['core']: - # mark is from core engineer - core_mark = mark['value'] - continue - - disagreement = (core_mark != 0) and ( - (core_mark < 0 < mark['value']) or - (core_mark > 0 > mark['value'])) - old_disagreement = mark.get('x') - mark['x'] = disagreement - if old_disagreement != disagreement: - yield mark + # purge the rest + for marks_patch in marks_per_patch.values(): + for processed in self._close_patch(cores, marks_patch['marks']): + yield processed def update(self, release_index=None): self.runtime_storage_inst.set_records( diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index 40fe00432..a61140d4f 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -925,6 +925,7 @@ class TestRecordProcessor(testtools.TestCase): 'createdOn': timestamp, 'module': 'nova', 'branch': 'master', + 'status': 'NEW', 'patchSets': [ {'number': '1', 'revision': '4d8984e92910c37b7d101c1ae8c8283a2e6f4a76', @@ -936,7 +937,7 @@ class TestRecordProcessor(testtools.TestCase): 'createdOn': timestamp, 'approvals': [ {'type': 'CRVW', 'description': 'Code Review', - 'value': '1', 'grantedOn': timestamp - 1, + 'value': '2', 'grantedOn': timestamp - 1, 'by': { 'name': 'Homer Simpson', 'email': 'hsimpson@gmail.com', @@ -948,15 +949,57 @@ class TestRecordProcessor(testtools.TestCase): 'email': 'john_doe@ibm.com', 'username': 'john_doe'}} ] - }]} + }, + {'number': '2', + 'revision': '4d8984e92910c37b7d101c1ae8c8283a2e6f4a76', + 'ref': 'refs/changes/16/58516/1', + 'uploader': { + 'name': 'Bill Smith', + 'email': 'bill@smith.to', + 'username': 'bsmith'}, + 'createdOn': timestamp + 1, + 'approvals': [ + {'type': 'CRVW', 'description': 'Code Review', + 'value': '1', 'grantedOn': timestamp + 2, + 'by': { + 'name': 'Homer Simpson', + 'email': 'hsimpson@gmail.com', + 'username': 'homer'}}, + {'type': 'CRVW', 'description': 'Code Review', + 'value': '-1', 'grantedOn': timestamp + 3, + 'by': { + 'name': 'Bart Simpson', + 'email': 'bsimpson@gmail.com', + 'username': 'bart'}}, + {'type': 'CRVW', 'description': 'Code Review', + 'value': '2', 'grantedOn': timestamp + 4, + 'by': { + 'name': 'John Doe', + 'email': 'john_doe@ibm.com', + 'username': 'john_doe'}} + ] + } + ]} ])) record_processor_inst.update() marks = list([r for r in runtime_storage_inst.get_all_records() if r['record_type'] == 'mark']) + homer_mark = next(itertools.ifilter( lambda x: x['date'] == (timestamp - 1), marks), None) - self.assertTrue(homer_mark['x']) # disagreement + self.assertTrue(homer_mark.get('x'), + msg='Disagreement: core set -2 after +2') + + homer_mark = next(itertools.ifilter( + lambda x: x['date'] == (timestamp + 2), marks), None) + self.assertFalse(homer_mark.get('x'), + msg='No disagreement: core set +2 after +1') + + bart_mark = next(itertools.ifilter( + lambda x: x['date'] == (timestamp + 3), marks), None) + self.assertTrue(bart_mark.get('x'), + msg='Disagreement: core set +2 after -1') def test_commit_merge_date(self): record_processor_inst = self.make_record_processor() @@ -1260,7 +1303,7 @@ def generate_emails(author_name='John Doe', author_email='johndoe@gmail.com', def make_runtime_storage(users=None, companies=None, releases=None, repos=None): runtime_storage_cache = {} - runtime_storage_record_keys = set([]) + runtime_storage_record_keys = [] def get_by_key(key): if key == 'companies': @@ -1297,7 +1340,7 @@ def make_runtime_storage(users=None, companies=None, releases=None, def set_records(records_iterator): for record in records_iterator: runtime_storage_cache[record['primary_key']] = record - runtime_storage_record_keys.add(record['primary_key']) + runtime_storage_record_keys.append(record['primary_key']) def get_all_records(): return [runtime_storage_cache[key]