From 491219078322a898f4410c4467c3079240d6b59d Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Tue, 6 Oct 2015 17:43:14 +0300 Subject: [PATCH] Make review processing compatible with Gerrit 2.9+ Review processor uses resume_sortkey query parameter for paging and to store the last processed change request. Staring with Gerrit 2.9 this parameter is removed, also in that version there's no way to do paging. Staring 2.10 Gerrit supports paging with --start param. The code is made to be compatible with current Gerrit (2.8.x) and modern versions. The only compatible way is to use 'age' query param for paging. NOTE: this patch introduces incompatible changes into data stored in runtime storage. Full re-load is required. Change-Id: I315dbfdea35f3980dd088be46dd12cb22660ed82 Closes-Bug: #1503267 --- stackalytics/processor/main.py | 16 ++-- stackalytics/processor/rcs.py | 110 +++++++++--------------- stackalytics/tests/unit/test_rcs.py | 127 ++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 76 deletions(-) create mode 100644 stackalytics/tests/unit/test_rcs.py diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 96cea417d..44fc681c8 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -17,6 +17,7 @@ from oslo_config import cfg from oslo_log import log as logging import psutil import six +import time from stackalytics.processor import bps from stackalytics.processor import config @@ -93,6 +94,7 @@ def _process_reviews(record_iterator, ci_map, module, branch): def _process_repo(repo, runtime_storage_inst, record_processor_inst, rcs_inst): uri = repo['uri'] + quoted_uri = six.moves.urllib.parse.quote_plus(uri) LOG.info('Processing repo uri: %s', uri) LOG.debug('Processing blueprints for repo uri: %s', uri) @@ -129,8 +131,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, for branch in branches: LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch) - vcs_key = 'vcs:' + str(six.moves.urllib.parse.quote_plus(uri) + - ':' + branch) + vcs_key = 'vcs:%s:%s' % (quoted_uri, branch) last_id = runtime_storage_inst.get_by_key(vcs_key) commit_iterator = vcs_inst.log(branch, last_id) @@ -145,11 +146,11 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch) - rcs_key = 'rcs:' + str(six.moves.urllib.parse.quote_plus(uri) + - ':' + branch) - last_id = runtime_storage_inst.get_by_key(rcs_key) + rcs_key = 'rcs:%s:%s' % (quoted_uri, branch) + last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key) + current_retrieval_time = int(time.time()) - review_iterator = rcs_inst.log(repo, branch, last_id, + review_iterator = rcs_inst.log(repo, branch, last_retrieval_time, grab_comments=('ci' in repo)) review_iterator_typed = _record_typer(review_iterator, 'review') @@ -162,8 +163,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, runtime_storage_inst.set_records(processed_review_iterator, utils.merge_records) - last_id = rcs_inst.get_last_id(repo, branch) - runtime_storage_inst.set_by_key(rcs_key, last_id) + runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time) def _process_mail_list(uri, runtime_storage_inst, record_processor_inst): diff --git a/stackalytics/processor/rcs.py b/stackalytics/processor/rcs.py index 0104c8b12..82dbb44e1 100644 --- a/stackalytics/processor/rcs.py +++ b/stackalytics/processor/rcs.py @@ -18,7 +18,7 @@ import re from oslo_log import log as logging import paramiko - +import time LOG = logging.getLogger(__name__) @@ -38,12 +38,10 @@ class Rcs(object): def get_project_list(self): pass - def log(self, repo, branch, last_id): + def log(self, repo, branch, last_retrieval_time, status=None, + grab_comments=False): return [] - def get_last_id(self, repo, branch): - return -1 - def close(self): pass @@ -89,17 +87,15 @@ class Gerrit(Rcs): LOG.exception(e) return False - def _get_cmd(self, project_organization, module, branch, sort_key=None, - is_open=False, limit=PAGE_LIMIT, grab_comments=False): + def _get_cmd(self, project_organization, module, branch, age=0, + status=None, limit=PAGE_LIMIT, grab_comments=False): cmd = ('gerrit query --all-approvals --patch-sets --format JSON ' 'project:\'%(ogn)s/%(module)s\' branch:%(branch)s ' - 'limit:%(limit)s' % + 'limit:%(limit)s age:%(age)ss' % {'ogn': project_organization, 'module': module, - 'branch': branch, 'limit': limit}) - if is_open: - cmd += ' is:open' - if sort_key: - cmd += ' resume_sortkey:%016x' % sort_key + 'branch': branch, 'limit': limit, 'age': age}) + if status: + cmd += ' status:%s' % status if grab_comments: cmd += ' --comments' return cmd @@ -123,37 +119,47 @@ class Gerrit(Rcs): return False def _poll_reviews(self, project_organization, module, branch, - start_id=0, last_id=0, is_open=False, - grab_comments=False): - sort_key = start_id - last_id = last_id or 0 + last_retrieval_time, status=None, grab_comments=False): + age = 0 + proceed = True - while True: - cmd = self._get_cmd(project_organization, module, branch, sort_key, - is_open, grab_comments=grab_comments) + # the algorithm retrieves reviews by age; the next page is started + # with the time of the oldest; it is possible that the oldest + # will be included in consequent result (as the age offsets to local + # machine timestamp, but evaluated remotely), so we need to track all + # ids and ignore those we've already seen + processed = set() + + while proceed: + cmd = self._get_cmd(project_organization, module, branch, + age=age, status=status, + grab_comments=grab_comments) LOG.debug('Executing command: %s', cmd) exec_result = self._exec_command(cmd) if not exec_result: break stdin, stdout, stderr = exec_result - proceed = False + proceed = False # assume there are no more reviews available for line in stdout: review = json.loads(line) - if 'sortKey' in review: - sort_key = int(review['sortKey'], 16) - if sort_key <= last_id: + if 'number' in review: # choose reviews not summary + + if review['number'] in processed: + continue # already seen that + + last_updated = int(review['lastUpdated']) + if last_updated < last_retrieval_time: # too old proceed = False break - proceed = True + proceed = True # have at least one review, can dig deeper + age = max(age, int(time.time()) - last_updated) + processed.add(review['number']) review['module'] = module yield review - if not proceed: - break - def get_project_list(self): exec_result = self._exec_command('gerrit ls-projects') if not exec_result: @@ -163,48 +169,16 @@ class Gerrit(Rcs): return result - def log(self, repo, branch, last_id, grab_comments=False): - # poll new reviews from the top down to last_id - LOG.debug('Poll new reviews for module: %s', repo['module']) - for review in self._poll_reviews(repo['organization'], - repo['module'], branch, - last_id=last_id, - grab_comments=grab_comments): + def log(self, repo, branch, last_retrieval_time, status=None, + grab_comments=False): + # poll reviews down from top between last_r_t and current_r_t + LOG.debug('Poll reviews for module: %s', repo['module']) + for review in self._poll_reviews( + repo['organization'], repo['module'], branch, + last_retrieval_time, status=status, + grab_comments=grab_comments): yield review - # poll open reviews from last_id down to bottom - LOG.debug('Poll open reviews for module: %s', repo['module']) - start_id = None - if last_id: - start_id = last_id + 1 # include the last review into query - for review in self._poll_reviews(repo['organization'], - repo['module'], branch, - start_id=start_id, is_open=True, - grab_comments=grab_comments): - yield review - - def get_last_id(self, repo, branch): - LOG.debug('Get last id for module: %s', repo['module']) - - cmd = self._get_cmd(repo['organization'], repo['module'], - branch, limit=1) - LOG.debug('Executing command: %s', cmd) - exec_result = self._exec_command(cmd) - if not exec_result: - return None - stdin, stdout, stderr = exec_result - - last_id = None - for line in stdout: - review = json.loads(line) - if 'sortKey' in review: - last_id = int(review['sortKey'], 16) - break - - LOG.debug('Module %(module)s last id is %(id)s', - {'module': repo['module'], 'id': last_id}) - return last_id - def close(self): self.client.close() diff --git a/stackalytics/tests/unit/test_rcs.py b/stackalytics/tests/unit/test_rcs.py new file mode 100644 index 000000000..ff7279a23 --- /dev/null +++ b/stackalytics/tests/unit/test_rcs.py @@ -0,0 +1,127 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import mock +import testtools + +from stackalytics.processor import rcs + +REVIEW_ONE = json.dumps( + {"project": "openstack/nova", "branch": "master", "topic": "bug/1494374", + "id": "Id741dfc769c02a5544691a7db49a7dbff6b11376", "number": "229382", + "subject": "method is nearly 400 LOC and should be broken up", + "createdOn": 1443613948, "lastUpdated": 1444222222, + "sortKey": "0038481b00038006", "open": True, "status": "NEW"}) +REVIEW_END_LINE = json.dumps( + {"type": "stats", "rowCount": 2, "runTimeMilliseconds": 13}) + + +class TestRcs(testtools.TestCase): + + @mock.patch('paramiko.SSHClient') + def test_setup(self, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_connect = mock.Mock() + mock_client.connect = mock_connect + + gerrit = rcs.Gerrit('gerrit://review.openstack.org') + setup_result = gerrit.setup(username='user', key_filename='key') + + self.assertEqual(True, setup_result) + mock_connect.assert_called_once_with( + 'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key', + username='user') + + @mock.patch('paramiko.SSHClient') + def test_setup_error(self, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_connect = mock.Mock() + mock_client.connect = mock_connect + mock_connect.side_effect = Exception + + gerrit = rcs.Gerrit('gerrit://review.openstack.org') + setup_result = gerrit.setup(username='user', key_filename='key') + + self.assertEqual(False, setup_result) + mock_connect.assert_called_once_with( + 'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key', + username='user') + + @mock.patch('paramiko.SSHClient') + @mock.patch('time.time') + def test_log(self, mock_time, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_exec = mock.Mock() + mock_client.exec_command = mock_exec + mock_exec.side_effect = [ + ('', [REVIEW_ONE, REVIEW_END_LINE], ''), # one review and summary + ('', [REVIEW_END_LINE], ''), # only summary = no more reviews + ] + + gerrit = rcs.Gerrit('uri') + + repo = dict(organization='openstack', module='nova') + branch = 'master' + last_retrieval_time = 1444000000 + mock_time.return_value = 1444333333 + records = list(gerrit.log(repo, branch, last_retrieval_time)) + + self.assertEqual(1, len(records)) + self.assertEqual('229382', records[0]['number']) + + mock_client.exec_command.assert_has_calls([ + mock.call('gerrit query --all-approvals --patch-sets ' + '--format JSON project:\'openstack/nova\' branch:master ' + 'limit:100 age:0s'), + mock.call('gerrit query --all-approvals --patch-sets ' + '--format JSON project:\'openstack/nova\' branch:master ' + 'limit:100 age:111111s'), + ]) + + @mock.patch('paramiko.SSHClient') + def test_log_old_reviews(self, mock_client_cons): + mock_client = mock.Mock() + mock_client_cons.return_value = mock_client + + mock_exec = mock.Mock() + mock_client.exec_command = mock_exec + mock_exec.side_effect = [ + ('', [REVIEW_ONE, REVIEW_END_LINE], ''), # one review and summary + ('', [REVIEW_END_LINE], ''), # only summary = no more reviews + ] + + gerrit = rcs.Gerrit('uri') + + repo = dict(organization='openstack', module='nova') + branch = 'master' + last_retrieval_time = 1445000000 + records = list(gerrit.log(repo, branch, last_retrieval_time, + status='merged', grab_comments=True)) + + self.assertEqual(0, len(records)) + + mock_client.exec_command.assert_has_calls([ + mock.call('gerrit query --all-approvals --patch-sets ' + '--format JSON project:\'openstack/nova\' branch:master ' + 'limit:100 age:0s status:merged --comments'), + ])