From dea1580edf633d50087dc4e38d9cccb9e0d1eadb Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 21 Oct 2013 17:55:39 +0400 Subject: [PATCH] Users are stored individually, not in a single collection Users were stored in a single 'users' collection, which was written back only when default data was processed and after the whole update process had finished. If the process failed part-way, all changes to the collection were lost. The issue is fixed by storing each user individually, keyed by user_id, email and launchpad_id. Fixes bug 1242588 Change-Id: Ie3d57ec7f3f18c4259dbd34cc8f29be45b06e2cd --- stackalytics/processor/record_processor.py | 25 +++----- stackalytics/processor/runtime_storage.py | 2 +- stackalytics/processor/utils.py | 4 ++ tests/unit/test_record_processor.py | 71 ++++++++++++++-------- 4 files changed, 60 insertions(+), 42 deletions(-) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 811615223..6cafbebae 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -29,8 +29,6 @@ class RecordProcessor(object): self.domains_index = runtime_storage_inst.get_by_key('companies') - self.users_index = runtime_storage_inst.get_by_key('users') - self.releases = runtime_storage_inst.get_by_key('releases') self.releases_dates = [r['end_date'] for r in self.releases] @@ -142,18 +140,19 @@ class RecordProcessor(object): def update_user(self, record): email = record.get('author_email') - if email in self.users_index: - user = self.users_index[email] - else: + user = utils.load_user(self.runtime_storage_inst, email) + if not user: if record.get('launchpad_id'): launchpad_id = record.get('launchpad_id') user_name = record.get('author_name') else: launchpad_id, user_name = self._get_lp_info(email) - if (launchpad_id) and (launchpad_id in self.users_index): + if launchpad_id: + user = utils.load_user(self.runtime_storage_inst, launchpad_id) + + if user: # merge emails - user = self.users_index[launchpad_id] if email: 
self._update_user_profile(user, email) else: @@ -165,10 +164,6 @@ class RecordProcessor(object): user = self._create_user(launchpad_id, email, user_name) utils.store_user(self.runtime_storage_inst, user) - if email: - self.users_index[email] = user - if user['launchpad_id']: - self.users_index[user['launchpad_id']] = user return user @@ -198,7 +193,7 @@ class RecordProcessor(object): yield record def _spawn_review(self, record): - # copy everything except pathsets and flatten user data + # copy everything except patchsets and flatten user data review = dict([(k, v) for k, v in record.iteritems() if k not in ['patchSets', 'owner', 'createdOn']]) owner = record['owner'] @@ -340,8 +335,6 @@ class RecordProcessor(object): yield r - self.runtime_storage_inst.set_by_key('users', self.users_index) - def update(self, record_iterator, release_index): for record in record_iterator: need_update = False @@ -369,8 +362,6 @@ class RecordProcessor(object): if need_update: yield record - self.runtime_storage_inst.set_by_key('users', self.users_index) - def _get_records_for_users_to_update(self): users_reviews = {} valid_blueprints = {} @@ -424,7 +415,7 @@ class RecordProcessor(object): user_id = record['user_id'] if user_id in self.updated_users: - user = self.users_index[user_id] + user = utils.load_user(self.runtime_storage_inst, user_id) user_company_name = user['companies'][0]['company_name'] if record['company_name'] != user_company_name: LOG.debug('Update record %s: company changed to: %s', diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index f714ba3eb..e3d38218f 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -42,7 +42,7 @@ class RuntimeStorage(object): def get_by_key(self, key): pass - def set_by_key(self, key, head_commit_id): + def set_by_key(self, key, value): pass def get_update(self, pid): diff --git a/stackalytics/processor/utils.py 
b/stackalytics/processor/utils.py index 6cd125171..d2e166838 100644 --- a/stackalytics/processor/utils.py +++ b/stackalytics/processor/utils.py @@ -82,6 +82,10 @@ def make_range(start, stop, step): def store_user(runtime_storage_inst, user): runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) + if user.get('launchpad_id'): + runtime_storage_inst.set_by_key('user:%s' % user['launchpad_id'], user) + for email in user.get('emails') or []: + runtime_storage_inst.set_by_key('user:%s' % email, user) def load_user(runtime_storage_inst, user_id): diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index 8296884b7..6f4369cfd 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -202,8 +202,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - self.assertIn('johndoe@ibm.com', - record_processor_inst.users_index['john_doe']['emails']) + self.assertIn('johndoe@ibm.com', utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')['emails']) def test_process_commit_existing_user_new_email_unknown_company(self): # User is known to LP, but his email is new to us. 
Should match @@ -232,8 +232,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - self.assertIn('johndoe@gmail.com', - record_processor_inst.users_index['john_doe']['emails']) + self.assertIn('johndoe@gmail.com', utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')['emails']) def test_process_commit_existing_user_new_email_known_company_update(self): record_processor_inst = self.make_record_processor( @@ -261,7 +261,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - user = record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertIn('johndoe@gmail.com', user['emails']) self.assertEquals('IBM', user['companies'][0]['company_name'], message='User affiliation should be updated') @@ -286,7 +287,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - user = record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertIn('johndoe@ibm.com', user['emails']) self.assertEquals('IBM', user['companies'][0]['company_name']) @@ -308,8 +310,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - self.assertEquals(1, len(record_processor_inst.users_index)) - user = record_processor_inst.users_index['johndoe@ibm.com'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'johndoe@ibm.com') self.assertIn('johndoe@ibm.com', user['emails']) self.assertEquals('IBM', user['companies'][0]['company_name']) self.assertEquals(None, user['launchpad_id']) @@ -338,8 +340,8 @@ class TestRecordProcessor(testtools.TestCase): 'company_name': '*independent'}, processed_records[0]) - self.assertEquals(1, len(record_processor_inst.users_index)) - user = 
record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertEquals({ 'user_id': 'john_doe', 'launchpad_id': 'john_doe', @@ -372,8 +374,8 @@ class TestRecordProcessor(testtools.TestCase): 'company_name': '*independent'}, processed_records[0]) - self.assertEquals(1, len(record_processor_inst.users_index)) - user = record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertEquals({ 'user_id': 'john_doe', 'launchpad_id': 'john_doe', @@ -423,8 +425,10 @@ class TestRecordProcessor(testtools.TestCase): 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} - self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user}, - record_processor_inst.users_index) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com')) def test_process_blueprint_then_commit(self): record_processor_inst = self.make_record_processor( @@ -469,8 +473,10 @@ class TestRecordProcessor(testtools.TestCase): 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} - self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user}, - record_processor_inst.users_index) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com')) def test_process_review_then_blueprint(self): record_processor_inst = self.make_record_processor( @@ -513,8 +519,10 @@ class TestRecordProcessor(testtools.TestCase): 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 
0}]} - self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user}, - record_processor_inst.users_index) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com')) # update records @@ -788,22 +796,37 @@ def generate_emails(author_name='John Doe', author_email='johndoe@gmail.com', def make_runtime_storage(users=None, companies=None, releases=None, repos=None): - def get_by_key(collection): - if collection == 'companies': + runtime_storage_cache = {} + + def get_by_key(key): + if key == 'companies': return _make_companies(companies or [ {"company_name": "*independent", "domains": [""]}, ]) - elif collection == 'users': + elif key == 'users': return _make_users(users or []) - elif collection == 'releases': + elif key == 'releases': return releases or RELEASES - elif collection == 'repos': + elif key == 'repos': return repos or REPOS else: - raise Exception('Wrong collection: %s' % collection) + return runtime_storage_cache.get(key) + + def set_by_key(key, value): + runtime_storage_cache[key] = value rs = mock.Mock(runtime_storage.RuntimeStorage) rs.get_by_key = mock.Mock(side_effect=get_by_key) + rs.set_by_key = mock.Mock(side_effect=set_by_key) + + if users: + for user in users: + set_by_key('user:%s' % user['user_id'], user) + if user.get('launchpad_id'): + set_by_key('user:%s' % user['launchpad_id'], user) + for email in user.get('emails') or []: + set_by_key('user:%s' % email, user) + return rs