diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 96cea417d..44fc681c8 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -17,6 +17,7 @@ from oslo_config import cfg from oslo_log import log as logging import psutil import six +import time from stackalytics.processor import bps from stackalytics.processor import config @@ -93,6 +94,7 @@ def _process_reviews(record_iterator, ci_map, module, branch): def _process_repo(repo, runtime_storage_inst, record_processor_inst, rcs_inst): uri = repo['uri'] + quoted_uri = six.moves.urllib.parse.quote_plus(uri) LOG.info('Processing repo uri: %s', uri) LOG.debug('Processing blueprints for repo uri: %s', uri) @@ -129,8 +131,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, for branch in branches: LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch) - vcs_key = 'vcs:' + str(six.moves.urllib.parse.quote_plus(uri) + - ':' + branch) + vcs_key = 'vcs:%s:%s' % (quoted_uri, branch) last_id = runtime_storage_inst.get_by_key(vcs_key) commit_iterator = vcs_inst.log(branch, last_id) @@ -145,11 +146,11 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch) - rcs_key = 'rcs:' + str(six.moves.urllib.parse.quote_plus(uri) + - ':' + branch) - last_id = runtime_storage_inst.get_by_key(rcs_key) + rcs_key = 'rcs:%s:%s' % (quoted_uri, branch) + last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key) + current_retrieval_time = int(time.time()) - review_iterator = rcs_inst.log(repo, branch, last_id, + review_iterator = rcs_inst.log(repo, branch, last_retrieval_time, grab_comments=('ci' in repo)) review_iterator_typed = _record_typer(review_iterator, 'review') @@ -162,8 +163,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, runtime_storage_inst.set_records(processed_review_iterator, utils.merge_records) - last_id = rcs_inst.get_last_id(repo, branch) - runtime_storage_inst.set_by_key(rcs_key, last_id) + runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time) def _process_mail_list(uri, runtime_storage_inst, record_processor_inst): diff --git a/stackalytics/processor/rcs.py b/stackalytics/processor/rcs.py index 0104c8b12..82dbb44e1 100644 --- a/stackalytics/processor/rcs.py +++ b/stackalytics/processor/rcs.py @@ -18,7 +18,7 @@ import re from oslo_log import log as logging import paramiko - +import time LOG = logging.getLogger(__name__) @@ -38,12 +38,10 @@ class Rcs(object): def get_project_list(self): pass - def log(self, repo, branch, last_id): + def log(self, repo, branch, last_retrieval_time, status=None, + grab_comments=False): return [] - def get_last_id(self, repo, branch): - return -1 - def close(self): pass @@ -89,17 +87,15 @@ class Gerrit(Rcs): LOG.exception(e) return False - def _get_cmd(self, project_organization, module, branch, sort_key=None, - is_open=False, limit=PAGE_LIMIT, grab_comments=False): + def _get_cmd(self, project_organization, module, branch, age=0, + status=None, limit=PAGE_LIMIT, grab_comments=False): cmd = ('gerrit query --all-approvals --patch-sets --format JSON ' 'project:\'%(ogn)s/%(module)s\' branch:%(branch)s ' - 'limit:%(limit)s' % + 'limit:%(limit)s age:%(age)ss' % {'ogn': project_organization, 'module': module, - 'branch': branch, 'limit': limit}) - if is_open: - cmd += ' is:open' - if sort_key: - cmd += ' resume_sortkey:%016x' % sort_key + 'branch': branch, 'limit': limit, 'age': age}) + if status: + cmd += ' status:%s' % status if grab_comments: cmd += ' --comments' return cmd @@ -123,37 +119,47 @@ class Gerrit(Rcs): return False def _poll_reviews(self, project_organization, module, branch, - start_id=0, last_id=0, is_open=False, - grab_comments=False): - sort_key = start_id - last_id = last_id or 0 + last_retrieval_time, status=None, grab_comments=False): + age = 0 + proceed = True - while True: - cmd = self._get_cmd(project_organization, module, branch, sort_key, - is_open, grab_comments=grab_comments) + # the algorithm retrieves reviews by age; the next page is started + # with the time of the oldest; it is possible that the oldest + # will be included in consequent result (as the age offsets to local + # machine timestamp, but evaluated remotely), so we need to track all + # ids and ignore those we've already seen + processed = set() + + while proceed: + cmd = self._get_cmd(project_organization, module, branch, + age=age, status=status, + grab_comments=grab_comments) LOG.debug('Executing command: %s', cmd) exec_result = self._exec_command(cmd) if not exec_result: break stdin, stdout, stderr = exec_result - proceed = False + proceed = False # assume there are no more reviews available for line in stdout: review = json.loads(line) - if 'sortKey' in review: - sort_key = int(review['sortKey'], 16) - if sort_key <= last_id: + if 'number' in review: # choose reviews not summary + + if review['number'] in processed: + continue # already seen that + + last_updated = int(review['lastUpdated']) + if last_updated < last_retrieval_time: # too old proceed = False break - proceed = True + proceed = True # have at least one review, can dig deeper + age = max(age, int(time.time()) - last_updated) + processed.add(review['number']) review['module'] = module yield review - if not proceed: - break - def get_project_list(self): exec_result = self._exec_command('gerrit ls-projects') if not exec_result: @@ -163,48 +169,16 @@ class Gerrit(Rcs): return result - def log(self, repo, branch, last_id, grab_comments=False): - # poll new reviews from the top down to last_id - LOG.debug('Poll new reviews for module: %s', repo['module']) - for review in self._poll_reviews(repo['organization'], - repo['module'], branch, - last_id=last_id, - grab_comments=grab_comments): + def log(self, repo, branch, last_retrieval_time, status=None, + grab_comments=False): + # poll reviews down from top between last_r_t and current_r_t + LOG.debug('Poll reviews for module: %s', repo['module']) + for review in self._poll_reviews( + repo['organization'], repo['module'], branch, + last_retrieval_time, status=status, + grab_comments=grab_comments): yield review - # poll open reviews from last_id down to bottom - LOG.debug('Poll open reviews for module: %s', repo['module']) - start_id = None - if last_id: - start_id = last_id + 1 # include the last review into query - for review in self._poll_reviews(repo['organization'], - repo['module'], branch, - start_id=start_id, is_open=True, - grab_comments=grab_comments): - yield review - - def get_last_id(self, repo, branch): - LOG.debug('Get last id for module: %s', repo['module']) - - cmd = self._get_cmd(repo['organization'], repo['module'], - branch, limit=1) - LOG.debug('Executing command: %s', cmd) - exec_result = self._exec_command(cmd) - if not exec_result: - return None - stdin, stdout, stderr = exec_result - - last_id = None - for line in stdout: - review = json.loads(line) - if 'sortKey' in review: - last_id = int(review['sortKey'], 16) - break - - LOG.debug('Module %(module)s last id is %(id)s', - {'module': repo['module'], 'id': last_id}) - return last_id - def close(self): self.client.close() diff --git a/stackalytics/tests/unit/test_rcs.py b/stackalytics/tests/unit/test_rcs.py new file mode 100644 index 000000000..ff7279a23 --- /dev/null +++ b/stackalytics/tests/unit/test_rcs.py @@ -0,0 +1,127 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import mock +import testtools + +from stackalytics.processor import rcs + +REVIEW_ONE = json.dumps( + {"project": "openstack/nova", "branch": "master", "topic": "bug/1494374", + "id": "Id741dfc769c02a5544691a7db49a7dbff6b11376", "number": "229382", + "subject": "method is nearly 400 LOC and should be broken up", + "createdOn": 1443613948, "lastUpdated": 1444222222, + "sortKey": "0038481b00038006", "open": True, "status": "NEW"}) +REVIEW_END_LINE = json.dumps( + {"type": "stats", "rowCount": 2, "runTimeMilliseconds": 13}) + + +class TestRcs(testtools.TestCase): + + @mock.patch('paramiko.SSHClient') + def test_setup(self, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_connect = mock.Mock() + mock_client.connect = mock_connect + + gerrit = rcs.Gerrit('gerrit://review.openstack.org') + setup_result = gerrit.setup(username='user', key_filename='key') + + self.assertEqual(True, setup_result) + mock_connect.assert_called_once_with( + 'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key', + username='user') + + @mock.patch('paramiko.SSHClient') + def test_setup_error(self, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_connect = mock.Mock() + mock_client.connect = mock_connect + mock_connect.side_effect = Exception + + gerrit = rcs.Gerrit('gerrit://review.openstack.org') + setup_result = gerrit.setup(username='user', key_filename='key') + + self.assertEqual(False, setup_result) + mock_connect.assert_called_once_with( + 'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key', + username='user') + + @mock.patch('paramiko.SSHClient') + @mock.patch('time.time') + def test_log(self, mock_time, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_exec = mock.Mock() + mock_client.exec_command = mock_exec + mock_exec.side_effect = [ + ('', [REVIEW_ONE, REVIEW_END_LINE], ''), # one review and summary + ('', [REVIEW_END_LINE], ''), # only summary = no more reviews + ] + + gerrit = rcs.Gerrit('uri') + + repo = dict(organization='openstack', module='nova') + branch = 'master' + last_retrieval_time = 1444000000 + mock_time.return_value = 1444333333 + records = list(gerrit.log(repo, branch, last_retrieval_time)) + + self.assertEqual(1, len(records)) + self.assertEqual('229382', records[0]['number']) + + mock_client.exec_command.assert_has_calls([ + mock.call('gerrit query --all-approvals --patch-sets ' + '--format JSON project:\'openstack/nova\' branch:master ' + 'limit:100 age:0s'), + mock.call('gerrit query --all-approvals --patch-sets ' + '--format JSON project:\'openstack/nova\' branch:master ' + 'limit:100 age:111111s'), + ]) + + @mock.patch('paramiko.SSHClient') + def test_log_old_reviews(self, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_exec = mock.Mock() + mock_client.exec_command = mock_exec + mock_exec.side_effect = [ + ('', [REVIEW_ONE, REVIEW_END_LINE], ''), # one review and summary + ('', [REVIEW_END_LINE], ''), # only summary = no more reviews + ] + + gerrit = rcs.Gerrit('uri') + + repo = dict(organization='openstack', module='nova') + branch = 'master' + last_retrieval_time = 1445000000 + records = list(gerrit.log(repo, branch, last_retrieval_time, + status='merged', grab_comments=True)) + + self.assertEqual(0, len(records)) + + mock_client.exec_command.assert_has_calls([ + mock.call('gerrit query --all-approvals --patch-sets ' + '--format JSON project:\'openstack/nova\' branch:master ' + 'limit:100 age:0s status:merged --comments'), + ])