diff --git a/dashboard/memory_storage.py b/dashboard/memory_storage.py index ad020d934..be5d79fb5 100644 --- a/dashboard/memory_storage.py +++ b/dashboard/memory_storage.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from stackalytics.processor import user_utils - MEMORY_STORAGE_CACHED = 0 @@ -27,38 +25,43 @@ class CachedMemoryStorage(MemoryStorage): def __init__(self, records): super(CachedMemoryStorage, self).__init__(records) + # common indexes self.records = {} - self.company_index = {} - self.date_index = {} + self.record_types_index = {} self.module_index = {} self.launchpad_id_index = {} + self.company_index = {} self.release_index = {} - self.dates = [] + + self.indexes = { + 'record_type': self.record_types_index, + 'company_name': self.company_index, + 'module': self.module_index, + 'launchpad_id': self.launchpad_id_index, + 'release': self.release_index, + } + for record in records: self._save_record(record) - self.dates = sorted(self.date_index) + self.company_name_mapping = dict((c.lower(), c) for c in self.company_index.keys()) def _save_record(self, record): - self.records[record['record_id']] = record - self._add_to_index(self.company_index, record, 'company_name') - self._add_to_index(self.module_index, record, 'module') - self._add_to_index(self.launchpad_id_index, record, 'launchpad_id') - self._add_to_index(self.release_index, record, 'release') - self._add_to_index(self.date_index, record, 'date') + self.records[record['record_id']] = record + for key, index in self.indexes.iteritems(): + self._add_to_index(index, record, key) - record['week'] = user_utils.timestamp_to_week(record['date']) - record['loc'] = record['lines_added'] + record['lines_deleted'] + def update(self, records): + for record in records: + if record['record_id'] in self.records: + self._remove_record_from_index(record) + self._save_record(record) def _remove_record_from_index(self, record): - 
self.company_index[record['company_name']].remove(record['record_id']) - self.module_index[record['module']].remove(record['record_id']) - self.launchpad_id_index[record['launchpad_id']].remove( - record['record_id']) - self.release_index[record['release']].remove(record['record_id']) - self.date_index[record['date']].remove(record['record_id']) + for key, index in self.indexes.iteritems(): + index[record[key]].remove(record['record_id']) def _add_to_index(self, record_index, record, key): record_key = record[key] @@ -70,9 +72,8 @@ class CachedMemoryStorage(MemoryStorage): def _get_record_ids_from_index(self, items, index): record_ids = set() for item in items: - if item not in index: - raise Exception('Parameter %s not valid' % item) - record_ids |= index[item] + if item in index: + record_ids |= index[item] return record_ids def get_record_ids_by_modules(self, modules): @@ -91,7 +92,16 @@ class CachedMemoryStorage(MemoryStorage): return self._get_record_ids_from_index(releases, self.release_index) def get_record_ids(self): - return set(self.records.keys()) + return self.records.keys() + + def get_commit_ids(self): + return self.record_types_index['commit'] + + def get_review_ids(self): + return self.record_types_index['review'] + + def get_mark_ids(self): + return self.record_types_index['mark'] def get_records(self, record_ids): for i in record_ids: @@ -112,12 +122,6 @@ class CachedMemoryStorage(MemoryStorage): def get_launchpad_ids(self): return self.launchpad_id_index.keys() - def update(self, records): - for record in records: - if record['record_id'] in self.records: - self._remove_record_from_index(record) - self._save_record(record) - def get_memory_storage(memory_storage_type, records): if memory_storage_type == MEMORY_STORAGE_CACHED: diff --git a/dashboard/templates/engineer_details.html b/dashboard/templates/engineer_details.html index dac1e7c22..a422d472f 100644 --- a/dashboard/templates/engineer_details.html +++ 
b/dashboard/templates/engineer_details.html @@ -84,4 +84,9 @@ {% endif %} +

Marks

+ {% for mark in [-2, -1, 0, 1, 2] %} +
{{ mark }}: {{ marks[mark] }}
+ {% endfor %} + {% endblock %} diff --git a/dashboard/templates/layout.html b/dashboard/templates/layout.html index a57e6ec34..6ebdd2bc1 100644 --- a/dashboard/templates/layout.html +++ b/dashboard/templates/layout.html @@ -284,6 +284,8 @@ diff --git a/dashboard/web.py b/dashboard/web.py index 7a9fefdde..740c19181 100644 --- a/dashboard/web.py +++ b/dashboard/web.py @@ -45,6 +45,8 @@ DEFAULTS = { METRIC_LABELS = { 'loc': 'Lines of code', 'commits': 'Commits', + 'reviews': 'Reviews', + 'marks': 'Marks', } DEFAULT_RECORDS_LIMIT = 10 @@ -173,12 +175,13 @@ def get_default(param_name): return None -def get_parameter(kwargs, singular_name, plural_name, use_default=True): +def get_parameter(kwargs, singular_name, plural_name=None, use_default=True): if singular_name in kwargs: p = kwargs[singular_name] else: - p = (flask.request.args.get(singular_name) or - flask.request.args.get(plural_name)) + p = flask.request.args.get(singular_name) + if (not p) and plural_name: + p = flask.request.args.get(plural_name) if p: return p.split(',') elif use_default: @@ -200,7 +203,7 @@ def record_filter(ignore=None, use_default=True): vault = get_vault() memory_storage = vault['memory_storage'] - record_ids = memory_storage.get_record_ids() + record_ids = set(memory_storage.get_record_ids()) # make a copy if 'module' not in ignore: param = get_parameter(kwargs, 'module', 'modules', use_default) @@ -242,6 +245,15 @@ def record_filter(ignore=None, use_default=True): memory_storage.get_record_ids_by_releases( c.lower() for c in param)) + if 'metric' not in ignore: + param = get_parameter(kwargs, 'metric') + if 'reviews' in param: + record_ids &= memory_storage.get_review_ids() + elif 'marks' in param: + record_ids &= memory_storage.get_mark_ids() + elif ('loc' in param) or ('commits' in param): + record_ids &= memory_storage.get_commit_ids() + kwargs['records'] = memory_storage.get_records(record_ids) return f(*args, **kwargs) @@ -258,7 +270,7 @@ def aggregate_filter(): metric_param = 
(flask.request.args.get('metric') or get_default('metric')) metric = metric_param.lower() - if metric == 'commits': + if metric in ['commits', 'reviews', 'marks']: metric_filter = lambda r: 1 elif metric == 'loc': metric_filter = lambda r: r['loc'] @@ -288,7 +300,7 @@ def exception_handler(): return decorator -def templated(template=None): +def templated(template=None, return_code=200): def decorator(f): @functools.wraps(f) def templated_decorated_function(*args, **kwargs): @@ -326,7 +338,7 @@ def templated(template=None): ctx['project_type_options'] = get_project_type_options() - return flask.render_template(template_name, **ctx) + return flask.render_template(template_name, **ctx), return_code return templated_decorated_function @@ -342,8 +354,9 @@ def overview(): @app.errorhandler(404) +@templated('404.html', 404) def page_not_found(e): - return flask.render_template('404.html'), 404 + pass def contribution_details(records, limit=DEFAULT_RECORDS_LIMIT): @@ -351,32 +364,37 @@ def contribution_details(records, limit=DEFAULT_RECORDS_LIMIT): bugs_map = {} companies_map = {} commits = [] + marks = dict((m, 0) for m in [-2, -1, 0, 1, 2]) loc = 0 for record in records: - loc += record['loc'] - commits.append(record) - blueprint = record['blueprint_id'] - if blueprint: - if blueprint in blueprints_map: - blueprints_map[blueprint].append(record) - else: - blueprints_map[blueprint] = [record] + if record['record_type'] == 'commit': + loc += record['loc'] + commits.append(record) + blueprint = record['blueprint_id'] + if blueprint: + if blueprint in blueprints_map: + blueprints_map[blueprint].append(record) + else: + blueprints_map[blueprint] = [record] - bug = record['bug_id'] - if bug: - if bug in bugs_map: - bugs_map[bug].append(record) - else: - bugs_map[bug] = [record] + bug = record['bug_id'] + if bug: + if bug in bugs_map: + bugs_map[bug].append(record) + else: + bugs_map[bug] = [record] - company = record['company_name'] - if company: - if company in companies_map: 
- companies_map[company]['loc'] += record['loc'] - companies_map[company]['commits'] += 1 - else: - companies_map[company] = {'loc': record['loc'], 'commits': 1} + company = record['company_name'] + if company: + if company in companies_map: + companies_map[company]['loc'] += record['loc'] + companies_map[company]['commits'] += 1 + else: + companies_map[company] = {'loc': record['loc'], + 'commits': 1} + elif record['record_type'] == 'mark': + marks[int(record['value'])] += 1 blueprints = sorted([{'id': key, 'module': value[0]['module'], @@ -395,6 +413,7 @@ def contribution_details(records, limit=DEFAULT_RECORDS_LIMIT): 'commit_count': len(commits), 'companies': companies_map, 'loc': loc, + 'marks': marks, } return result @@ -423,7 +442,7 @@ def module_details(module, records): @app.route('/engineers/') @exception_handler() @templated() -@record_filter() +@record_filter(ignore='metric') def engineer_details(launchpad_id, records): persistent_storage = get_vault()['persistent_storage'] user = list(persistent_storage.get_users(launchpad_id=launchpad_id))[0] diff --git a/etc/corrections.json b/etc/corrections.json index b05476962..c1bea4367 100644 --- a/etc/corrections.json +++ b/etc/corrections.json @@ -1,10 +1,9 @@ { "corrections": [ { - "commit_id": "ee3fe4e836ca1c81e50a8324a9b5f982de4fa97f", + "primary_key": "ee3fe4e836ca1c81e50a8324a9b5f982de4fa97f", "correction_comment": "Reset LOC to 0", - "lines_added": 0, - "lines_deleted": 0 + "loc": 0 } ] } \ No newline at end of file diff --git a/etc/default_data.json b/etc/default_data.json index 2fa463a83..3cdc43ea5 100644 --- a/etc/default_data.json +++ b/etc/default_data.json @@ -14378,11 +14378,6 @@ ], "releases": [ - { - "release_name": "ALL", - "start_date": "2010-May-01", - "end_date": "now" - }, { "release_name": "Essex", "start_date": "2011-Oct-01", diff --git a/etc/stackalytics.conf b/etc/stackalytics.conf index fc08afb8b..be420a85b 100644 --- a/etc/stackalytics.conf +++ b/etc/stackalytics.conf @@ -3,7 +3,7 @@ # 
debug = False # Default data -# default-data = /etc/stackalytics/default_data.json +# default_data = /etc/stackalytics/default_data.json # The folder that holds all project sources to analyze # sources_root = /var/local/stackalytics @@ -24,4 +24,13 @@ # listen_port = 8080 # The address of file with corrections data -# corrections-uri = https://raw.github.com/stackforge/stackalytics/master/etc/corrections.json +# corrections_uri = https://raw.github.com/stackforge/stackalytics/master/etc/corrections.json + +# URI of review system +# review_uri = gerrit://review.openstack.org + +# SSH key for gerrit review system access +# ssh_key_filename = /home/user/.ssh/id_rsa + +# SSH username for gerrit review system access +# ssh_username = user diff --git a/etc/test_default_data.json b/etc/test_default_data.json index cdbe66b22..6a51a4d1b 100644 --- a/etc/test_default_data.json +++ b/etc/test_default_data.json @@ -39,51 +39,13 @@ "repos": [ { "branches": ["master"], - "module": "python-quantumclient", - "type": "core", - "uri": "git://github.com/openstack/python-quantumclient.git", + "module": "stackalytics", + "project_type": "stackforge", + "uri": "git://github.com/stackforge/stackalytics.git", "releases": [ - { - "release_name": "Folsom", - "tag_from": "folsom-1", - "tag_to": "2.1" - }, - { - "release_name": "Grizzly", - "tag_from": "2.1", - "tag_to": "2.2.1" - }, { "release_name": "Havana", - "tag_from": "2.2.1", - "tag_to": "HEAD" - } - ] - }, - { - "branches": ["master"], - "module": "keystone", - "type": "core", - "uri": "git://github.com/openstack/keystone.git", - "releases": [ - { - "release_name": "Essex", - "tag_from": "2011.3", - "tag_to": "2012.1" - }, - { - "release_name": "Folsom", - "tag_from": "2012.1", - "tag_to": "2012.2" - }, - { - "release_name": "Grizzly", - "tag_from": "2012.2", - "tag_to": "2013.1" - }, - { - "release_name": "Havana", - "tag_from": "2013.1", + "tag_from": "5a1376ca", "tag_to": "HEAD" } ] @@ -91,11 +53,6 @@ ], "releases": [ - { - 
"release_name": "ALL", - "start_date": "2010-May-01", - "end_date": "now" - }, { "release_name": "Essex", "start_date": "2011-Oct-01", diff --git a/requirements.txt b/requirements.txt index b89eb065b..03c4c2247 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,8 @@ Flask>=0.9 Flask-Gravatar iso8601 launchpadlib -http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a2.tar.gz#egg=oslo.config-1.2.0a2 +oslo.config +paramiko>=1.8.0 pbr>=0.5.16,<0.6 psutil python-memcached diff --git a/stackalytics/processor/commit_processor.py b/stackalytics/processor/commit_processor.py deleted file mode 100644 index d1fd4e6f4..000000000 --- a/stackalytics/processor/commit_processor.py +++ /dev/null @@ -1,182 +0,0 @@ -# Copyright (c) 2013 Mirantis Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import logging -import re - -from launchpadlib import launchpad -from oslo.config import cfg - -LOG = logging.getLogger(__name__) - - -COMMIT_PROCESSOR_DUMMY = 0 -COMMIT_PROCESSOR_CACHED = 1 - - -class CommitProcessor(object): - def __init__(self, persistent_storage): - self.persistent_storage = persistent_storage - - def process(self, commit_iterator): - pass - - -class DummyProcessor(CommitProcessor): - def __init__(self, persistent_storage): - super(DummyProcessor, self).__init__(persistent_storage) - - def process(self, commit_iterator): - return commit_iterator - - -class CachedProcessor(CommitProcessor): - def __init__(self, persistent_storage): - super(CachedProcessor, self).__init__(persistent_storage) - - companies = persistent_storage.get_companies() - self.domains_index = {} - for company in companies: - for domain in company['domains']: - self.domains_index[domain] = company['company_name'] - - users = persistent_storage.get_users() - self.users_index = {} - for user in users: - for email in user['emails']: - self.users_index[email] = user - - LOG.debug('Cached commit processor is instantiated') - - def _find_company(self, companies, date): - for r in companies: - if date < r['end_date']: - return r['company_name'] - return companies[-1]['company_name'] - - def _get_company_by_email(self, email): - name, at, domain = email.partition('@') - if domain: - parts = domain.split('.') - for i in range(len(parts), 1, -1): - m = '.'.join(parts[len(parts) - i:]) - if m in self.domains_index: - return self.domains_index[m] - return None - - def _unknown_user_email(self, email): - - lp_profile = None - if not re.match(r'[\w\d_\.-]+@([\w\d_\.-]+\.)+[\w]+', email): - LOG.debug('User email is not valid %s' % email) - else: - LOG.debug('Lookup user email %s at Launchpad' % email) - lp = launchpad.Launchpad.login_anonymously('stackalytics') - try: - lp_profile = lp.people.getByEmail(email=email) - except Exception as error: - LOG.warn('Lookup of email %s failed %s' % 
- (email, error.message)) - if not lp_profile: - # user is not found in Launchpad, create dummy record for commit - # update - LOG.debug('Email is not found at Launchpad, mapping to nobody') - user = { - 'launchpad_id': None, - 'companies': [{ - 'company_name': self.domains_index[''], - 'end_date': 0 - }] - } - else: - # get user's launchpad id from his profile - launchpad_id = lp_profile.name - LOG.debug('Found user %s' % launchpad_id) - - # check if user with launchpad_id exists in persistent storage - persistent_user_iterator = self.persistent_storage.get_users( - launchpad_id=launchpad_id) - - for persistent_user in persistent_user_iterator: - break - else: - persistent_user = None - - if persistent_user: - # user already exist, merge - LOG.debug('User exists in persistent storage, add new email') - persistent_user_email = persistent_user['emails'][0] - if persistent_user_email not in self.users_index: - raise Exception('User index is not valid') - user = self.users_index[persistent_user_email] - user['emails'].append(email) - self.persistent_storage.update_user(user) - else: - # add new user - LOG.debug('Add new user into persistent storage') - company = (self._get_company_by_email(email) or - self.domains_index['']) - user = { - 'launchpad_id': launchpad_id, - 'user_name': lp_profile.display_name, - 'emails': [email], - 'companies': [{ - 'company_name': company, - 'end_date': 0, - }], - } - self.persistent_storage.insert_user(user) - - # update local index - self.users_index[email] = user - return user - - def _update_commit_with_user_data(self, commit): - email = commit['author_email'].lower() - if email in self.users_index: - user = self.users_index[email] - else: - user = self._unknown_user_email(email) - commit['launchpad_id'] = user['launchpad_id'] - company = self._get_company_by_email(email) - if not company: - company = self._find_company(user['companies'], commit['date']) - commit['company_name'] = company - if 'user_name' in user: - 
commit['author_name'] = user['user_name'] - - def process(self, commit_iterator): - - for commit in commit_iterator: - self._update_commit_with_user_data(commit) - - if cfg.CONF.filter_robots and commit['company_name'] == '*robots': - continue - - yield commit - - -class CommitProcessorFactory(object): - @staticmethod - def get_processor(commit_processor_type, persistent_storage): - LOG.debug('Factory is asked for commit processor type %s' % - commit_processor_type) - if commit_processor_type == COMMIT_PROCESSOR_DUMMY: - return DummyProcessor(persistent_storage) - elif commit_processor_type == COMMIT_PROCESSOR_CACHED: - return CachedProcessor(persistent_storage) - else: - raise Exception('Unknown commit processor type %s' % - commit_processor_type) diff --git a/stackalytics/processor/config.py b/stackalytics/processor/config.py index 6517ee0bb..7ad3bbf77 100644 --- a/stackalytics/processor/config.py +++ b/stackalytics/processor/config.py @@ -40,4 +40,10 @@ OPTS = [ default=('https://raw.github.com/stackforge/stackalytics/' 'master/etc/corrections.json'), help='The address of file with corrections data'), + cfg.StrOpt('review-uri', default='gerrit://review.openstack.org', + help='URI of review system'), + cfg.StrOpt('ssh-key-filename', default='/home/ishakhat/.ssh/4launchpad_id', + help='SSH key for gerrit review system access'), + cfg.StrOpt('ssh-username', default='ishakhat', + help='SSH username for gerrit review system access'), ] diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 8b6329d4f..911ce7694 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -12,17 +12,20 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ import json +import urllib +import urllib2 from oslo.config import cfg import psutil from psutil import _error -import urllib2 from stackalytics.openstack.common import log as logging -from stackalytics.processor import commit_processor from stackalytics.processor import config from stackalytics.processor import persistent_storage +from stackalytics.processor import rcs +from stackalytics.processor import record_processor from stackalytics.processor import runtime_storage from stackalytics.processor import vcs @@ -57,41 +60,74 @@ def update_pids(runtime_storage): runtime_storage.active_pids(pids) -def process_repo(repo, runtime_storage, processor): +def _merge_commits(original, new): + if new['branches'] < original['branches']: + return False + else: + original['branches'] |= new['branches'] + return True + + +def process_repo(repo, runtime_storage, commit_processor, review_processor): uri = repo['uri'] LOG.debug('Processing repo uri %s' % uri) vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root) vcs_inst.fetch() + rcs_inst = rcs.get_rcs(repo, cfg.CONF.review_uri) + rcs_inst.setup(key_filename=cfg.CONF.ssh_key_filename, + username=cfg.CONF.ssh_username) + for branch in repo['branches']: - LOG.debug('Processing repo %s, branch %s' % (uri, branch)) + LOG.debug('Processing repo %s, branch %s', uri, branch) - head_commit_id = runtime_storage.get_head_commit_id(uri, branch) + vcs_key = 'vcs:' + str(urllib.quote_plus(uri) + ':' + branch) + last_id = runtime_storage.get_last_id(vcs_key) - commit_iterator = vcs_inst.log(branch, head_commit_id) - processed_commit_iterator = processor.process(commit_iterator) - runtime_storage.set_records(processed_commit_iterator) + commit_iterator = vcs_inst.log(branch, last_id) + processed_commit_iterator = commit_processor.process(commit_iterator) + runtime_storage.set_records(processed_commit_iterator, _merge_commits) - head_commit_id = vcs_inst.get_head_commit_id(branch) - runtime_storage.set_head_commit_id(uri, branch, 
head_commit_id) + last_id = vcs_inst.get_last_id(branch) + runtime_storage.set_last_id(vcs_key, last_id) + + LOG.debug('Processing reviews for repo %s, branch %s', uri, branch) + + rcs_key = 'rcs:' + str(urllib.quote_plus(uri) + ':' + branch) + last_id = runtime_storage.get_last_id(rcs_key) + + reviews_iterator = rcs_inst.log(branch, last_id) + processed_review_iterator = review_processor.process(reviews_iterator) + runtime_storage.set_records(processed_review_iterator) + + last_id = rcs_inst.get_last_id(branch) + runtime_storage.set_last_id(rcs_key, last_id) def update_repos(runtime_storage, persistent_storage): repos = persistent_storage.get_repos() - processor = commit_processor.CommitProcessorFactory.get_processor( - commit_processor.COMMIT_PROCESSOR_CACHED, - persistent_storage) + commit_processor = record_processor.get_record_processor( + record_processor.COMMIT_PROCESSOR, persistent_storage) + review_processor = record_processor.get_record_processor( + record_processor.REVIEW_PROCESSOR, persistent_storage) for repo in repos: - process_repo(repo, runtime_storage, processor) + process_repo(repo, runtime_storage, commit_processor, review_processor) def apply_corrections(uri, runtime_storage_inst): corrections_fd = urllib2.urlopen(uri) raw = corrections_fd.read() corrections_fd.close() - runtime_storage_inst.apply_corrections(json.loads(raw)['corrections']) + corrections = json.loads(raw)['corrections'] + valid_corrections = [] + for c in corrections: + if 'primary_key' in c: + valid_corrections.append(c) + else: + LOG.warn('Correction misses primary key: %s', c) + runtime_storage_inst.apply_corrections(valid_corrections) def main(): diff --git a/stackalytics/processor/rcs.py b/stackalytics/processor/rcs.py new file mode 100644 index 000000000..f82b3cc0f --- /dev/null +++ b/stackalytics/processor/rcs.py @@ -0,0 +1,124 @@ +# Copyright (c) 2013 Mirantis Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re + +import paramiko + +from stackalytics.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + +DEFAULT_PORT = 29418 +GERRIT_URI_PREFIX = r'^gerrit:\/\/' + + +class Rcs(object): + def __init__(self, repo, uri): + self.repo = repo + + def setup(self, **kwargs): + pass + + def log(self, branch, last_id): + pass + + def get_last_id(self, branch): + pass + + +class Gerrit(Rcs): + def __init__(self, repo, uri): + super(Gerrit, self).__init__(repo, uri) + + stripped = re.sub(GERRIT_URI_PREFIX, '', uri) + if stripped: + self.hostname, semicolon, self.port = stripped.partition(':') + if not self.port: + self.port = DEFAULT_PORT + else: + raise Exception('Invalid rcs uri %s' % uri) + + self.client = paramiko.SSHClient() + self.client.load_system_host_keys() + self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + def setup(self, **kwargs): + if 'key_filename' in kwargs: + self.key_filename = kwargs['key_filename'] + else: + self.key_filename = None + + if 'username' in kwargs: + self.username = kwargs['username'] + else: + self.username = None + + def _connect(self): + self.client.connect(self.hostname, port=self.port, + key_filename=self.key_filename, + username=self.username) + LOG.debug('Successfully connected to Gerrit') + + def log(self, branch, last_id): + module = self.repo['module'] + LOG.debug('Retrieve reviews from gerrit for project %s', module) + + 
self._connect() + + cmd = ('gerrit query --all-approvals --patch-sets --format JSON ' + '%(module)s ' + 'branch:%(branch)s' % + {'module': module, 'branch': branch}) + + if last_id: + cmd += ' NOT resume_sortkey:%016x' % (last_id + 1) + + stdin, stdout, stderr = self.client.exec_command(cmd) + for line in stdout: + review = json.loads(line) + + if 'sortKey' in review: + review['module'] = module + yield review + + def get_last_id(self, branch): + module = self.repo['module'] + LOG.debug('Get last id for module %s', module) + + self._connect() + + cmd = ('gerrit query --all-approvals --patch-sets --format JSON ' + '%(module)s branch:%(branch)s limit:1' % + {'module': module, 'branch': branch}) + + stdin, stdout, stderr = self.client.exec_command(cmd) + for line in stdout: + review = json.loads(line) + if 'sortKey' in review: + return int(review['sortKey'], 16) + + raise Exception('Last id is not found for module %s' % module) + + +def get_rcs(repo, uri): + LOG.debug('Review control system is requested for uri %s' % uri) + match = re.search(GERRIT_URI_PREFIX, uri) + if match: + return Gerrit(repo, uri) + else: + raise Exception('Unknown review control system for uri %s' % uri) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py new file mode 100644 index 000000000..5a7133cb0 --- /dev/null +++ b/stackalytics/processor/record_processor.py @@ -0,0 +1,278 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +import bisect + +import logging +import re + +from launchpadlib import launchpad +from oslo.config import cfg +from stackalytics.processor import user_utils + +LOG = logging.getLogger(__name__) + +COMMIT_PROCESSOR = 1 +REVIEW_PROCESSOR = 2 + + +class RecordProcessor(object): + def __init__(self, persistent_storage): + self.persistent_storage = persistent_storage + + def process(self, record_iterator): + pass + + +class CachedProcessor(RecordProcessor): + def __init__(self, persistent_storage): + super(CachedProcessor, self).__init__(persistent_storage) + + companies = persistent_storage.get_companies() + self.domains_index = {} + for company in companies: + for domain in company['domains']: + self.domains_index[domain] = company['company_name'] + + users = persistent_storage.get_users() + self.users_index = {} + for user in users: + for email in user['emails']: + self.users_index[email] = user + + def _find_company(self, companies, date): + for r in companies: + if date < r['end_date']: + return r['company_name'] + return companies[-1]['company_name'] + + def _get_company_by_email(self, email): + name, at, domain = email.partition('@') + if domain: + parts = domain.split('.') + for i in range(len(parts), 1, -1): + m = '.'.join(parts[len(parts) - i:]) + if m in self.domains_index: + return self.domains_index[m] + return None + + def _persist_user(self, launchpad_id, email, user_name): + # check if user with launchpad_id exists in persistent storage + persistent_user_iterator = self.persistent_storage.get_users( + launchpad_id=launchpad_id) + for persistent_user in persistent_user_iterator: + break + else: + persistent_user = None + if persistent_user: + # user already exist, merge + LOG.debug('User exists in persistent storage, add new email %s', + email) + persistent_user_email = persistent_user['emails'][0] + if persistent_user_email not in self.users_index: + 
raise Exception('User index is not valid') + user = self.users_index[persistent_user_email] + user['emails'].append(email) + self.persistent_storage.update_user(user) + else: + # add new user + LOG.debug('Add new user into persistent storage') + company = (self._get_company_by_email(email) or + self.domains_index['']) + user = { + 'launchpad_id': launchpad_id, + 'user_name': user_name, + 'emails': [email], + 'companies': [{ + 'company_name': company, + 'end_date': 0, + }], + } + self.persistent_storage.insert_user(user) + + return user + + def _unknown_user_email(self, email): + + lp_profile = None + if not re.match(r'[\w\d_\.-]+@([\w\d_\.-]+\.)+[\w]+', email): + LOG.debug('User email is not valid %s' % email) + else: + LOG.debug('Lookup user email %s at Launchpad' % email) + lp = launchpad.Launchpad.login_anonymously('stackalytics') + try: + lp_profile = lp.people.getByEmail(email=email) + except Exception as error: + LOG.warn('Lookup of email %s failed %s' % + (email, error.message)) + if not lp_profile: + # user is not found in Launchpad, create dummy record for commit + # update + LOG.debug('Email is not found at Launchpad, mapping to nobody') + user = { + 'launchpad_id': None, + 'companies': [{ + 'company_name': self.domains_index[''], + 'end_date': 0 + }] + } + else: + # get user's launchpad id from his profile + launchpad_id = lp_profile.name + user_name = lp_profile.display_name + LOG.debug('Found user %s' % launchpad_id) + + user = self._persist_user(launchpad_id, email, user_name) + + # update local index + self.users_index[email] = user + return user + + +class CommitProcessor(CachedProcessor): + def __init__(self, persistent_storage): + super(CommitProcessor, self).__init__(persistent_storage) + LOG.debug('Commit processor is instantiated') + + def _update_commit_with_user_data(self, commit): + email = commit['author_email'].lower() + if email in self.users_index: + user = self.users_index[email] + else: + user = self._unknown_user_email(email) + 
commit['launchpad_id'] = user['launchpad_id'] + company = self._get_company_by_email(email) + if not company: + company = self._find_company(user['companies'], commit['date']) + commit['company_name'] = company + if 'user_name' in user: + commit['author_name'] = user['user_name'] + + def process(self, record_iterator): + for record in record_iterator: + self._update_commit_with_user_data(record) + + if cfg.CONF.filter_robots and record['company_name'] == '*robots': + continue + + record['record_type'] = 'commit' + record['primary_key'] = record['commit_id'] + record['week'] = user_utils.timestamp_to_week(record['date']) + record['loc'] = record['lines_added'] + record['lines_deleted'] + + yield record + + +class ReviewProcessor(CachedProcessor): + def __init__(self, persistent_storage): + super(ReviewProcessor, self).__init__(persistent_storage) + + self.launchpad_to_company_index = {} + users = persistent_storage.get_users() + for user in users: + self.launchpad_to_company_index[user['launchpad_id']] = user + + self.releases = [] + for release in persistent_storage.get_releases(): + r = release.copy() + r['end_date_ts'] = user_utils.date_to_timestamp(r['end_date']) + r['release_name'] = r['release_name'].lower() + self.releases.append(r) + self.releases.sort(key=lambda x: x['end_date_ts']) + self.releases_dates = [r['end_date_ts'] for r in self.releases] + + LOG.debug('Review processor is instantiated') + + def _get_release(self, timestamp): + release_index = bisect.bisect(self.releases_dates, timestamp) + return self.releases[release_index]['release_name'] + + def _process_user(self, email, launchpad_id, user_name, date): + if email in self.users_index: + user = self.users_index[email] + else: + user = self._persist_user(launchpad_id, email, user_name) + self.users_index[email] = user + + company = self._get_company_by_email(email) + if not company: + company = self._find_company(user['companies'], date) + return company + + def _spawn_review(self, record): + # 
copy everything except pathsets and flatten user data + review = dict([(k, v) for k, v in record.iteritems() + if k not in ['patchSets', 'owner']]) + owner = record['owner'] + company = self._process_user(owner['email'].lower(), + owner['username'], + owner['name'], + record['createdOn']) + review['record_type'] = 'review' + review['primary_key'] = record['id'] + review['company_name'] = company + review['launchpad_id'] = owner['username'] + review['release'] = self._get_release(review['createdOn']) + yield review + + def _spawn_marks(self, record): + review_id = record['id'] + for patch in record['patchSets']: + if 'approvals' not in patch: + continue # not reviewed by anyone + for approval in patch['approvals']: + # copy everything and flatten user data + mark = dict([(k, v) for k, v in approval.iteritems() + if k != 'by']) + reviewer = approval['by'] + mark['record_type'] = 'mark' + mark['primary_key'] = (record['id'] + + str(mark['grantedOn']) + + mark['type']) + mark['launchpad_id'] = reviewer['username'] + mark['module'] = record['module'] + + if 'email' not in reviewer: + continue + + company = self._process_user(reviewer['email'], + reviewer['username'], + reviewer['name'], + mark['grantedOn']) + mark['company_name'] = company + mark['review_id'] = review_id + mark['release'] = self._get_release(mark['grantedOn']) + + yield mark + + def process(self, record_iterator): + """ + Process a review. 
Review spawns into records of two types: + * review - records that a user created review request + * mark - records that a user set approval mark to given review + """ + for record in record_iterator: + for gen in [self._spawn_review, self._spawn_marks]: + for r in gen(record): + yield r + + +def get_record_processor(processor_type, persistent_storage): + LOG.debug('Record processor is requested of type %s' % processor_type) + if processor_type == COMMIT_PROCESSOR: + return CommitProcessor(persistent_storage) + elif processor_type == REVIEW_PROCESSOR: + return ReviewProcessor(persistent_storage) + else: + raise Exception('Unknown commit processor type %s' % processor_type) diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index 164df2db0..95ab90703 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -16,7 +16,6 @@ import logging import re -import urllib import memcache @@ -25,6 +24,7 @@ LOG = logging.getLogger(__name__) BULK_READ_SIZE = 64 RECORD_ID_PREFIX = 'record:' UPDATE_ID_PREFIX = 'update:' +MEMCACHED_URI_PREFIX = r'^memcached:\/\/' class RuntimeStorage(object): @@ -37,10 +37,10 @@ class RuntimeStorage(object): def apply_corrections(self, corrections_iterator): pass - def get_head_commit_id(self, uri, branch): + def get_last_id(self, key): pass - def set_head_commit_id(self, uri, branch, head_commit_id): + def set_last_id(self, key, head_commit_id): pass def get_update(self, pid): @@ -55,7 +55,7 @@ class MemcachedStorage(RuntimeStorage): def __init__(self, uri): super(MemcachedStorage, self).__init__(uri) - stripped = re.sub(r'memcached:\/\/', '', uri) + stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri) if stripped: storage_uri = stripped.split(',') self.memcached = memcache.Client(storage_uri) @@ -63,15 +63,22 @@ class MemcachedStorage(RuntimeStorage): else: raise Exception('Invalid storage uri %s' % uri) - def set_records(self, records_iterator): + def 
_build_index(self): + self.record_index = {} + for record in self._get_all_records(): + self.record_index[record['primary_key']] = record['record_id'] + + def set_records(self, records_iterator, merge_handler=None): for record in records_iterator: - if record['commit_id'] in self.commit_id_index: + if record['primary_key'] in self.record_index: # update - record_id = self.commit_id_index[record['commit_id']] + record_id = self.record_index[record['primary_key']] original = self.memcached.get(self._get_record_name(record_id)) - original['branches'] |= record['branches'] - LOG.debug('Update record %s' % record) - self.memcached.set(self._get_record_name(record_id), original) + if merge_handler: + if merge_handler(original, record): + LOG.debug('Update record %s' % record) + self.memcached.set(self._get_record_name(record_id), + original) else: # insert record record_id = self._get_record_count() @@ -84,10 +91,10 @@ class MemcachedStorage(RuntimeStorage): def apply_corrections(self, corrections_iterator): for correction in corrections_iterator: - if correction['commit_id'] not in self.commit_id_index: + if correction['primary_key'] not in self.record_index: continue - record_id = self.commit_id_index[correction['commit_id']] + record_id = self.record_index[correction['primary_key']] original = self.memcached.get(self._get_record_name(record_id)) need_update = False @@ -100,12 +107,10 @@ class MemcachedStorage(RuntimeStorage): self.memcached.set(self._get_record_name(record_id), original) self._commit_update(record_id) - def get_head_commit_id(self, uri, branch): - key = str(urllib.quote_plus(uri) + ':' + branch) + def get_last_id(self, key): return self.memcached.get(key) - def set_head_commit_id(self, uri, branch, head_commit_id): - key = str(urllib.quote_plus(uri) + ':' + branch) + def set_last_id(self, key, head_commit_id): self.memcached.set(key, head_commit_id) def get_update(self, pid): @@ -186,15 +191,10 @@ class MemcachedStorage(RuntimeStorage): 
self.memcached.set(UPDATE_ID_PREFIX + str(count), record_id) self.memcached.set('update:count', count + 1) - def _build_index(self): - self.commit_id_index = {} - for record in self._get_all_records(): - self.commit_id_index[record['commit_id']] = record['record_id'] - def get_runtime_storage(uri): LOG.debug('Runtime storage is requested for uri %s' % uri) - match = re.search(r'^memcached:\/\/', uri) + match = re.search(MEMCACHED_URI_PREFIX, uri) if match: return MemcachedStorage(uri) else: diff --git a/stackalytics/processor/vcs.py b/stackalytics/processor/vcs.py index 946d221f7..10e9c35f7 100644 --- a/stackalytics/processor/vcs.py +++ b/stackalytics/processor/vcs.py @@ -13,13 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import re import sh +from stackalytics.openstack.common import log as logging + LOG = logging.getLogger(__name__) @@ -41,7 +42,7 @@ class Vcs(object): def log(self, branch, head_commit_id): pass - def get_head_commit_id(self, branch): + def get_last_id(self, branch): pass @@ -162,7 +163,7 @@ class Git(Vcs): yield commit - def get_head_commit_id(self, branch): + def get_last_id(self, branch): LOG.debug('Get head commit for repo uri %s' % self.repo['uri']) self._chdir() diff --git a/test-requirements.txt b/test-requirements.txt index d27f5fdf0..881c03712 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -2,7 +2,7 @@ pep8==1.4.5 pyflakes==0.7.2 flake8==2.0 -hacking>=0.5.3,<0.6 +hacking>=0.5.3,<0.7 coverage discover diff --git a/tests/unit/test_commit_processor.py b/tests/unit/test_commit_processor.py index 112e143e9..172780be6 100644 --- a/tests/unit/test_commit_processor.py +++ b/tests/unit/test_commit_processor.py @@ -1,10 +1,25 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from launchpadlib import launchpad import mock from oslo.config import cfg import testtools -from stackalytics.processor import commit_processor from stackalytics.processor import persistent_storage +from stackalytics.processor import record_processor class TestCommitProcessor(testtools.TestCase): @@ -41,7 +56,7 @@ class TestCommitProcessor(testtools.TestCase): ]) self.persistent_storage = p_storage - self.commit_processor = commit_processor.CachedProcessor(p_storage) + self.commit_processor = record_processor.CommitProcessor(p_storage) self.launchpad_patch = mock.patch('launchpadlib.launchpad.Launchpad') self.launchpad_patch.start() cfg.CONF = mock.MagicMock() diff --git a/tests/unit/test_vcs.py b/tests/unit/test_vcs.py index 0894f3310..c0435d825 100644 --- a/tests/unit/test_vcs.py +++ b/tests/unit/test_vcs.py @@ -1,3 +1,18 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import mock import os import testtools diff --git a/tests/unit/test_web_utils.py b/tests/unit/test_web_utils.py index bda37ba9c..0280ba52e 100644 --- a/tests/unit/test_web_utils.py +++ b/tests/unit/test_web_utils.py @@ -1,3 +1,18 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import testtools from dashboard import web